diff --git a/README.md b/README.md index 6cb295e..ee460c6 100644 --- a/README.md +++ b/README.md @@ -87,6 +87,19 @@ class MyJob < ActiveJob::Base end ``` +### Control redis connection errors + +```ruby +class MyJob < ActiveJob::Base + # Proc gets the job instance including its arguments, and as keyword arguments the resource(lock key) `resource` and the original error `error` + unique :until_executing, on_redis_connection_error: ->(job, resource: _, error: _) { job.logger.info "Oops: #{job.arguments}" } + + def perform(args) + # work + end +end +``` + ### Control lock key arguments ```ruby diff --git a/lib/active_job/uniqueness/active_job_patch.rb b/lib/active_job/uniqueness/active_job_patch.rb index fd39832..3daebae 100644 --- a/lib/active_job/uniqueness/active_job_patch.rb +++ b/lib/active_job/uniqueness/active_job_patch.rb @@ -27,6 +27,7 @@ module ActiveJobPatch def unique(strategy, options = {}) validate_on_conflict_action!(options[:on_conflict]) validate_on_conflict_action!(options[:on_runtime_conflict]) + validate_on_redis_connection_error!(options[:on_redis_connection_error]) self.lock_strategy_class = ActiveJob::Uniqueness::Strategies.lookup(strategy) self.lock_options = options @@ -40,7 +41,9 @@ def unlock!(*arguments) private - delegate :validate_on_conflict_action!, to: :'ActiveJob::Uniqueness.config' + delegate :validate_on_conflict_action!, + :validate_on_redis_connection_error!, + to: :'ActiveJob::Uniqueness.config' end included do diff --git a/lib/active_job/uniqueness/configuration.rb b/lib/active_job/uniqueness/configuration.rb index e75237d..357b92d 100644 --- a/lib/active_job/uniqueness/configuration.rb +++ b/lib/active_job/uniqueness/configuration.rb @@ -14,6 +14,7 @@ class Configuration config_accessor(:lock_ttl) { 86_400 } # 1.day config_accessor(:lock_prefix) { 'activejob_uniqueness' } config_accessor(:on_conflict) { :raise } + config_accessor(:on_redis_connection_error) { :raise } config_accessor(:redlock_servers) { [ENV.fetch('REDIS_URL', 'redis://localhost:6379')] } config_accessor(:redlock_options) { { retry_count: 0 } } config_accessor(:lock_strategies) { {} } @@ -34,6 +35,18 @@ def validate_on_conflict_action!(action) raise ActiveJob::Uniqueness::InvalidOnConflictAction, "Unexpected '#{action}' action on conflict" end + + def on_redis_connection_error=(action) + validate_on_redis_connection_error!(action) + + config.on_redis_connection_error = action + end + + def validate_on_redis_connection_error!(action) + return if action.nil? || action == :raise || action.respond_to?(:call) + + raise ActiveJob::Uniqueness::InvalidOnConflictAction, "Unexpected '#{action}' action on_redis_connection_error" + end end end end diff --git a/lib/active_job/uniqueness/sidekiq_patch.rb b/lib/active_job/uniqueness/sidekiq_patch.rb index b5aa0dc..d0fd9f4 100644 --- a/lib/active_job/uniqueness/sidekiq_patch.rb +++ b/lib/active_job/uniqueness/sidekiq_patch.rb @@ -41,7 +41,7 @@ def remove_job module ScheduledSet def delete(score, job_id) entry = find_job(job_id) - ActiveJob::Uniqueness.unlock_sidekiq_job!(entry.item) if super(score, job_id) + ActiveJob::Uniqueness.unlock_sidekiq_job!(entry.item) if super entry end end @@ -67,7 +67,7 @@ def clear end def delete_by_value(name, value) - ActiveJob::Uniqueness.unlock_sidekiq_job!(Sidekiq.load_json(value)) if super(name, value) + ActiveJob::Uniqueness.unlock_sidekiq_job!(Sidekiq.load_json(value)) if super end end end diff --git a/lib/active_job/uniqueness/strategies/base.rb b/lib/active_job/uniqueness/strategies/base.rb index 81bbee3..efc7f88 100644 --- a/lib/active_job/uniqueness/strategies/base.rb +++ b/lib/active_job/uniqueness/strategies/base.rb @@ -11,12 +11,13 @@ class Base delegate :lock_manager, :config, to: :'ActiveJob::Uniqueness' - attr_reader :lock_key, :lock_ttl, :on_conflict, :job + attr_reader :lock_key, :lock_ttl, :on_conflict, :on_redis_connection_error, :job def initialize(job:) @lock_key = job.lock_key @lock_ttl = (job.lock_options[:lock_ttl] || config.lock_ttl).to_i * 1000 # ms @on_conflict = job.lock_options[:on_conflict] || config.on_conflict + @on_redis_connection_error = job.lock_options[:on_redis_connection_error] || config.on_redis_connection_error @job = job end @@ -60,6 +61,12 @@ def before_enqueue handle_conflict(resource: lock_key, on_conflict: on_conflict) abort_job + rescue RedisClient::ConnectionError => e + handle_redis_connection_error( + resource: lock_key, on_redis_connection_error: + on_redis_connection_error, error: e + ) + abort_job end def around_enqueue(block) @@ -86,6 +93,14 @@ def handle_conflict(on_conflict:, resource:, event: :conflict) end end + def handle_redis_connection_error(resource:, on_redis_connection_error:, error:) + case on_redis_connection_error + when :raise, nil then raise error + else + on_redis_connection_error.call(job, resource: resource, error: error) + end + end + def abort_job @job_aborted = true # ActiveJob 4.2 workaround diff --git a/lib/generators/active_job/uniqueness/templates/config/initializers/active_job_uniqueness.rb b/lib/generators/active_job/uniqueness/templates/config/initializers/active_job_uniqueness.rb index 8c234e2..99e408d 100644 --- a/lib/generators/active_job/uniqueness/templates/config/initializers/active_job_uniqueness.rb +++ b/lib/generators/active_job/uniqueness/templates/config/initializers/active_job_uniqueness.rb @@ -19,6 +19,13 @@ # # config.on_conflict = :raise + # Default action on redis connection error. Can be set per job. + # Allowed values are + # :raise - raises ActiveJob::Uniqueness::JobNotUnique + # proc - custom Proc. For example, ->(job, resource: _, error: _) { job.logger.info("Job already in queue: #{job.class.name} #{job.arguments.inspect} (#{job.job_id})") } + # + # config.on_conflict = :raise + # Digest method for lock keys generating. Expected to have `hexdigest` class method. # # config.digest_method = OpenSSL::Digest::MD5 diff --git a/spec/support/shared_examples/enqueuing.rb b/spec/support/shared_examples/enqueuing.rb index ce23413..f651478 100644 --- a/spec/support/shared_examples/enqueuing.rb +++ b/spec/support/shared_examples/enqueuing.rb @@ -65,6 +65,52 @@ end end + context 'when locking fails due to RedisClient error' do + before do + job_class.unique strategy + + allow_any_instance_of(ActiveJob::Uniqueness::LockManager).to receive(:lock).and_raise(RedisClient::ConnectionError) + end + + shared_examples 'of no jobs enqueued' do + it 'does not enqueue the job' do + expect { suppress(RedisClient::ConnectionError) { subject } }.not_to change(enqueued_jobs, :count) + end + + it 'does not remove the existing lock' do + expect { suppress(RedisClient::ConnectionError) { subject } }.not_to change(locks, :count) + end + end + + context 'when no options given' do + include_examples 'of no jobs enqueued' + + it 'raises a RedisClient::ConnectionError error' do + expect { subject }.to raise_error RedisClient::ConnectionError + end + end + + context 'when on_redis_connection_error: :raise given' do + before { job_class.unique strategy, on_redis_connection_error: :raise } + + include_examples 'of no jobs enqueued' + + it 'raises a RedisClient::ConnectionError error' do + expect { subject }.to raise_error RedisClient::ConnectionError + end + end + + context 'when on_redis_connection_error: Proc given' do + before { job_class.unique strategy, on_redis_connection_error: ->(job, **_kwargs) { job.logger.info('Oops') } } + + include_examples 'of no jobs enqueued' + + it 'calls the Proc' do + expect { subject }.to log(/Oops/) + end + end + end + context 'when the lock exists' do before do job_class.unique strategy