2

Rails アプリケーションからいくつかのデータを収集して集計し、定期的にリモート サーバーに送信する必要があります。application.rb のグローバル変数 (知っている、知っている) で集計クラスをインスタンス化します。

集計クラス内で、10 秒間スリープするスレッドを起動し、キューを確認してデータを処理し、送信します。キューは、クラスのインスタンス変数に格納されたハッシュです。

Rails コントローラーから、アグリゲーター クラスのメソッドを呼び出して、ハッシュ内のデータをキューに入れます。もちろん、これはキューを読み取るバックグラウンド タスクとは別のスレッドにあります。問題は、バックグラウンド タスクがハッシュ内のデータを認識しないことです。私のログでは、(コントローラ スレッドから) 書き込むときと (バックグラウンド スレッドから) 読み取るときの両方で、ハッシュの object_id を出力します。hash#object_id は両方のスレッドから一致しますが、バックグラウンド スレッドにはデータが表示されません。

私を殺しているのは、これがrailsの外でうまく動作することです。私は多くのスレッドでテストを設定しましたが、実際にはうまくいきました (明確にするために示していないスレッド保護がいくつかあります)。がどのようにobject_id一致するか知っている人はいますが、内容が一貫していませんか?

class Aggregator

def initialize
  @q = {}
  @timer = nil
end

def start
  @timer = Thread.new do
    loop do
      sleep(10)
      flush_q
    end
  end
end

def flush_q
  logger.debug "flush: q.object_id = #{@q.object_id}"  # matches what I get below
  logger.debug "flush: q.length = #{@q.length}"   # always zero!
  @q.each_pair do |k,v|
    # pack it up and send it
  end
  @q.clear
end

def add(item)
  logger.debug "add: q.object_id = #{@q.object_id}"  # matches what I get above
  @q[item.name] ||= item
  logger.debug "add: q.length = #{@q.length}"   # increases with each add
  # not actually that simple, but not relevant
end

end
4

1 に答える 1

0

あなたのコードが forking アプリ サーバー (ユニコーンやパッセンジャーなど) を使用してデプロイされていると仮定します。

これは、アプリが一度読み込まれると、そのマスター インスタンスから新しいインスタンスがフォークされることを意味します。フォークは安価であるため、アプリの新しいインスタンスを非常に迅速に起動/シャットダウンできます。

アグリゲーター インスタンスは、このマスター プロセスで作成/開始されていると思います。これが分岐すると、プロセスのメモリ空間全体がコピーされます (そのため、新しいプロセスにアグリゲーターのインスタンスがあり、同じオブジェクト ID などがあります)。

ただし、現在のスレッドのみをフォークするとコピーされるため、アグリゲーターのフラッシュはマスタープロセスでのみ発生しますが、すべての追加は子プロセスで発生します。これは、ログに追加することで確認できProccess.pidます。ログが 2 つの異なるプロセスからのものであることがわかります。

これを修正する 1 つの方法は、子プロセスが分岐した後にスレッドを開始/再開することです。これを行う方法は、アプリの提供方法によって異なります。after_forkユニコーンを使用すると、メソッドを介してユニコーン構成でこれを行うことができます。同乗者とは

PhusionPassenger.on_event(:starting_worker_process) do |forked|
  if forked
    ...
  end
end
于 2013-07-17T21:22:48.890 に答える