
Weighted queues #85

Open
dkam opened this issue Jul 12, 2022 · 1 comment


dkam commented Jul 12, 2022

Hi there,
First, thanks for writing this excellent software - I've been using it for 10+ years and processed literally billions of jobs with it.

I recently started using weighted queues in another project that uses Sidekiq, and I really enjoy not having to worry about fine-tuning priorities or running multiple job processors to avoid queue starvation. It's nice to get a predictable amount of processing for every queue.

Is there any way to achieve something similar with Beanstalkd? Essentially, selecting from job queues in a weighted random fashion?
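For comparison, Sidekiq's weighted mode, as far as I can tell from its basic fetch code, does not pick a single queue at random. It builds a list in which each queue name appears `weight` times, shuffles it, removes duplicates, and then blocks on all of the queues with Redis `BRPOP`, which pops from the first non-empty list in the order given. A minimal sketch of just the ordering step, assuming integer weights (`weighted_queue_order` is a hypothetical name, not a Sidekiq or beanstalkd API):

```ruby
# Sketch of Sidekiq-style weighted queue ordering (integer weights assumed):
# repeat each queue name `weight` times, shuffle, then drop duplicates.
# The first queue in the result is chosen with probability weight / total weight;
# a worker would then take a job from the first non-empty queue in that order.
def weighted_queue_order(pipes)
  pipes.flat_map { |q| [q[:pipe]] * q[:weight] }.shuffle.uniq
end

pipes = [
  { pipe: 'low_priority', weight: 1 },
  { pipe: 'medium_priority', weight: 2 },
  { pipe: 'high_priority', weight: 4 }
]

# Count which queue comes first over many orderings.
firsts = Hash.new(0)
70_000.times { firsts[weighted_queue_order(pipes).first] += 1 }
puts firsts.sort_by { |_name, n| n }.map { |name, n| "#{name}: #{n}" }.join(' | ')
# Expect roughly 10000 / 20000 / 40000, i.e. a 1 : 2 : 4 split.
```

Because every queue still appears somewhere in the order, a lower-weighted queue is used whenever the heavier ones are empty, so nothing starves.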

I imagine I could rig up a system that uses the weights to randomly select a queue, peeks to see whether any jobs are ready and, if so, reserves a job from that queue. Rough code below:

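```ruby
require 'beaneater'

# Assumes beanstalkd is listening on the default local port.
beanstalk = Beaneater.new('localhost:11300')

pipes = [{pipe: 'low_priority', weight: 1}, {pipe: 'medium_priority', weight: 2}, {pipe: 'high_priority', weight: 4}]

# Draw uniformly from 1..total_weight: each pipe owns a slice of that range
# as wide as its weight, so it is picked weight / total_weight of the time.
total_weight = pipes.sum { |q| q[:weight] }
range = 1..total_weight

loop do
  ## Randomly select a queue based on weights and assign queue name to pipe_to_process
  q_rand = Random.rand(range)
  q_accumulate = 0
  pipe_to_process = pipes.find do |q|
    q_accumulate += q[:weight]
    q_accumulate >= q_rand
  end

  pipe = pipe_to_process[:pipe]

  unless beanstalk.tubes.find(pipe).peek(:ready).nil?
    puts "Getting job from #{pipe}"
    beanstalk.tubes.watch!(pipe)
    begin
      job = beanstalk.tubes.reserve(1)
    rescue Beaneater::TimedOutError
      # Another worker reserved the job between the peek and the reserve.
      next
    end
    puts "Got job: #{job.id} : tube: #{pipe}"
    # Test loop only: put the job back (delayed 5 s) instead of processing it.
    job.release delay: 5
  else
    puts "No jobs in #{pipe}"
  end
end
```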

It's not ideal to wait on a queue that might already be empty when running multiple processors, since another worker can reserve the job between the peek and the reserve. And when there are no jobs at all, it thrashes between all the queues. Any thoughts on better ways to get weighted queues?
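One possible way around both problems, offered as an untested sketch rather than a known beanstalkd recipe: put the tubes in a weighted random order, try a non-blocking reserve on each in turn, and only block (on all tubes at once) when none has a ready job. A reserve with a zero timeout either returns a job or times out immediately, so there is no separate peek for another worker to race, and an idle worker sits in one blocking reserve instead of cycling through tubes. The ordering here uses Efraimidis-Spirakis keys (`rand ** (1 / weight)`, highest first), which puts each tube first with probability weight / total and also accepts non-integer weights. `weighted_order`, `reserve_weighted`, `try_reserve` and `block_reserve` are hypothetical names; the beanstalkd calls are passed in as lambdas so the logic runs without a server.

```ruby
# Sketch (not tested against a real beanstalkd): weighted order + non-blocking
# reserve, falling back to one blocking reserve across all tubes when all are empty.

# Weighted random permutation (Efraimidis-Spirakis): key = u ** (1 / weight),
# highest key first. Each tube comes first with probability weight / total, and
# among any subset of tubes the earliest one is again chosen in proportion to weight.
def weighted_order(pipes, rng = Random.new)
  pipes.sort_by { |q| -(rng.rand ** (1.0 / q[:weight])) }.map { |q| q[:pipe] }
end

# try_reserve:   callable(tube) -> job or nil; must not block (e.g. a zero-timeout reserve).
# block_reserve: callable(tubes) -> job; blocks until any of the tubes has a ready job.
# Both are hypothetical hooks to be wired to a real beanstalkd client.
def reserve_weighted(pipes, try_reserve:, block_reserve:, rng: Random.new)
  weighted_order(pipes, rng).each do |tube|
    job = try_reserve.call(tube)
    return job if job
  end
  block_reserve.call(pipes.map { |q| q[:pipe] })
end

pipes = [
  { pipe: 'low_priority', weight: 1 },
  { pipe: 'medium_priority', weight: 2 },
  { pipe: 'high_priority', weight: 4 }
]

# In-memory stand-in for beanstalkd, for illustration only.
fake_tubes = { 'low_priority' => %w[l1], 'medium_priority' => %w[m1 m2], 'high_priority' => [] }
try_reserve   = ->(tube) { fake_tubes[tube].shift }
block_reserve = ->(tubes) { "would block on #{tubes.join(', ')}" }

# Three jobs come back in a weighted random order; the fourth call finds every tube empty.
4.times { puts reserve_weighted(pipes, try_reserve: try_reserve, block_reserve: block_reserve) }
```

With the beaneater gem, `try_reserve` might call `beanstalk.tubes.watch!(tube)` and then `beanstalk.tubes.reserve(0)`, returning nil on `Beaneater::TimedOutError`, while `block_reserve` might call `beanstalk.tubes.watch!(*tubes)` followed by a plain `beanstalk.tubes.reserve`; treat that wiring as an assumption to verify. Note that the blocking fallback is not weighted: it takes whichever job beanstalkd hands out first, and the next call returns to weighted selection.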


dkam commented Jul 12, 2022

I wanted to test whether that algorithm actually distributes calls across the pipes in proportion to their weights. This was my test script:

```ruby
#!/usr/bin/env ruby

pipes = [{pipe: 'low_priority', weight: 1}, {pipe: 'medium_priority', weight: 2}, {pipe: 'high_priority', weight: 4}]

# Draw uniformly from 1..total_weight so each pipe gets weight / total_weight of the draws.
total_weight = pipes.sum { |q| q[:weight] }
range = 1..total_weight

result = { 'low_priority' => 0, 'medium_priority' => 0, 'high_priority' => 0 }

# Runs until interrupted (Ctrl-C), printing the running totals after every draw.
loop do
  q_rand = Random.rand(range)
  q_accumulate = 0
  pipe_to_process = pipes.find do |q|
    q_accumulate += q[:weight]
    q_accumulate >= q_rand
  end

  pipe = pipe_to_process[:pipe]

  result[pipe] += 1

  puts result.map { |p| p.join(': ') }.join(" | ")
end
```

I killed it once low_priority reached 1,000,000:

low_priority: 1000000 | medium_priority: 2002152 | high_priority: 4004216

That is a ratio of 1 : 2.002 : 4.004, in line with the 1 : 2 : 4 weights.
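One detail worth checking in the selection loop: it gives each pipe exactly weight / total of the draws only when the random number is uniform over 1 to the total weight. A range that starts at the smallest weight instead of 1 gives the same result for weights 1, 2 and 4, because the smallest weight happens to be 1, but with weights such as 2, 3 and 5 the range 2..10 leaves only one value that maps to low_priority instead of two. An exhaustive check over every possible draw makes the difference visible (`pick`, `tally` and `show` are hypothetical helper names):

```ruby
# Exhaustive check of the cumulative-weight selection: every possible draw is
# tried exactly once, so the counts show how the range is split between pipes.

# Hypothetical helper: map a draw r to the first pipe whose running total reaches r.
def pick(pipes, r)
  acc = 0
  pipes.find { |q| (acc += q[:weight]) >= r }
end

# Hypothetical helper: count how many values in `range` map to each pipe.
def tally(pipes, range)
  range.each_with_object(Hash.new(0)) { |r, counts| counts[pick(pipes, r)[:pipe]] += 1 }
end

# Format counts the same way as the test script above.
def show(counts)
  counts.map { |name, n| "#{name}: #{n}" }.join(' | ')
end

pipes = [
  { pipe: 'low_priority', weight: 2 },
  { pipe: 'medium_priority', weight: 3 },
  { pipe: 'high_priority', weight: 5 }
]
total    = pipes.sum { |q| q[:weight] }                 # 10
smallest = pipes.min_by { |q| q[:weight] }[:weight]     # 2

puts show(tally(pipes, 1..total))        # low_priority: 2 | medium_priority: 3 | high_priority: 5
puts show(tally(pipes, smallest..total)) # low_priority: 1 | medium_priority: 3 | high_priority: 5
```

With the range starting at 1, the counts match the weights exactly; starting at the smallest weight drops low_priority from 2/10 of the draws to 1/9.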
