Skip to content

Commit

Permalink
revise_id
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <[email protected]>
  • Loading branch information
Yisaer committed Jul 5, 2024
1 parent e0ae706 commit cf82a65
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
8 changes: 6 additions & 2 deletions internal/io/websocket/websocket_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ func (w *WebsocketSink) Provision(ctx api.StreamContext, configs map[string]any)

func (w *WebsocketSink) Close(ctx api.StreamContext) error {
pubsub.RemovePub(w.topic)
return connection.DetachConnection(ctx, w.cfg.Endpoint, w.props)
return connection.DetachConnection(ctx, buildWebsocketEpID(w.cfg.Endpoint), w.props)
}

func (w *WebsocketSink) Connect(ctx api.StreamContext) error {
conn, err := connection.FetchConnection(ctx, w.cfg.Endpoint, "websocket", w.props)
conn, err := connection.FetchConnection(ctx, buildWebsocketEpID(w.cfg.Endpoint), "websocket", w.props)
if err != nil {
return err

Check warning on line 55 in internal/io/websocket/websocket_sink.go

View check run for this annotation

Codecov / codecov/patch

internal/io/websocket/websocket_sink.go#L55

Added line #L55 was not covered by tests
}
Expand All @@ -77,3 +77,7 @@ func GetSink() api.Sink {
}

var _ api.BytesCollector = &WebsocketSink{}

func buildWebsocketEpID(endpoint string) string {
return fmt.Sprintf("$$ws/%s", endpoint)
}
4 changes: 2 additions & 2 deletions internal/io/websocket/websocket_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ func (w *WebsocketSource) Provision(ctx api.StreamContext, configs map[string]an

func (w *WebsocketSource) Close(ctx api.StreamContext) error {
pubsub.CloseSourceConsumerChannel(w.topic, w.sourceID)
return connection.DetachConnection(ctx, w.cfg.Endpoint, w.props)
return connection.DetachConnection(ctx, buildWebsocketEpID(w.cfg.Endpoint), w.props)
}

func (w *WebsocketSource) Connect(ctx api.StreamContext) error {
conn, err := connection.FetchConnection(ctx, w.cfg.Endpoint, "websocket", w.props)
conn, err := connection.FetchConnection(ctx, buildWebsocketEpID(w.cfg.Endpoint), "websocket", w.props)
if err != nil {
return err
}
Expand Down

0 comments on commit cf82a65

Please sign in to comment.