0

私がやりたいことは、1 時間ごとにサーバー上の RMQ 交換にメッセージのバッチを発行する定期的なプロセスを開始することです。

RMQProcessAMQP イベント ループを起動するクラスを と呼びましょう。これを行うには rufus-scheduler を使用できると思いました。

scheduler.every '10s', :times=>6 do
  process = RMQProcess.new
  process.start
end

scheduler.join

これは機能します...ループを通過するたびに、AMQP チャネルが増加します (2 から 4 から 6 など...)。これは、チャネルが適切に閉じられていないことを意味していると思います。これが問題を引き起こす可能性があります。

私の質問を要約すると、この種のことを行うための適切な (または少なくとも適切な) 方法は何ですか? スケジューラ プロセスに入る前に AMQP プロセスを起動する必要がありますか、それとも正しい方法で実行していますか? AMQP イベント ループ内で独自のスケジューリング ロジックを展開する必要がありますか? もっと良い方法があるに違いないように思えるので、それは私の恐れです。アドバイスをいただければ幸いです。

参考までに、start メソッドは次のとおりです (この場合は、RandomText gem を使用して意味のない文章を公開しているだけです)。

def start
    begin
      puts @rmq_params
      AMQP.start(@rmq_params) do 
        |connection|

        connection.on_error do
          |ch, connection_close|
          puts "#{connection_close.reply_text}"
        end

        connection.on_tcp_connection_loss do
          |conn, settings|
          puts "[network failure] Trying to reconnect..."
          conn.reconnect(false, 2)
        end

        channel = AMQP::Channel.new(connection, :auto_recovery => true)
        puts "Channel ID = #{channel.id}"
        exchange = channel.direct(@exchangeName,:durable => true)
        exchange.publish(Lorem.words)

        EM.add_timer(@duration) do
            connection.close do
                EM.stop_event_loop
            end
        end


        Signal.trap("INT") do
          connection.close do
            EM.stop_event_loop
          end
        end

      end
    rescue Exception => e
      puts "#{e.message} #{e.backtrace.join("\n")}"
    end
end
4

1 に答える 1

0

する

6.times do
  process = RMQProcess.new
  process.start
  sleep(10)
end

振る舞いが違う?

私があなただったら、RMQProcess#start の外で AMQP を「開始」し、結果の接続を再利用します。

于 2014-04-06T21:57:06.517 に答える