1

キューをいっぱいにして、作業するタスクの数が適切であることを確認してprefetch(1)から、各ワーカーが一度に 1 つのタスクしか処理しないように並列に設定します。

私は、各ワーカーがそのタスクを実行し、手動で確認を送信し、さらに作業がある場合はキューから取得して作業を続けたいと考えています。

それ以上の作業がない場合、つまりキューが空である場合は、ワーカー スクリプトを終了してreturn(0).

だから、これは私が今持っているものです:

require 'bunny'

connection = Bunny.new("amqp://my_conn")
connection.start

channel = connection.create_channel
queue = channel.queue('my_queue_name')

channel.prefetch(1)
puts ' [*] Waiting for messages.'

begin
  payload = 'init'
  until queue.message_count == 0
    puts "worker working queue length is #{queue.message_count}"
    _delivery_info, _properties, payload = queue.pop
    unless payload.nil?
      puts " [x] Received #{payload}"
      raise "payload invalid" unless payload[/cucumber/]

      begin
        do_stuff(payload)
      rescue => e
        puts "Error running #{payload}: #{e.backtrace.join('\n')}"
        #failing stuff
      end
    end
    puts " [x] Done with #{payload}"
  end

  puts "done with queue"
  connection.close
  exit(0)
ensure
  connection.close
end 

キューが空になったときに完了したことを確認したい。これは、RabbitMQ サイトの例です... https://www.rabbitmq.com/tutorials/tutorial-two-ruby.html。作業キューに必要なものがいくつかありますが、最も重要なのは手動の承認です。しかし、それは実行を停止しないため、キューが完了したときにプログラムで停止する必要があります。

#!/usr/bin/env ruby
require 'bunny'

connection = Bunny.new(automatically_recover: false)
connection.start

channel = connection.create_channel
queue = channel.queue('task_queue', durable: true)

channel.prefetch(1)
puts ' [*] Waiting for messages. To exit press CTRL+C'

begin
  queue.subscribe(manual_ack: true, block: true) do |delivery_info, _properties, body|
    puts " [x] Received '#{body}'"
    # imitate some work
    sleep body.count('.').to_i
    puts ' [x] Done'
    channel.ack(delivery_info.delivery_tag)
  end
rescue Interrupt => _
  connection.close
end

キューが完全に処理されたとき (合計 0 で 0 未確認) に終了するようにこのスクリプトをどのように適応させることができますか?

4

1 に答える 1