Skip to content

Commit

Permalink
some refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
mmadfox committed Aug 16, 2020
1 parent 81ac0f9 commit 07228f9
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 61 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Channels struct {

func NewChannels() *Channel {
channels := Channels{
gpsLocationChanged: channel.New(channel.IgnoreSlowClient()),
gpsLocationChanged: channel.New(channel.SkipSlowSubscribers()),
orderChanged: channel.New(channel.IgnoreSlowClient()),
accountChanged: channel.New(channel.IgnoreSlowClient()),
}
Expand Down Expand Up @@ -148,7 +148,7 @@ func (s *Subscriptions) GpsLocationChanged(ctx context.Context, input *models.GP
// maximum sessions per subscriber.
MaxLimitSessions(limit int) Option
// non-blocking message sending.
IgnoreSlowClients() Option
SkipSlowSubscribers() Option
// subscriptions buffered channel size.
SubscriptionBufSize(size int) Option
```
61 changes: 32 additions & 29 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,29 @@ import (
)

type bucketOptions struct {
maxLimitSessions int
ignoreSlowClients bool
bufSize int
maxLimitSessions int
skipSlowSubscribers bool
bufSize int
}

type bucket struct {
sync.RWMutex
subscribers map[string][]Subscription
queue chan []byte
done chan struct{}
sessionCount int
subscribersCount int
maxLimitSessions int
ignoreSlowClients bool
bufSize int
subscribers map[string][]Subscription
queue chan []byte
done chan struct{}
sessionCount uint
subscribersCount uint
options *bucketOptions
}

func newBucket(
opt *bucketOptions,
) *bucket {
b := &bucket{
maxLimitSessions: opt.maxLimitSessions,
subscribers: make(map[string][]Subscription),
queue: make(chan []byte, 1),
done: make(chan struct{}),
ignoreSlowClients: opt.ignoreSlowClients,
bufSize: opt.bufSize,
subscribers: make(map[string][]Subscription),
queue: make(chan []byte, 1),
done: make(chan struct{}),
options: opt,
}
go b.listen()
return b
Expand Down Expand Up @@ -67,7 +63,7 @@ func (b *bucket) unsubscribe(subscriber string, session string) error {
} else {
b.subscribers[subscriber] = subscriptions
}
b.subscribersCount = len(b.subscribers)
b.subscribersCount = uint(len(b.subscribers))
if b.sessionCount > 0 {
b.sessionCount--
}
Expand All @@ -81,8 +77,11 @@ func (b *bucket) publishTo(subscriber string, payload []byte) error {
if !found {
return ErrSubscriberNotFound
}
for _, subscription := range subscriptions {
subscription.Publish(payload, b.ignoreSlowClients)
for i := len(subscriptions) - 1; i >= 0; i-- {
subscriptions[i].Publish(
payload,
b.options.skipSlowSubscribers,
)
}
return nil
}
Expand All @@ -91,8 +90,11 @@ func (b *bucket) publish(payload []byte) {
b.RLock()
defer b.RUnlock()
for _, subscriptions := range b.subscribers {
for _, subscription := range subscriptions {
subscription.Publish(payload, b.ignoreSlowClients)
for i := len(subscriptions) - 1; i >= 0; i-- {
subscriptions[i].Publish(
payload,
b.options.skipSlowSubscribers,
)
}
}
}
Expand All @@ -101,15 +103,16 @@ func (b *bucket) close() {
b.Lock()
defer b.Unlock()
for sid, subscriptions := range b.subscribers {
for _, subscription := range subscriptions {
if !subscription.IsClosed() {
subscription.Close()
for i := len(subscriptions) - 1; i >= 0; i-- {
if !subscriptions[i].IsClosed() {
subscriptions[i].Close()
b.sessionCount--
}

}
delete(b.subscribers, sid)
}
b.subscribersCount = len(b.subscribers)
b.subscribersCount = uint(len(b.subscribers))
close(b.done)
}

Expand All @@ -120,13 +123,13 @@ func (b *bucket) subscribe(subscriber string) (Subscription, error) {
if !found {
subscriptions = make([]Subscription, 0, 1)
}
if len(subscriptions) >= b.maxLimitSessions {
if len(subscriptions) >= b.options.maxLimitSessions {
return Subscription{}, ErrTooManySessions
}
s := MakeSubscription(subscriber, b.bufSize)
s := MakeSubscription(subscriber, b.options.bufSize)
subscriptions = append(subscriptions, s)
b.subscribers[subscriber] = subscriptions
b.subscribersCount = len(b.subscribers)
b.subscribersCount = uint(len(b.subscribers))
b.sessionCount++
return s, nil
}
10 changes: 5 additions & 5 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ func New(options ...Option) *Channel {
bucketSize: numCPU,
buckets: make([]*bucket, numCPU),
bucketOpts: &bucketOptions{
maxLimitSessions: DefaultMaxLimitSessions,
ignoreSlowClients: false,
bufSize: DefaultBufSize,
maxLimitSessions: DefaultMaxLimitSessions,
skipSlowSubscribers: false,
bufSize: DefaultBufSize,
},
}
for _, opt := range options {
Expand Down Expand Up @@ -110,8 +110,8 @@ func (c *Channel) Unsubscribe(s Subscription) error {
func (c *Channel) Stats() (s Stats) {
for _, bucket := range c.buckets {
bucket.RLock()
s.Sessions += bucket.sessionCount
s.Subscribers += bucket.subscribersCount
s.Sessions += uint(bucket.sessionCount)
s.Subscribers += uint(bucket.subscribersCount)
bucket.RUnlock()
}
return s
Expand Down
41 changes: 21 additions & 20 deletions channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ func TestChannel_OneSubscriberManySessions(t *testing.T) {
s3, err := customerChannel.Subscribe("qwerty")
assert.Nil(t, err)
stats := customerChannel.Stats()
assert.Equal(t, 1, stats.Subscribers)
assert.Equal(t, 3, stats.Sessions)
assert.Equal(t, uint(1), stats.Subscribers)
assert.Equal(t, uint(3), stats.Sessions)

assert.Nil(t, customerChannel.Unsubscribe(s1))
assert.Nil(t, customerChannel.Unsubscribe(s2))
assert.Nil(t, customerChannel.Unsubscribe(s3))

stats = customerChannel.Stats()
assert.Equal(t, 0, stats.Subscribers)
assert.Equal(t, 0, stats.Sessions)
assert.Equal(t, uint(0), stats.Subscribers)
assert.Equal(t, uint(0), stats.Sessions)
}

func TestChannel_SubscribeMaxLimitSessions(t *testing.T) {
Expand All @@ -65,22 +65,22 @@ func TestChannel_Close(t *testing.T) {
assert.Nil(t, err)

stats := customerChannel.Stats()
assert.Equal(t, 3, stats.Subscribers)
assert.Equal(t, 3, stats.Sessions)
assert.Equal(t, uint(3), stats.Subscribers)
assert.Equal(t, uint(3), stats.Sessions)

assert.Nil(t, customerChannel.Close())
assert.True(t, s1.IsClosed())
assert.True(t, s2.IsClosed())
assert.True(t, s3.IsClosed())

stats = customerChannel.Stats()
assert.Equal(t, 0, stats.Subscribers)
assert.Equal(t, 0, stats.Sessions)
assert.Equal(t, uint(0), stats.Subscribers)
assert.Equal(t, uint(0), stats.Sessions)
}

func TestChannel_Listen(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
customerChannel := New()
customerChannel := New(SubscriptionBufSize(0))
var flagc int32
err := customerChannel.Listen(ctx, "mmadfox", func(b []byte) {
assert.Equal(t, "mmadfox", string(b))
Expand All @@ -90,9 +90,9 @@ func TestChannel_Listen(t *testing.T) {
})
assert.Nil(t, err)
assert.Nil(t, customerChannel.PublishToAllSubscribers([]byte("mmadfox")))
<-time.After(time.Second)
<-time.After(300 * time.Millisecond)
cancel()
<-time.After(time.Second)
<-time.After(300 * time.Millisecond)
assert.Equal(t, int32(2), atomic.LoadInt32(&flagc))
}

Expand All @@ -114,8 +114,8 @@ func TestChannel_Subscribe(t *testing.T) {
}(subscription)
}
stats := customerChannel.Stats()
assert.Equal(t, subscribers, stats.Subscribers)
assert.Equal(t, subscribers, stats.Sessions)
assert.Equal(t, uint(subscribers), stats.Subscribers)
assert.Equal(t, uint(subscribers), stats.Sessions)
err := customerChannel.PublishToAllSubscribers([]byte("MSG"))
assert.Nil(t, err)
wg.Wait()
Expand Down Expand Up @@ -247,8 +247,8 @@ func TestChannel_Unsubscribe(t *testing.T) {
}

stats := customerChannel.Stats()
assert.Equal(t, count, stats.Subscribers)
assert.Equal(t, count, stats.Sessions)
assert.Equal(t, uint(count), stats.Subscribers)
assert.Equal(t, uint(count), stats.Sessions)

for _, s := range subscriptions {
assert.False(t, s.IsClosed())
Expand All @@ -258,15 +258,16 @@ func TestChannel_Unsubscribe(t *testing.T) {
}

stats = customerChannel.Stats()
assert.Equal(t, 0, stats.Subscribers)
assert.Equal(t, 0, stats.Sessions)
assert.Equal(t, uint(0), stats.Subscribers)
assert.Equal(t, uint(0), stats.Sessions)
}

func TestChannel_IgnoreSlowClients(t *testing.T) {
customerChannel := New(IgnoreSlowClients())
func TestChannel_SkipSlowSubscribers(t *testing.T) {
customerChannel := New(SkipSlowSubscribers())
subscription, err := customerChannel.Subscribe("user")
assert.Nil(t, err)
go func() {
// blocking
<-subscription.Channel()
}()
for i := 0; i < 10; i++ {
Expand All @@ -275,7 +276,7 @@ func TestChannel_IgnoreSlowClients(t *testing.T) {
}

func BenchmarkChannel_Publish(b *testing.B) {
customerChannel := New(IgnoreSlowClients())
customerChannel := New(SkipSlowSubscribers())
finished := make(chan struct{}, b.N)
for i := 0; i < b.N; i++ {
n := fmt.Sprintf("n%d", i)
Expand Down
6 changes: 3 additions & 3 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ func MaxLimitSessions(limit int) Option {
}
}

// IgnoreSlowClients non-blocking message sending.
func IgnoreSlowClients() Option {
// SkipSlowSubscribers non-blocking message sending.
func SkipSlowSubscribers() Option {
return func(c *bucketOptions) {
c.ignoreSlowClients = true
c.skipSlowSubscribers = true
}
}

Expand Down
4 changes: 2 additions & 2 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ func (s Subscription) String() string {
}

type Stats struct {
Subscribers int
Sessions int
Subscribers uint
Sessions uint
}

0 comments on commit 07228f9

Please sign in to comment.