Skip to content

Commit

Permalink
catch newly bubbled errors by redis client + redlock
Browse files Browse the repository at this point in the history
  • Loading branch information
nduitz committed Jun 25, 2024
1 parent 179184f commit 1ebcf89
Showing 1 changed file with 20 additions and 4 deletions.
24 changes: 20 additions & 4 deletions lib/active_job/uniqueness/strategies/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,22 @@ class Base

delegate :lock_manager, :config, to: :'ActiveJob::Uniqueness'

attr_reader :lock_key, :lock_ttl, :on_conflict, :job
attr_reader :lock_key, :lock_ttl, :on_conflict, :on_redis_connection_error, :job

def initialize(job:)
@lock_key = job.lock_key
@lock_ttl = (job.lock_options[:lock_ttl] || config.lock_ttl).to_i * 1000 # ms
@on_conflict = job.lock_options[:on_conflict] || config.on_conflict
@on_redis_connection_error = job.lock_options[:on_redis_connection_error]
@job = job
end

def lock(resource:, ttl:, event: :lock)
lock_manager.lock(resource, ttl).tap do |result|
instrument(event, resource: resource, ttl: ttl) if result
end
rescue RedisClient::ConnectionError => e
[:handle_redis_connection_error, e]
end

def unlock(resource:, event: :unlock)
Expand Down Expand Up @@ -56,10 +59,17 @@ def after_perform

module LockingOnEnqueue
def before_enqueue
return if lock(resource: lock_key, ttl: lock_ttl)

handle_conflict(resource: lock_key, on_conflict: on_conflict)
abort_job
case lock(resource: lock_key, ttl: lock_ttl)
in [:handle_redis_connection_error, error]
handle_redis_connection_error(resource: lock_key, on_redis_connection_error: on_redis_connection_error, error: error)
abort_job
in nil | false
handle_conflict(resource: lock_key, on_conflict: on_conflict)
abort_job
else
return
end
end

def around_enqueue(block)
Expand All @@ -86,6 +96,12 @@ def handle_conflict(on_conflict:, resource:, event: :conflict)
end
end

def handle_redis_connection_error(resource:, on_redis_connection_error:, error:)
raise error unless on_redis_connection_error

on_redis_connection_error.call(job, resource:, error:)
end

def abort_job
@job_aborted = true # ActiveJob 4.2 workaround

Expand Down

0 comments on commit 1ebcf89

Please sign in to comment.