Skip to content

Commit

Permalink
feat: 增加API示例代码 & 完善options & 完善Storage重试逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
CC11001100 committed Aug 14, 2023
1 parent e7981cf commit e33156b
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 24 deletions.
1 change: 1 addition & 0 deletions actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,6 @@ const (
const (
PayloadLastVersion = "lastVersion"
PayloadVersionMissCount = "versionMissCount"
PayloadLockBusyCount = "lockBusyCount"
PayloadSleep = "sleep"
)
213 changes: 213 additions & 0 deletions examples/storage_lock/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package main

import (
"context"
"encoding/json"
"fmt"
"github.com/golang-infrastructure/go-iterator"
"github.com/storage-lock/go-events"
"github.com/storage-lock/go-storage"
storage_lock "github.com/storage-lock/go-storage-lock"
"sync"
"time"
)

// ------------------------------------------------ ---------------------------------------------------------------------

// MemoryStorage 把锁存储在内存中,可以借助这个实现进程级别的锁,算是对内部的锁的一个扩展,但是似乎作用不是很大,仅仅是为了丰富实现...
// 也可以认为这个Storage是一个实现的样例,其它的存储引擎的实现可以参考此实现的逻辑
type MemoryStorage struct {

// 实际存储锁的map
storageMap map[string]*MemoryStorageValue

// 用于线程安全的操作
storageLock sync.RWMutex
}

var _ storage.Storage = &MemoryStorage{}

func NewMemoryStorage() *MemoryStorage {
return &MemoryStorage{
storageMap: make(map[string]*MemoryStorageValue),
storageLock: sync.RWMutex{},
}
}

func (x *MemoryStorage) GetName() string {
return "memory-storage"
}

func (x *MemoryStorage) Init(ctx context.Context) error {
// 没有要初始化的,在创建的时候就初始化了
return nil
}

func (x *MemoryStorage) Get(ctx context.Context, lockId string) (string, error) {
x.storageLock.RLock()
defer x.storageLock.RUnlock()

value, exists := x.storageMap[lockId]
if !exists {
return "", storage_lock.ErrLockNotFound
} else {
return value.LockInformationJsonString, nil
}
}

func (x *MemoryStorage) UpdateWithVersion(ctx context.Context, lockId string, exceptedVersion, newVersion storage.Version, lockInformation *storage.LockInformation) error {
x.storageLock.Lock()
defer x.storageLock.Unlock()

// 被更新的锁必须已经存在,否则无法更新
oldValue, exists := x.storageMap[lockId]
if !exists {
return storage_lock.ErrLockNotFound
}

// 乐观锁的版本必须能够对应得上,否则拒绝更新
if oldValue.Version != exceptedVersion {
return storage_lock.ErrVersionMiss
}

// 开始更新锁的信息和版本
oldValue.LockInformationJsonString = lockInformation.ToJsonString()
oldValue.Version = newVersion
return nil
}

func (x *MemoryStorage) CreateWithVersion(ctx context.Context, lockId string, version storage.Version, lockInformation *storage.LockInformation) error {
x.storageLock.Lock()
defer x.storageLock.Unlock()

// 插入的时候之前的锁不能存在,否则认为是插入失败
_, exists := x.storageMap[lockId]
if exists {
return storage_lock.ErrLockAlreadyExists
}

// 开始插入
x.storageMap[lockId] = &MemoryStorageValue{
LockId: lockId,
Version: version,
LockInformationJsonString: lockInformation.ToJsonString(),
}
return nil
}

func (x *MemoryStorage) DeleteWithVersion(ctx context.Context, lockId string, exceptedVersion storage.Version, lockInformation *storage.LockInformation) error {
x.storageLock.Lock()
defer x.storageLock.Unlock()

// 被删除的锁必须已经存在,否则删除失败
oldValue, exists := x.storageMap[lockId]
if !exists {
return storage_lock.ErrLockNotFound
}

// 期望的版本号必须相等,否则无法删除
if oldValue.Version != exceptedVersion {
return storage_lock.ErrVersionMiss
}

// 开始删除
delete(x.storageMap, lockId)

return nil
}

func (x *MemoryStorage) GetTime(ctx context.Context) (time.Time, error) {
// 因为是单机的内存存储,所以直接返回当前机器的时间
return time.Now(), nil
}

func (x *MemoryStorage) Close(ctx context.Context) error {
return nil
}

func (x *MemoryStorage) List(ctx context.Context) (iterator.Iterator[*storage.LockInformation], error) {
slice := make([]*storage.LockInformation, 0)
for _, lock := range x.storageMap {
info := &storage.LockInformation{}
err := json.Unmarshal([]byte(lock.LockInformationJsonString), &info)
if err != nil {
return nil, err
}
slice = append(slice, info)
}
return iterator.FromSlice(slice), nil
}

// ------------------------------------------------ ---------------------------------------------------------------------

// MemoryStorageValue 锁在内存中的实际存储结构
type MemoryStorageValue struct {

// 存储的是哪个锁的信息
LockId string

// 锁的版本号是多少
Version storage.Version

// 锁的信息序列化为JSON字符串存储在这个字段
LockInformationJsonString string
}

// ------------------------------------------------ ---------------------------------------------------------------------

func main() {

// 锁的id,表示一份临界资源
lockId := "counter-lock-id"
// 锁的持久化存储,这里使用基于内存的存储,可以替换为其它的实现,从项目README查看内置的开箱即用的Storage
storage := NewMemoryStorage()
// 创建锁的各种选项
options := storage_lock.NewStorageLockOptionsWithLockId(lockId).AddEventListeners(events.NewListenerWrapper("print", func(ctx context.Context, e *events.Event) {
//fmt.Println(e.ToJsonString())
}))
// 创建一把分布式锁
lock, err := storage_lock.NewStorageLockWithOptions(storage, options)
if err != nil {
panic(err)
}

generator := storage_lock.NewOwnerIdGenerator()

// 临界资源
counter := 0

// 第一个参与竞争锁的角色
var wg sync.WaitGroup
// 启动一万个协程,每个协程对count加一千次
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer func() {
wg.Done()
}()
ownerId := generator.GenOwnerId()
for i := 0; i < 1000; i++ {

// 获取锁
err := lock.Lock(context.Background(), ownerId)
if err != nil {
panic(err)
}

// 临界区,操作资源
counter++
fmt.Println(counter)

// 释放锁
err = lock.UnLock(context.Background(), ownerId)
if err != nil {
panic(err)
}
}
}()
}

