0

IO を非同期で実行して、プロセス内のキューからジョブを処理する必要があります。それはとても簡単です。問題は、これらのジョブが追加のアイテムをキューに追加できることです。

私はこの問題をあまりにも長くいじっていたので、頭がぼんやりしていると思います — それほど難しくないはずです. 私は二者択一のシナリオを考え続けます:

  1. キューは非同期でジョブを実行でき、後で結果を結合できます。
  2. キューは、最後のジョブが終了してキューが空になるまで、ジョブを同期的に実行できます。

私は、EventMachine と Goliath (どちらも を使用できますEM::HttpRequest) から Celluloid (ただし、実際にそれを使って何かを構築することはありませんでした) まで、あらゆるものをいじり、Fiber を使用して Enumerator を作成しました。私の脳は揚げられていますが。

私が望むのは、単純に、これを行うことができることです:

items = [1,2,3]
items.each do |item|
  if item.has_particular_condition? 
    items << item.process_one_way
  elsif item.other_condition?
    items << item.process_another_way
  # ...
  end
end

#=> [1,2,3,4,5,6]

...ここで、4、5、および 6 はすべてセット内の元のアイテムを処理した結果であり、7、8、および 9 は 4、5、および 6 を処理した結果です。無期限に処理することを心配する必要はありません処理中のデータは数回の反復後に終了するためです。

低レベルの実装コード例だけでなく、高レベルのガイダンス、コメント、他のライブラリへのリンクなども大歓迎です。

4

2 に答える 2

0

私は過去に同様の要件を持っていましたが、あなたが必要としているのは、その音からの堅牢で高性能なワーク キューです。私が 1 年以上前に発見し、それ以来 Ruby で何千ものジョブを確実に処理するために使用しているbeanstalkdを確認することをお勧めします。

特に、beanstalkd を中心とした堅牢な Ruby ライブラリの開発を開始しました。特に、beanstalkdを使用した Ruby のプロダクション対応ワーク キューであるbackburnerを確認してください。構文とセットアップは簡単で、ジョブ プロセスの迅速な処理方法の定義、ジョブの失敗と再試行の処理はすべて、ジョブ スケジューリングなどと同様に組み込まれています。

ご不明な点がございましたらお知らせください。beanstalkd と backburner はお客様の要件に非常に適していると思います。

于 2012-11-15T11:36:35.977 に答える
0

私は最終的に、少し理想的ではないものを実装しました。基本的には、新しい結果がキューに入れられなくなると終了するループで EM Fibre Iterator をラップするだけです。

require 'set'

class SetRunner
  def initialize(seed_queue)
    @results = seed_queue.to_set
  end

  def run
    begin
      yield last_loop_results, result_bucket
    end until new_loop_results.empty?

    return @results
  end

  def last_loop_results
    result_bucket.shift(result_bucket.count)
  end

  def result_bucket
    @result_bucket ||= @results.to_a
  end

  def new_loop_results
    # .add? returns nil if already in the set
    result_bucket.each { |item| @results.add? item }.compact
  end
end

次に、EventMachine で使用するには:

queue = [1,2,3]
results = SetRunner.new(queue).run do |set, output|
  EM::Synchrony::FiberIterator.new(set, 3).each do |item|
    output.push(item + 3) if item <= 6
  end
end
# => [1,2,3,4,5,6,7,8,9]

次に、各セットは、FiberIterator に渡された同時実行レベルで実行されますが、各セットの結果は、外側の SetRunner ループの次の反復で実行されます。

于 2012-12-03T18:34:07.477 に答える