Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust to new redlock behaviour #78

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,19 @@ class MyJob < ActiveJob::Base
end
```

### Control redis connection errors

```ruby
class MyJob < ActiveJob::Base
# Proc gets the job instance including its arguments, and as keyword arguments the resource(lock key) `resource` and the original error `error`
unique :until_executing, on_redis_connection_error: ->(job, resource: _, error: _) { job.logger.info "Oops: #{job.arguments}" }

def perform(args)
# work
end
end
```

### Control lock key arguments

```ruby
Expand Down
5 changes: 4 additions & 1 deletion lib/active_job/uniqueness/active_job_patch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ module ActiveJobPatch
def unique(strategy, options = {})
validate_on_conflict_action!(options[:on_conflict])
validate_on_conflict_action!(options[:on_runtime_conflict])
validate_on_redis_connection_error!(options[:on_redis_connection_error])

self.lock_strategy_class = ActiveJob::Uniqueness::Strategies.lookup(strategy)
self.lock_options = options
Expand All @@ -40,7 +41,9 @@ def unlock!(*arguments)

private

delegate :validate_on_conflict_action!, to: :'ActiveJob::Uniqueness.config'
delegate :validate_on_conflict_action!,
:validate_on_redis_connection_error!,
to: :'ActiveJob::Uniqueness.config'
end

included do
Expand Down
13 changes: 13 additions & 0 deletions lib/active_job/uniqueness/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class Configuration
config_accessor(:lock_ttl) { 86_400 } # 1.day
config_accessor(:lock_prefix) { 'activejob_uniqueness' }
config_accessor(:on_conflict) { :raise }
config_accessor(:on_redis_connection_error) { :raise }
config_accessor(:redlock_servers) { [ENV.fetch('REDIS_URL', 'redis://localhost:6379')] }
config_accessor(:redlock_options) { { retry_count: 0 } }
config_accessor(:lock_strategies) { {} }
Expand All @@ -34,6 +35,18 @@ def validate_on_conflict_action!(action)

raise ActiveJob::Uniqueness::InvalidOnConflictAction, "Unexpected '#{action}' action on conflict"
end

def on_redis_connection_error=(action)
validate_on_redis_connection_error!(action)

config.on_redis_connection_error = action
end

def validate_on_redis_connection_error!(action)
return if action.nil? || action == :raise || action.respond_to?(:call)

raise ActiveJob::Uniqueness::InvalidOnConflictAction, "Unexpected '#{action}' action on_redis_connection_error"
end
end
end
end
4 changes: 2 additions & 2 deletions lib/active_job/uniqueness/sidekiq_patch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def remove_job
module ScheduledSet
def delete(score, job_id)
entry = find_job(job_id)
ActiveJob::Uniqueness.unlock_sidekiq_job!(entry.item) if super(score, job_id)
ActiveJob::Uniqueness.unlock_sidekiq_job!(entry.item) if super
entry
end
end
Expand All @@ -67,7 +67,7 @@ def clear
end

def delete_by_value(name, value)
ActiveJob::Uniqueness.unlock_sidekiq_job!(Sidekiq.load_json(value)) if super(name, value)
ActiveJob::Uniqueness.unlock_sidekiq_job!(Sidekiq.load_json(value)) if super
end
end
end
Expand Down
17 changes: 16 additions & 1 deletion lib/active_job/uniqueness/strategies/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ class Base

delegate :lock_manager, :config, to: :'ActiveJob::Uniqueness'

attr_reader :lock_key, :lock_ttl, :on_conflict, :job
attr_reader :lock_key, :lock_ttl, :on_conflict, :on_redis_connection_error, :job

def initialize(job:)
@lock_key = job.lock_key
@lock_ttl = (job.lock_options[:lock_ttl] || config.lock_ttl).to_i * 1000 # ms
@on_conflict = job.lock_options[:on_conflict] || config.on_conflict
@on_redis_connection_error = job.lock_options[:on_redis_connection_error] || config.on_redis_connection_error
@job = job
end

Expand Down Expand Up @@ -60,6 +61,12 @@ def before_enqueue

handle_conflict(resource: lock_key, on_conflict: on_conflict)
abort_job
rescue RedisClient::ConnectionError => e
handle_redis_connection_error(
resource: lock_key, on_redis_connection_error:
on_redis_connection_error, error: e
)
abort_job
end

def around_enqueue(block)
Expand All @@ -86,6 +93,14 @@ def handle_conflict(on_conflict:, resource:, event: :conflict)
end
end

def handle_redis_connection_error(resource:, on_redis_connection_error:, error:)
case on_redis_connection_error
when :raise, nil then raise error
else
on_redis_connection_error.call(job, resource: resource, error: error)
end
end

def abort_job
@job_aborted = true # ActiveJob 4.2 workaround

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
#
# config.on_conflict = :raise

# Default action on redis connection error. Can be set per job.
# Allowed values are
# :raise - raises ActiveJob::Uniqueness::JobNotUnique
# proc - custom Proc. For example, ->(job, resource: _, error: _) { job.logger.info("Job already in queue: #{job.class.name} #{job.arguments.inspect} (#{job.job_id})") }
#
# config.on_conflict = :raise

# Digest method for lock keys generating. Expected to have `hexdigest` class method.
#
# config.digest_method = OpenSSL::Digest::MD5
Expand Down
46 changes: 46 additions & 0 deletions spec/support/shared_examples/enqueuing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,52 @@
end
end

context 'when locking fails due to RedisClient error' do
before do
job_class.unique strategy

allow_any_instance_of(ActiveJob::Uniqueness::LockManager).to receive(:lock).and_raise(RedisClient::ConnectionError)
end

shared_examples 'of no jobs enqueued' do
it 'does not enqueue the job' do
expect { suppress(RedisClient::ConnectionError) { subject } }.not_to change(enqueued_jobs, :count)
end

it 'does not remove the existing lock' do
expect { suppress(RedisClient::ConnectionError) { subject } }.not_to change(locks, :count)
end
end

context 'when no options given' do
include_examples 'of no jobs enqueued'

it 'raises a RedisClient::ConnectionError error' do
expect { subject }.to raise_error RedisClient::ConnectionError
end
end

context 'when on_redis_connection_error: :raise given' do
before { job_class.unique strategy, on_redis_connection_error: :raise }

include_examples 'of no jobs enqueued'

it 'raises a RedisClient::ConnectionError error' do
expect { subject }.to raise_error RedisClient::ConnectionError
end
end

context 'when on_redis_connection_error: Proc given' do
before { job_class.unique strategy, on_redis_connection_error: ->(job, **_kwargs) { job.logger.info('Oops') } }

include_examples 'of no jobs enqueued'

it 'calls the Proc' do
expect { subject }.to log(/Oops/)
end
end
end

context 'when the lock exists' do
before do
job_class.unique strategy
Expand Down