8

クライアントから送信されたメッセージに反応するアプリケーションがあります。1 つのメッセージはreload_credentials、新しいクライアントが登録されるたびにアプリケーションが受信するというものです。次に、このメッセージは PostgreSQL データベースに接続し、すべての資格情報に対してクエリを実行し、それらを通常の Ruby ハッシュ ( client_id => client_token ) に格納します。

アプリケーションが受け取る可能性のあるその他のメッセージにはstartstoppauseなどがあります。これらは、いくつかのセッション時間を追跡するために使用されます。私の要点は、アプリケーションが次のように機能することを想定しているということです。

  • クライアントがメッセージを送信します
  • メッセージがキューに入れられる
  • キューは処理中です

ただし、たとえば、リアクターをブロックしたくありません。さらに、reload_credentialsキューに次のメッセージがあるとします。資格情報が DB からリロードされるまで、キューからの他のメッセージを処理したくありません。また、特定のメッセージを処理している間 (資格情報のクエリが完了するのを待つなど)、他のメッセージをキューに入れることを許可したいと考えています。

このような問題を解決する方法を教えてください。を使用する必要があるかもしれないと考えていますem-synchronyが、よくわかりません。

4

2 に答える 2

7

リアクターをブロックしないように、PostgresqlEMドライバーの1つまたはEM.deferを使用します。

'reload_credentials'メッセージを受信したら、フラグを反転するだけで、後続のすべてのメッセージがキューに入れられます。'reload_credentials'が終了したら、キューからのすべてのメッセージを処理します。キューが空になったら、メッセージが受信されたときにメッセージが処理されるようにするフラグを反転します。

PostgresqlのEMドライバーはここにリストされています:https ://github.com/eventmachine/eventmachine/wiki/Protocol-Implementations

module Server
  def post_init
    @queue               = []
    @loading_credentials = false
  end

  def recieve_message(type, data)
    return @queue << [type, data] if @loading_credentials || !@queue.empty?
    return process_msg(type, data) unless :reload_credentials == type
    @loading_credentials = true
    reload_credentials do
      @loading_credentials = false
      process_queue
    end
  end

  def reload_credentials(&when_done)
    EM.defer( proc { query_and_load_credentials }, when_done )
  end


  def process_queue
    while (type, data = @queue.shift)
      process_msg(type, data)
    end
  end

  # lots of other methods
end

EM.start_server(HOST, PORT, Server)

いずれかの接続が「reload_connections」メッセージを受信するたびにすべての接続がメッセージをキューに入れるようにする場合は、固有クラスを介して調整する必要があります。

于 2012-09-08T13:45:50.120 に答える
4

以下は、現在の実装のようなものだと思います。

    class Worker
      def initialize queue
        @queue = queue
        dequeue
      end

      def dequeue
        @queue.pop do |item|
          begin
            work_on item
          ensure
            dequeue
          end
        end
      end

      def work_on item
        case item.type
        when :reload_credentials
          # magic happens here
        else
          # more magic happens here
        end
      end
    end


    q = EM::Queue.new

    workers = Array.new(10) { Worker.new q }

上記の問題は、私の理解が正しければ、worker が reload_credentials ジョブよりも新しいジョブ (プロデューサー タイムラインで以前に到着したジョブ)に取り組んでほしくないということです。以下はこれを提供する必要があります(最後に追加の注意事項)。

    class Worker
      def initialize queue
        @queue = queue
        dequeue
      end

      def dequeue
        @queue.pop do |item|
          begin
            work_on item
          ensure
            dequeue
          end
        end
      end

      def work_on item
        case item.type
        when :reload_credentials
          # magic happens here
        else
          # more magic happens here
        end
      end
    end

    class LockingDispatcher
      def initialize channel, queue
        @channel = channel
        @queue = queue

        @backlog = []
        @channel.subscribe method(:dispatch_with_locking)

        @locked = false
      end

      def dispatch_with_locking item
        if locked?
          @backlog << item
        else
          # You probably want to move the specialization here out into a method or
          # block that's passed into the constructor, to make the lockingdispatcher
          # more of a generic processor
          case item.type
          when :reload_credentials
            lock
            deferrable = CredentialReloader.new(item).start
            deferrable.callback { unlock }
            deferrable.errback  { unlock }
          else
            dispatch_without_locking item
          end
        end
      end

      def dispatch_without_locking item
        @queue << item
      end

      def locked?
        @locked
      end

      def lock
        @locked = true
      end

      def unlock
        @locked = false
        bl = @backlog.dup
        @backlog.clear
        bl.each { |item| dispatch_with_locking item }
      end

    end

    channel = EM::Channel.new
    queue = EM::Queue.new

    dispatcher = LockingDispatcher.new channel, queue

    workers = Array.new(10) { Worker.new queue }

したがって、最初のシステムへの入力は で入りますqが、この新しいシステムでは で入りchannelます。はqueue引き続きワーカー間の作業分散に使用されますが、queue資格情報の更新操作が行われている間はデータが取り込まれません。残念ながら、私はそれ以上の時間を取らなかったのでLockingDispatcher、商品の種類や発送のコードと結び付けられていないなどの一般化はしていませんCredentialsReloader。お任せします。

ここで、これはあなたの最初のリクエストについて私が理解しているサービスですが、一般的にはこの種の要件を緩和する方がよいことに注意してください. その要件を変更しないと本質的に根絶できない未解決の問題がいくつかあります。

  • システムは、資格情報ジョブを開始する前にジョブの実行が完了するのを待たない
  • システムは資格情報ジョブのバーストを非常にうまく処理しません。処理可能な他のアイテムは処理できません。
  • 資格情報コードにバグがある場合、バックログが RAM をいっぱいにして障害を引き起こす可能性があります。コードが中止可能であり、その後のメッセージがそれ以上のデッドロックを回避するのに十分に処理可能である場合、単純なタイムアウトで壊滅的な影響を回避するのに十分な場合があります。

実際には、システムにユーザーIDの概念があるようです。要件を熟考すると、資格情報の更新状態にあるユーザー ID に関連する項目のみをバックログする必要がある可能性があります。これは別の問題であり、別の種類のディスパッチが関係しています。それらのユーザーのロックされたバックログのハッシュを試してください。認証情報の完了時にコールバックを使用して、それらのバックログをワーカーに排出するか、同様の取り決めを行います。

幸運を!

于 2012-09-09T18:44:29.813 に答える