wg.Wait()
fmt.Println(fmt.Sprintf("counter: %d", counter))

}
25 changes: 16 additions & 9 deletions storage_lock_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func (x *StorageLock) Lock(ctx context.Context, ownerId string) error {

// 记录操作时版本miss的次数
versionMissCount := 0
lockBusyCount := 0

// 在方法退出的时候发送事件通知
defer func() {
Expand All @@ -43,18 +44,24 @@ func (x *StorageLock) Lock(ctx context.Context, ownerId string) error {
err := x.tryLock(ctx, e.Fork(), lockId, ownerId)
if err == nil {
// 获取锁成功,退出
e.Fork().AddAction(events.NewAction(ActionLockSuccess).AddPayload(PayloadVersionMissCount, versionMissCount)).Publish(ctx)
e.Fork().AddAction(events.NewAction(ActionLockSuccess).AddPayload(PayloadVersionMissCount, versionMissCount).AddPayload(PayloadLockBusyCount, lockBusyCount)).Publish(ctx)
return nil
}

// 只有在版本miss的情况下才会重试
if !errors.Is(err, ErrVersionMiss) {
e.Fork().AddAction(events.NewAction(ActionLockError).SetErr(err).AddPayload(PayloadVersionMissCount, versionMissCount)).Publish(ctx)
// 只有在版本miss的情况下或者锁被其它人持有者的情况才会等待重试
// 锁已经存在的错误被认为是版本miss的一种特殊情况
if errors.Is(err, ErrVersionMiss) || errors.Is(err, ErrLockAlreadyExists) {
// 尝试获取锁的时候版本miss了,触发一个获取锁版本miss的事件让外部能够感知得到
versionMissCount++
e.Fork().AddAction(events.NewAction(ActionLockVersionMiss).AddPayload(PayloadVersionMissCount, versionMissCount).AddPayload(PayloadLockBusyCount, lockBusyCount)).Publish(ctx)
} else if errors.Is(err, ErrLockBusy) {
// 锁被其它人持有着,勇敢牛牛,不怕困难,稍微一等,继续重试
lockBusyCount++
e.Fork().AddAction(events.NewAction(ActionLockBusy).AddPayload(PayloadVersionMissCount, versionMissCount).AddPayload(PayloadLockBusyCount, lockBusyCount)).Publish(ctx)
} else {
e.Fork().AddAction(events.NewAction(ActionLockError).SetErr(err).AddPayload(PayloadVersionMissCount, versionMissCount).AddPayload(PayloadLockBusyCount, lockBusyCount)).Publish(ctx)
return err
}
// 尝试获取锁的时候版本miss了,触发一个获取锁版本miss的事件让外部能够感知得到
versionMissCount++
e.Fork().AddAction(events.NewAction(ActionLockVersionMiss).AddPayload(PayloadVersionMissCount, versionMissCount)).Publish(ctx)

// 然后休眠一下再开始重新抢占锁
sleepDuration := x.options.VersionMissRetryInterval + x.retryIntervalRandomBase()
Expand All @@ -65,11 +72,11 @@ func (x *StorageLock) Lock(ctx context.Context, ownerId string) error {
select {
case <-ctx.Done():
// 没有时间了,算球没获取成功
e.Fork().AddAction(events.NewAction(ActionTimeout).AddPayload(PayloadVersionMissCount, versionMissCount)).Publish(ctx)
e.Fork().AddAction(events.NewAction(ActionTimeout).AddPayload(PayloadVersionMissCount, versionMissCount).AddPayload(PayloadLockBusyCount, lockBusyCount)).Publish(ctx)
return err
default:
// 还有时间,可以尝试重新获取
e.Fork().AddAction(events.NewAction(ActionSleepRetry).AddPayload(PayloadVersionMissCount, versionMissCount)).Publish(ctx)
e.Fork().AddAction(events.NewAction(ActionSleepRetry).AddPayload(PayloadVersionMissCount, versionMissCount).AddPayload(PayloadLockBusyCount, lockBusyCount)).Publish(ctx)
continue
}
}
Expand Down
38 changes: 23 additions & 15 deletions storage_lock_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ var (
// 需要注意的是这个租约的刷新间隔不能超过 (DefaultLeaseExpireAfter - time.Second)
DefaultLeaseRefreshInterval = time.Second * 30

// DefaultVersionMissRetryTimes 默认的版本乐观锁未命中时的重试次数,这里为了防止高并发或者网络不好等情况导致失败率过高就把值设置的稍微大了一些,
// 用户可以根据自己的实际情况调整重试次数
//DefaultVersionMissRetryTimes = 100
DefaultVersionMissRetryInterval = time.Microsecond * 100
)

// 检查参数配置是否正确
Expand Down Expand Up @@ -68,11 +66,6 @@ type StorageLockOptions struct {
// 租约刷新间隔,当获取锁成功时会有一个协程专门负责续约租约,这个参数就决定它每隔多久发起一次续约操作,这个用来保证不会在锁使用的期间突然过期
LeaseRefreshInterval time.Duration

// 这个放弃时机感觉以具体的时间长度更为合适
//// 乐观锁的版本未命中的时候的重试次数
// 当传入的值等于0的时候表示不进行重试
// 当传入的值小于0的时候表示无限重试不成功永远不结束
//VersionMissRetryTimes uint

// 用于监听观测锁使用过程中的各种事件,如果需要的话自行设置
EventListeners []events.Listener
Expand All @@ -87,9 +80,9 @@ type StorageLockOptions struct {
// NewStorageLockOptions 使用默认值创建锁的配置项
func NewStorageLockOptions() *StorageLockOptions {
return &StorageLockOptions{
LeaseExpireAfter: DefaultLeaseExpireAfter,
LeaseRefreshInterval: DefaultLeaseRefreshInterval,
//VersionMissRetryTimes: DefaultVersionMissRetryTimes,
LeaseExpireAfter: DefaultLeaseExpireAfter,
LeaseRefreshInterval: DefaultLeaseRefreshInterval,
VersionMissRetryInterval: DefaultVersionMissRetryInterval,
}
}

Expand All @@ -113,7 +106,22 @@ func (x *StorageLockOptions) SetLeaseRefreshInterval(leaseRefreshInterval time.D
return x
}

//func (x *StorageLockOptions) WithVersionMissRetryTimes(versionMissRetryTimes uint) *StorageLockOptions {
// x.VersionMissRetryTimes = versionMissRetryTimes
// return x
//}
func (x *StorageLockOptions) SetEventListeners(eventListeners []events.Listener) *StorageLockOptions {
x.EventListeners = eventListeners
return x
}

func (x *StorageLockOptions) AddEventListeners(eventListener events.Listener) *StorageLockOptions {
x.EventListeners = append(x.EventListeners, eventListener)
return x
}

func (x *StorageLockOptions) SetWatchDogFactory(watchDogFactory WatchDogFactory) *StorageLockOptions {
x.WatchDogFactory = watchDogFactory
return x
}

func (x *StorageLockOptions) SetVersionMissRetryInterval(versionMissRetryInterval time.Duration) *StorageLockOptions {
x.VersionMissRetryInterval = versionMissRetryInterval
return x
}
1 change: 1 addition & 0 deletions watch_dog_commons_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func NewWatchDogCommonsImpl(ctx context.Context, e *events.Event, lock *StorageL
isRunning: atomic.Bool{},
storageLock: lock,
ownerId: ownerId,
e: e,
}
}

Expand Down

0 comments on commit e33156b

Please sign in to comment.