Skip to content

Commit

Permalink
feat: optimize filter subs (#1144)
Browse files Browse the repository at this point in the history
Co-authored-by: richΛrd <[email protected]>
  • Loading branch information
chaitanyaprem and richard-ramos committed Jul 1, 2024
1 parent e3d7ab1 commit 5b5ea97
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 14 deletions.
8 changes: 6 additions & 2 deletions waku/v2/api/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,12 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
return sub, nil
}

func (apiSub *Sub) Unsubscribe() {
apiSub.cancel()
func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) {
_, err := apiSub.wf.Unsubscribe(apiSub.ctx, contentFilter)
//Not reading result unless we want to do specific error handling?
if err != nil {
apiSub.log.Debug("failed to unsubscribe", zap.Error(err), zap.Stringer("content-filter", contentFilter))
}
}

func (apiSub *Sub) subscriptionLoop() {
Expand Down
7 changes: 4 additions & 3 deletions waku/v2/api/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ func (s *FilterApiTestSuite) TestSubscribe() {

s.Require().Equal(apiConfig.MaxPeers, 2)
s.Require().Equal(contentFilter.PubsubTopic, s.TestTopic)

ctx, cancel := context.WithCancel(context.Background())
s.Log.Info("About to perform API Subscribe()")
apiSub, err := Subscribe(context.Background(), s.LightNode, contentFilter, apiConfig, s.Log)
apiSub, err := Subscribe(ctx, s.LightNode, contentFilter, apiConfig, s.Log)
s.Require().NoError(err)
s.Require().Equal(apiSub.ContentFilter, contentFilter)
s.Log.Info("Subscribed")
Expand Down Expand Up @@ -89,7 +89,8 @@ func (s *FilterApiTestSuite) TestSubscribe() {
s.Require().NotEqual(fullNodeData2.FullNodeHost.ID(), sub.PeerID)
}

apiSub.Unsubscribe()
apiSub.Unsubscribe(contentFilter)
cancel()
for range apiSub.DataCh {
}
s.Log.Info("DataCh is closed")
Expand Down
14 changes: 8 additions & 6 deletions waku/v2/protocol/subscription/subscription_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,37 +37,37 @@ type SubscriptionDetails struct {
}

func (s *SubscriptionDetails) Add(contentTopics ...string) {
s.mapRef.Lock()
defer s.mapRef.Unlock()
s.Lock()
defer s.Unlock()

for _, ct := range contentTopics {
if _, ok := s.ContentFilter.ContentTopics[ct]; !ok {
s.ContentFilter.ContentTopics[ct] = struct{}{}
// Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair
s.mapRef.Lock()
s.mapRef.increaseSubFor(s.ContentFilter.PubsubTopic, ct)
s.mapRef.Unlock()
}
}
}

func (s *SubscriptionDetails) Remove(contentTopics ...string) {
s.mapRef.Lock()
defer s.mapRef.Unlock()
s.Lock()
defer s.Unlock()

for _, ct := range contentTopics {
if _, ok := s.ContentFilter.ContentTopics[ct]; ok {
delete(s.ContentFilter.ContentTopics, ct)
// Decrease the number of subscriptions for this (pubsubTopic, contentTopic) pair
s.mapRef.Lock()
s.mapRef.decreaseSubFor(s.ContentFilter.PubsubTopic, ct)
s.mapRef.Unlock()
}
}

if len(s.ContentFilter.ContentTopics) == 0 {
// err doesn't matter
_ = s.mapRef.Delete(s)
_ = s.mapRef.DeleteNoLock(s)
}
}

Expand Down Expand Up @@ -105,7 +105,9 @@ func (s *SubscriptionDetails) CloseC() {

func (s *SubscriptionDetails) Close() error {
s.CloseC()
return s.mapRef.Delete(s)
s.mapRef.Lock()
defer s.mapRef.Unlock()
return s.mapRef.DeleteNoLock(s)
}

func (s *SubscriptionDetails) SetClosing() {
Expand Down
6 changes: 3 additions & 3 deletions waku/v2/protocol/subscription/subscriptions_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ func (sub *SubscriptionsMap) Has(peerID peer.ID, cf protocol.ContentFilter) bool

return true
}
func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error {
sub.Lock()
defer sub.Unlock()

// Caller has to acquire lock before invoking this method.This is done to avoid possible deadlock
func (sub *SubscriptionsMap) DeleteNoLock(subscription *SubscriptionDetails) error {

peerSubscription, ok := sub.items[subscription.PeerID]
if !ok {
Expand Down

0 comments on commit 5b5ea97

Please sign in to comment.