Skip to content

Commit

Permalink
feat: 优化WatchDogCommonsImpl的逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
CC11001100 committed Aug 14, 2023
1 parent 2845449 commit dc582ed
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 53 deletions.
24 changes: 15 additions & 9 deletions actions.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package storage_lock

// 通用
// 通用的事件
const (
ActionLockNotFoundError = "lock-not-found-error"
ActionNotLockOwner = "not-lock-owner"
Expand All @@ -11,7 +11,7 @@ const (
ActionGetLeaseExpireTimeError = "getLeaseExpireTime-error"
)

// 获取锁
// 获取锁相关的事件
const (
ActionLockBegin = "StorageLock.Lock.Begin"
ActionLockFinish = "StorageLock.Lock.Finish"
Expand All @@ -34,7 +34,7 @@ const (
ActionLockRollbackError = "StorageLock.Lock.Rollback.Error"
)

// 释放锁
// 释放锁相关的事件
const (
ActionUnlock = "StorageLock.Unlock"
ActionUnlockFinish = "StorageLock.Unlock.Finish"
Expand All @@ -52,6 +52,7 @@ const (
ActionWatchDogRefresh = "WatchDog.Refresh"
ActionWatchDogRefreshBegin = "WatchDog.Refresh.Begin"
ActionWatchDogRefreshSuccess = "WatchDog.Refresh.Success"
ActionWatchDogRefreshError = "WatchDog.Refresh.Error"

ActionWatchDogCreate = "WatchDog.Create"
ActionWatchDogCreateSuccess = "WatchDog.Create.Success"
Expand All @@ -65,13 +66,18 @@ const (
ActionWatchDogStopSuccess = "WatchDog.Stop.success"
ActionWatchDogStopError = "WatchDog.Stop.error"

ActionWatchDogExit = "WatchDog.Exit"
ActionWatchDogExitByTooManyError = "WatchDog.Exit.TooManyError"
ActionWatchDogExit = "WatchDog.Exit"
//ActionWatchDogExitByTooManyError = "WatchDog.Exit.TooManyError"

ActionWatchDogSetEvent = "WatchDog.SetEvent"
)

// Payload的名字
const (
PayloadLastVersion = "lastVersion"
PayloadVersionMissCount = "versionMissCount"
PayloadLockBusyCount = "lockBusyCount"
PayloadSleep = "sleep"
PayloadLastVersion = "lastVersion"
PayloadVersionMissCount = "versionMissCount"
PayloadLockBusyCount = "lockBusyCount"
PayloadSleep = "sleep"
PayloadRefreshSuccessCount = "refreshSuccessCount"
PayloadContinueErrorCount = "continueErrorCount"
)
138 changes: 94 additions & 44 deletions watch_dog_commons_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type WatchDogCommonsImpl struct {

// 租约续费协程有唯一ID标识,用于区分方便观测
id string
// 创建看门狗的时候就把锁的id给固定住,防止options被瞎几把改导致流程出错
lockId string

// 当前协程运行期间产生的事件都是这个事件的子事件
e *events.Event
Expand All @@ -38,16 +40,17 @@ var _ WatchDog = &WatchDogCommonsImpl{}
func NewWatchDogCommonsImpl(ctx context.Context, e *events.Event, lock *StorageLock, ownerId string) *WatchDogCommonsImpl {

// 为看门狗协程生成一个唯一ID
lockId := lock.options.LockId
id := utils.RandomID(WatchDogIDPrefix)

// 设置一些通用的观测属性
e.SetLockId(lock.options.LockId).
e.SetLockId(lockId).
SetOwnerId(ownerId).
SetWatchDogId(id).
SetStorageName(lock.storage.GetName())

// 发送创建看门狗的事件
e.Fork().AddActionByName(ActionWatchDogCreate).Publish(ctx)
e.AddActionByName(ActionWatchDogCreate).Publish(ctx)

return &WatchDogCommonsImpl{
id: id,
Expand Down Expand Up @@ -76,42 +79,83 @@ func (x *WatchDogCommonsImpl) Start(ctx context.Context) error {

x.isRunning.Store(true)
go func() {

// 已经刷新成功多少次了
refreshSuccessCount := 0
// 统计连续多少次发生错误了
continueErrorCount := 0

// 退出的时候给一个信号
go func() {
exitAction := events.NewAction(ActionWatchDogExit).
AddPayload(PayloadRefreshSuccessCount, refreshSuccessCount).
AddPayload(PayloadContinueErrorCount, continueErrorCount)
x.e.Fork().AddAction(exitAction).Publish(context.Background())
}()

// 先休眠一下,再死循环刷新
// 这是针对锁定时间比较短的锁的一个优化,当狗狗休眠结束锁已经被释放掉了,而狗狗也已经被标记为退出状态
// 能够避免一次无效的刷新,也能够避免因为自身续租而导致的miss率
// 而对于持有时间比较长的锁来说,也不差这么点时间
// 时间不要太长,避免协程泄露,1秒封顶
needSleep := x.storageLock.options.LeaseRefreshInterval
if needSleep > time.Second {
needSleep = time.Second
}
time.Sleep(needSleep)

for x.isRunning.Load() {

// 发送一个租约刷新开始的事件,携带着当前的一些上下文
refreshBeginAction := events.NewAction(ActionWatchDogRefreshBegin).
AddPayload("continueErrorCount", continueErrorCount).
AddPayload("refreshSuccessCount", refreshSuccessCount)
AddPayload(PayloadContinueErrorCount, continueErrorCount).
AddPayload(PayloadRefreshSuccessCount, refreshSuccessCount)
x.e.Fork().AddAction(refreshBeginAction).Publish(context.Background())

// 调用刷新的方法进行一次刷新
refreshBeginTime := time.Now()
err := x.refreshLeaseExpiredTime()
if err != nil {
continueErrorCount++
// 连续失败次数太多把自己关掉
// TODO 2023-8-12 20:46:01 cutoff提取为参数,由外部决定
if continueErrorCount > 10 {
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute*5)
err := x.Stop(ctx)
cancelFunc()
if err != nil {
x.e.Fork().AddAction(events.NewAction(ActionWatchDogStopError).SetErr(err)).Publish(ctx)
} else {
x.e.Fork().AddAction(events.NewAction(ActionWatchDogStopError).SetErr(err)).Publish(ctx)
}
x.e.AddAction(events.NewAction(ActionWatchDogExitByTooManyError).
AddPayload("continueErrorCount", continueErrorCount).
AddPayload("refreshSuccessCount", refreshSuccessCount))
break

// 如果锁已经不是自己持有了,则退出
if errors.Is(err, ErrLockNotBelongYou) {
notLockOwnerAction := events.NewAction(ActionNotLockOwner).
AddPayload(PayloadContinueErrorCount, continueErrorCount).
AddPayload(PayloadRefreshSuccessCount, refreshSuccessCount).
SetErr(err)
x.e.Fork().AddAction(notLockOwnerAction).Publish(context.Background())
return
}
x.e.Fork().
AddAction(events.NewAction("watch-dog-refreshLeaseExpiredTime-error").AddPayload("continueErrorCount", continueErrorCount).SetErr(err)).
Publish(context.Background())

// 2023-8-14 22:17:44 即使一直发生错误,也要眼含着泪花把工作进行下去,不能半途不干了,万一后面还有转机呢
//// 连续失败次数太多把自己关掉
//// TODO 2023-8-12 20:46:01 cutoff提取为参数,由外部决定
//if continueErrorCount > 10 {
// ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute*5)
// err := x.Stop(ctx)
// cancelFunc()
// if err != nil {
// x.e.Fork().AddAction(events.NewAction(ActionWatchDogStopError).SetErr(err)).Publish(ctx)
// } else {
// x.e.Fork().AddAction(events.NewAction(ActionWatchDogStopError).SetErr(err)).Publish(ctx)
// }
// x.e.AddAction(events.NewAction(ActionWatchDogExitByTooManyError).
// AddPayload("continueErrorCount", continueErrorCount).
// AddPayload("refreshSuccessCount", refreshSuccessCount))
// break
//}
//x.e.Fork().
// AddAction(events.NewAction("watch-dog-refreshLeaseExpiredTime-error").AddPayload("continueErrorCount", continueErrorCount).SetErr(err)).
// Publish(context.Background())

// 租约刷新失败事件
refreshErrorAction := events.NewAction(ActionWatchDogRefreshError).
AddPayload(PayloadContinueErrorCount, continueErrorCount).
AddPayload(PayloadRefreshSuccessCount, refreshSuccessCount).
SetErr(err)
x.e.Fork().AddAction(refreshErrorAction).Publish(context.Background())

} else {

// 记录当前的刷新成功
Expand All @@ -122,9 +166,8 @@ func (x *WatchDogCommonsImpl) Start(ctx context.Context) error {

// 发送锁的租约刷新成功的事件
refreshSuccessAction := events.NewAction(ActionWatchDogRefreshSuccess).
AddPayload("continueErrorCount", continueErrorCount).
AddPayload("refreshSuccessCount", refreshSuccessCount).
SetErr(err)
AddPayload(PayloadContinueErrorCount, continueErrorCount).
AddPayload(PayloadRefreshSuccessCount, refreshSuccessCount)
x.e.Fork().AddAction(refreshSuccessAction).Publish(context.Background())

}
Expand All @@ -133,9 +176,6 @@ func (x *WatchDogCommonsImpl) Start(ctx context.Context) error {
time.Sleep(x.computeRefreshSleepDuration(refreshBeginTime))
}

// 给一个退出信号,注意这里是真正的
x.e.AddActionByName(ActionWatchDogExit).Publish(context.Background())

}()

return nil
Expand All @@ -151,47 +191,47 @@ func (x *WatchDogCommonsImpl) computeRefreshSleepDuration(refreshBeginTime time.
// 刷新锁的过期时间,为其续约
func (x *WatchDogCommonsImpl) refreshLeaseExpiredTime() error {

e := x.e.Fork().AddActionByName(ActionWatchDogRefresh)
refreshEvent := x.e.Fork().AddActionByName(ActionWatchDogRefresh)

// 计算操作超时时长,这里就简单的设置为不超过租约的间隔了
ctx, cancelFunc := context.WithTimeout(context.Background(), x.storageLock.options.LeaseRefreshInterval)
defer cancelFunc()

// 查询锁的当前状态
information, err := x.storageLock.getLockInformation(ctx, x.e, x.storageLock.options.LockId)
information, err := x.storageLock.getLockInformation(ctx, x.e, x.lockId)
if err != nil {

// 如果是锁已经不存在了,则先将续租协程停掉,以免在短时间内进行大量获取释放操作时积压了太多无用的续租协程过慢的退出
if errors.Is(err, ErrLockNotFound) {
e.AddAction(events.NewAction(ActionLockNotFoundError).SetErr(err))
refreshEvent.AddAction(events.NewAction(ActionLockNotFoundError).SetErr(err))
err := x.Stop(ctx)
if err != nil {
e.AddAction(events.NewAction(ActionWatchDogStopError).SetErr(err))
refreshEvent.AddAction(events.NewAction(ActionWatchDogStopError).SetErr(err))
} else {
e.AddAction(events.NewAction(ActionWatchDogStopSuccess).SetErr(err))
refreshEvent.AddAction(events.NewAction(ActionWatchDogStopSuccess).SetErr(err))
}
} else {
e.AddAction(events.NewAction(ActionGetLockInformationError).SetErr(err))
refreshEvent.AddAction(events.NewAction(ActionGetLockInformationError).SetErr(err))
}

// 当发生错误的时候只是补充一些上下文发送事件,之后就会退出
e.Publish(ctx)
refreshEvent.Publish(ctx)
return err
}

// 锁已经不是自己持有了,则直接退出,每个续租狗狗都是很忠贞的只为一个owner续租,并不会进行协程复用
if information.OwnerId != x.ownerId {
// 触发事件
e.AddAction(events.NewAction(ActionNotLockOwner).AddPayload("lockInformation", information)).
// 试图刷新不是自己的锁
refreshEvent.AddAction(events.NewAction(ActionNotLockOwner).AddPayload(storage_events.PayloadLockInformation, information)).
AddActionByName(ActionWatchDogStop).
Publish(ctx)
return ErrLockNotBelongYou
}

// 计算租约续租之后的过期时间,这里计算的时候需要使用到Storage中统一时间源
expireTime, err := x.storageLock.getLeaseExpireTime(ctx, e.Fork())
expireTime, err := x.storageLock.getLeaseExpireTime(ctx, refreshEvent.Fork())
if err != nil {
e.AddAction(events.NewAction(ActionGetLeaseExpireTimeError).SetErr(err)).Publish(ctx)
refreshEvent.AddAction(events.NewAction(ActionGetLeaseExpireTimeError).SetErr(err)).Publish(ctx)
return err
}
information.LeaseExpireTime = expireTime
Expand All @@ -201,14 +241,14 @@ func (x *WatchDogCommonsImpl) refreshLeaseExpiredTime() error {
information.Version++

// 尝试更新Storage中存储的锁的信息
err = x.storageLock.storageExecutor.UpdateWithVersion(ctx, e.Fork(), x.storageLock.options.LockId, lastVersion, information.Version, information)
err = x.storageLock.storageExecutor.UpdateWithVersion(ctx, refreshEvent.Fork(), x.lockId, lastVersion, information.Version, information)
if err != nil {
e.AddAction(events.NewAction(storage_events.ActionStorageUpdateWithVersion + "-error").SetErr(err))
refreshEvent.AddAction(events.NewAction(storage_events.ActionStorageUpdateWithVersion + "-error").SetErr(err))
} else {
e.AddAction(events.NewAction(storage_events.ActionStorageUpdateWithVersion + "-success"))
refreshEvent.AddAction(events.NewAction(storage_events.ActionStorageUpdateWithVersion + "-success"))
}

e.Publish(ctx)
refreshEvent.Publish(ctx)
return err
}

Expand All @@ -223,11 +263,21 @@ func (x *WatchDogCommonsImpl) refreshLeaseExpiredTime() error {

// Stop 停止续租协程
func (x *WatchDogCommonsImpl) Stop(ctx context.Context) error {
x.e.Fork().AddActionByName(ActionWatchDogStop).Publish(ctx)

x.isRunning.Store(false)
x.e.Fork().AddActionByName(ActionWatchDogStop).Publish(ctx)

return nil
}

// SetEvent 允许在创建后更改日志源
func (x *WatchDogCommonsImpl) SetEvent(e *events.Event) {

// 更新事件源
x.e = e

// 触发看门狗事件源更改事件
ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute*5)
defer cancelFunc()
x.e.AddAction(events.NewAction(ActionWatchDogSetEvent)).Publish(ctx)
}

0 comments on commit dc582ed

Please sign in to comment.