From 1ebcf89ce870ac501d8289b83dbc8f079af9e925 Mon Sep 17 00:00:00 2001 From: Nick Date: Tue, 25 Jun 2024 15:33:12 +0200 Subject: [PATCH 1/5] catch newly bubbled errors by redis client + redlock --- lib/active_job/uniqueness/strategies/base.rb | 24 ++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/lib/active_job/uniqueness/strategies/base.rb b/lib/active_job/uniqueness/strategies/base.rb index 81bbee3..5f2761d 100644 --- a/lib/active_job/uniqueness/strategies/base.rb +++ b/lib/active_job/uniqueness/strategies/base.rb @@ -11,12 +11,13 @@ class Base delegate :lock_manager, :config, to: :'ActiveJob::Uniqueness' - attr_reader :lock_key, :lock_ttl, :on_conflict, :job + attr_reader :lock_key, :lock_ttl, :on_conflict, :on_redis_connection_error, :job def initialize(job:) @lock_key = job.lock_key @lock_ttl = (job.lock_options[:lock_ttl] || config.lock_ttl).to_i * 1000 # ms @on_conflict = job.lock_options[:on_conflict] || config.on_conflict + @on_redis_connection_error = job.lock_options[:on_redis_connection_error] @job = job end @@ -24,6 +25,8 @@ def lock(resource:, ttl:, event: :lock) lock_manager.lock(resource, ttl).tap do |result| instrument(event, resource: resource, ttl: ttl) if result end + rescue RedisClient::ConnectionError => e + [:handle_redis_connection_error, e] end def unlock(resource:, event: :unlock) @@ -56,10 +59,17 @@ def after_perform module LockingOnEnqueue def before_enqueue - return if lock(resource: lock_key, ttl: lock_ttl) - handle_conflict(resource: lock_key, on_conflict: on_conflict) - abort_job + case lock(resource: lock_key, ttl: lock_ttl) + in [:handle_redis_connection_error, error] + handle_redis_connection_error(resource: lock_key, on_redis_connection_error: on_redis_connection_error, error: error) + abort_job + in nil | false + handle_conflict(resource: lock_key, on_conflict: on_conflict) + abort_job + else + return + end end def around_enqueue(block) @@ -86,6 +96,12 @@ def handle_conflict(on_conflict:, resource:, event: :conflict) end end + def handle_redis_connection_error(resource:, on_redis_connection_error:, error:) + raise error unless on_redis_connection_error + + on_redis_connection_error.call(job, resource:, error:) + end + def abort_job @job_aborted = true # ActiveJob 4.2 workaround From 19cbeb6d99d81839fc79317961193f28f71f14fd Mon Sep 17 00:00:00 2001 From: Nick Date: Tue, 25 Jun 2024 15:33:21 +0200 Subject: [PATCH 2/5] add tests --- spec/support/shared_examples/enqueuing.rb | 36 +++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/spec/support/shared_examples/enqueuing.rb b/spec/support/shared_examples/enqueuing.rb index ce23413..4767eb7 100644 --- a/spec/support/shared_examples/enqueuing.rb +++ b/spec/support/shared_examples/enqueuing.rb @@ -65,6 +65,42 @@ end end + context 'when locking fails due to RedisClient error' do + before do + job_class.unique strategy + + allow_any_instance_of(ActiveJob::Uniqueness::LockManager).to receive(:lock).and_raise(RedisClient::ConnectionError) + end + + shared_examples 'of no jobs enqueued' do + it 'does not enqueue the job' do + expect { suppress(RedisClient::ConnectionError) { subject } }.not_to change(enqueued_jobs, :count) + end + + it 'does not remove the existing lock' do + expect { suppress(RedisClient::ConnectionError) { subject } }.not_to change(locks, :count) + end + end + + context 'when no options given' do + include_examples 'of no jobs enqueued' + + it 'raises a RedisClient::ConnectionError error' do + expect { subject }.to raise_error RedisClient::ConnectionError + end + end + + context 'when on_redis_connection_error: Proc given' do + before { job_class.unique strategy, on_redis_connection_error: ->(job, **_kwargs) { job.logger.info('Oops') } } + + include_examples 'of no jobs enqueued' + + it 'calls the Proc' do + expect { subject }.to log(/Oops/) + end + end + end + context 'when the lock exists' do before do job_class.unique strategy From f4cc839929c2dee1644e01a292f02a804f4f24ae Mon Sep 17 00:00:00 2001 From: Nick Date: Tue, 25 Jun 2024 16:26:26 +0200 Subject: [PATCH 3/5] support on_redis_connection_error: :raise and as config --- lib/active_job/uniqueness/active_job_patch.rb | 3 ++- lib/active_job/uniqueness/configuration.rb | 14 ++++++++++++++ lib/active_job/uniqueness/strategies/base.rb | 11 +++++++---- spec/support/shared_examples/enqueuing.rb | 10 ++++++++++ 4 files changed, 33 insertions(+), 5 deletions(-) diff --git a/lib/active_job/uniqueness/active_job_patch.rb b/lib/active_job/uniqueness/active_job_patch.rb index fd39832..330b900 100644 --- a/lib/active_job/uniqueness/active_job_patch.rb +++ b/lib/active_job/uniqueness/active_job_patch.rb @@ -27,6 +27,7 @@ module ActiveJobPatch def unique(strategy, options = {}) validate_on_conflict_action!(options[:on_conflict]) validate_on_conflict_action!(options[:on_runtime_conflict]) + validate_on_redis_connection_error!(options[:on_redis_connection_error]) self.lock_strategy_class = ActiveJob::Uniqueness::Strategies.lookup(strategy) self.lock_options = options @@ -40,7 +41,7 @@ def unlock!(*arguments) private - delegate :validate_on_conflict_action!, to: :'ActiveJob::Uniqueness.config' + delegate :validate_on_conflict_action!, :validate_on_redis_connection_error!, to: :'ActiveJob::Uniqueness.config' end included do diff --git a/lib/active_job/uniqueness/configuration.rb b/lib/active_job/uniqueness/configuration.rb index e75237d..113d230 100644 --- a/lib/active_job/uniqueness/configuration.rb +++ b/lib/active_job/uniqueness/configuration.rb @@ -14,6 +14,7 @@ class Configuration config_accessor(:lock_ttl) { 86_400 } # 1.day config_accessor(:lock_prefix) { 'activejob_uniqueness' } config_accessor(:on_conflict) { :raise } + config_accessor(:on_redis_connection_error) { :raise } config_accessor(:redlock_servers) { [ENV.fetch('REDIS_URL', 'redis://localhost:6379')] } config_accessor(:redlock_options) { { retry_count: 0 } } config_accessor(:lock_strategies) { {} } @@ -34,6 +35,19 @@ def validate_on_conflict_action!(action) raise ActiveJob::Uniqueness::InvalidOnConflictAction, "Unexpected '#{action}' action on conflict" end + + + def on_redis_connection_error=(action) + validate_on_redis_connection_error!(action) + + config.on_redis_connection_error = action + end + + def validate_on_redis_connection_error!(action) + return if action.nil? || :raise == action || action.respond_to?(:call) + + raise ActiveJob::Uniqueness::InvalidOnConflictAction, "Unexpected '#{action}' action on_redis_connection_error" + end end end end diff --git a/lib/active_job/uniqueness/strategies/base.rb b/lib/active_job/uniqueness/strategies/base.rb index 5f2761d..199ba02 100644 --- a/lib/active_job/uniqueness/strategies/base.rb +++ b/lib/active_job/uniqueness/strategies/base.rb @@ -17,7 +17,7 @@ def initialize(job:) @lock_key = job.lock_key @lock_ttl = (job.lock_options[:lock_ttl] || config.lock_ttl).to_i * 1000 # ms @on_conflict = job.lock_options[:on_conflict] || config.on_conflict - @on_redis_connection_error = job.lock_options[:on_redis_connection_error] + @on_redis_connection_error = job.lock_options[:on_redis_connection_error] || config.on_redis_connection_error @job = job end @@ -97,9 +97,12 @@ def handle_conflict(on_conflict:, resource:, event: :conflict) end def handle_redis_connection_error(resource:, on_redis_connection_error:, error:) - raise error unless on_redis_connection_error - - on_redis_connection_error.call(job, resource:, error:) + case on_redis_connection_error + when :raise then raise error + when nil then raise error + else + on_redis_connection_error.call(job, resource:, error:) + end end def abort_job diff --git a/spec/support/shared_examples/enqueuing.rb b/spec/support/shared_examples/enqueuing.rb index 4767eb7..f651478 100644 --- a/spec/support/shared_examples/enqueuing.rb +++ b/spec/support/shared_examples/enqueuing.rb @@ -90,6 +90,16 @@ end end + context 'when on_redis_connection_error: :raise given' do + before { job_class.unique strategy, on_redis_connection_error: :raise } + + include_examples 'of no jobs enqueued' + + it 'raises a RedisClient::ConnectionError error' do + expect { subject }.to raise_error RedisClient::ConnectionError + end + end + context 'when on_redis_connection_error: Proc given' do before { job_class.unique strategy, on_redis_connection_error: ->(job, **_kwargs) { job.logger.info('Oops') } } From 97de8fa29eb1c36ccea9660c77983aad408b9d6a Mon Sep 17 00:00:00 2001 From: Nick Date: Tue, 25 Jun 2024 16:36:54 +0200 Subject: [PATCH 4/5] fix syntax for older ruby versions, fix rubocop offenses --- lib/active_job/uniqueness/active_job_patch.rb | 4 ++- lib/active_job/uniqueness/configuration.rb | 3 +- lib/active_job/uniqueness/sidekiq_patch.rb | 4 +-- lib/active_job/uniqueness/strategies/base.rb | 28 ++++++++----------- 4 files changed, 18 insertions(+), 21 deletions(-) diff --git a/lib/active_job/uniqueness/active_job_patch.rb b/lib/active_job/uniqueness/active_job_patch.rb index 330b900..3daebae 100644 --- a/lib/active_job/uniqueness/active_job_patch.rb +++ b/lib/active_job/uniqueness/active_job_patch.rb @@ -41,7 +41,9 @@ def unlock!(*arguments) private - delegate :validate_on_conflict_action!, :validate_on_redis_connection_error!, to: :'ActiveJob::Uniqueness.config' + delegate :validate_on_conflict_action!, + :validate_on_redis_connection_error!, + to: :'ActiveJob::Uniqueness.config' end included do diff --git a/lib/active_job/uniqueness/configuration.rb b/lib/active_job/uniqueness/configuration.rb index 113d230..357b92d 100644 --- a/lib/active_job/uniqueness/configuration.rb +++ b/lib/active_job/uniqueness/configuration.rb @@ -36,7 +36,6 @@ def validate_on_conflict_action!(action) raise ActiveJob::Uniqueness::InvalidOnConflictAction, "Unexpected '#{action}' action on conflict" end - def on_redis_connection_error=(action) validate_on_redis_connection_error!(action) @@ -44,7 +43,7 @@ def on_redis_connection_error=(action) end def validate_on_redis_connection_error!(action) - return if action.nil? || :raise == action || action.respond_to?(:call) + return if action.nil? || action == :raise || action.respond_to?(:call) raise ActiveJob::Uniqueness::InvalidOnConflictAction, "Unexpected '#{action}' action on_redis_connection_error" end diff --git a/lib/active_job/uniqueness/sidekiq_patch.rb b/lib/active_job/uniqueness/sidekiq_patch.rb index b5aa0dc..d0fd9f4 100644 --- a/lib/active_job/uniqueness/sidekiq_patch.rb +++ b/lib/active_job/uniqueness/sidekiq_patch.rb @@ -41,7 +41,7 @@ def remove_job module ScheduledSet def delete(score, job_id) entry = find_job(job_id) - ActiveJob::Uniqueness.unlock_sidekiq_job!(entry.item) if super(score, job_id) + ActiveJob::Uniqueness.unlock_sidekiq_job!(entry.item) if super entry end end @@ -67,7 +67,7 @@ def clear end def delete_by_value(name, value) - ActiveJob::Uniqueness.unlock_sidekiq_job!(Sidekiq.load_json(value)) if super(name, value) + ActiveJob::Uniqueness.unlock_sidekiq_job!(Sidekiq.load_json(value)) if super end end end diff --git a/lib/active_job/uniqueness/strategies/base.rb b/lib/active_job/uniqueness/strategies/base.rb index 199ba02..efc7f88 100644 --- a/lib/active_job/uniqueness/strategies/base.rb +++ b/lib/active_job/uniqueness/strategies/base.rb @@ -25,8 +25,6 @@ def lock(resource:, ttl:, event: :lock) lock_manager.lock(resource, ttl).tap do |result| instrument(event, resource: resource, ttl: ttl) if result end - rescue RedisClient::ConnectionError => e - [:handle_redis_connection_error, e] end def unlock(resource:, event: :unlock) @@ -59,17 +57,16 @@ def after_perform module LockingOnEnqueue def before_enqueue - - case lock(resource: lock_key, ttl: lock_ttl) - in [:handle_redis_connection_error, error] - handle_redis_connection_error(resource: lock_key, on_redis_connection_error: on_redis_connection_error, error: error) - abort_job - in nil | false - handle_conflict(resource: lock_key, on_conflict: on_conflict) - abort_job - else - return - end + return if lock(resource: lock_key, ttl: lock_ttl) + + handle_conflict(resource: lock_key, on_conflict: on_conflict) + abort_job + rescue RedisClient::ConnectionError => e + handle_redis_connection_error( + resource: lock_key, on_redis_connection_error: + on_redis_connection_error, error: e + ) + abort_job end def around_enqueue(block) @@ -98,10 +95,9 @@ def handle_conflict(on_conflict:, resource:, event: :conflict) def handle_redis_connection_error(resource:, on_redis_connection_error:, error:) case on_redis_connection_error - when :raise then raise error - when nil then raise error + when :raise, nil then raise error else - on_redis_connection_error.call(job, resource:, error:) + on_redis_connection_error.call(job, resource: resource, error: error) end end From 8c033b61262e62cc22c7d3031dc7deb2aa472f35 Mon Sep 17 00:00:00 2001 From: Nick Date: Wed, 26 Jun 2024 09:41:29 +0200 Subject: [PATCH 5/5] update readme and initializer --- README.md | 13 +++++++++++++ .../config/initializers/active_job_uniqueness.rb | 7 +++++++ 2 files changed, 20 insertions(+) diff --git a/README.md b/README.md index 6cb295e..ee460c6 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,19 @@ class MyJob < ActiveJob::Base end ``` +### Control redis connection errors + +```ruby +class MyJob < ActiveJob::Base + # Proc gets the job instance including its arguments, and as keyword arguments the resource(lock key) `resource` and the original error `error` + unique :until_executing, on_redis_connection_error: ->(job, resource: _, error: _) { job.logger.info "Oops: #{job.arguments}" } + + def perform(args) + # work + end +end +``` + ### Control lock key arguments ```ruby diff --git a/lib/generators/active_job/uniqueness/templates/config/initializers/active_job_uniqueness.rb b/lib/generators/active_job/uniqueness/templates/config/initializers/active_job_uniqueness.rb index 8c234e2..99e408d 100644 --- a/lib/generators/active_job/uniqueness/templates/config/initializers/active_job_uniqueness.rb +++ b/lib/generators/active_job/uniqueness/templates/config/initializers/active_job_uniqueness.rb @@ -19,6 +19,13 @@ # # config.on_conflict = :raise + # Default action on redis connection error. Can be set per job. + # Allowed values are + # :raise - raises ActiveJob::Uniqueness::JobNotUnique + # proc - custom Proc. For example, ->(job, resource: _, error: _) { job.logger.info("Job already in queue: #{job.class.name} #{job.arguments.inspect} (#{job.job_id})") } + # + # config.on_conflict = :raise + # Digest method for lock keys generating. Expected to have `hexdigest` class method. # # config.digest_method = OpenSSL::Digest::MD5