5

この説明と同様の理由で、あるRails3アプリケーションから別のアプリケーションへの同期RPC呼び出しのためにRESTの代わりにメッセージングを実験しています。どちらのアプリもシンで実行されています。

「サーバー」アプリケーションには、 rubyamqp.infoドキュメントconfig/initializers/amqp.rbの要求/応答パターンに基づくファイルがあります。

require "amqp"

EventMachine.next_tick do
  connection = AMQP.connect ENV['CLOUDAMQP_URL'] || 'amqp://guest:guest@localhost'
  channel    = AMQP::Channel.new(connection)

  requests_queue = channel.queue("amqpgem.examples.services.time", :exclusive => true, :auto_delete => true)
  requests_queue.subscribe(:ack => true) do |metadata, payload|
    puts "[requests] Got a request #{metadata.message_id}. Sending a reply..."
    channel.default_exchange.publish(Time.now.to_s,
                                     :routing_key    => metadata.reply_to,
                                     :correlation_id => metadata.message_id,
                                     :mandatory      => true)
    metadata.ack
  end

  Signal.trap("INT") { connection.close { EventMachine.stop } }
end

「クライアント」アプリケーションで、「サーバー」への同期呼び出しの結果をビューに表示したいと思います。これは、amqp gemのような本質的に非同期のライブラリの快適ゾーンから少し外れていることを認識していますが、それを機能させる方法があるかどうか疑問に思っています。これが私のクライアントですconfig/initializers/amqp.rb

require 'amqp'

EventMachine.next_tick do
  AMQP.connection = AMQP.connect 'amqp://guest:guest@localhost'  
  Signal.trap("INT") { AMQP.connection.close { EventMachine.stop } }
end

コントローラは次のとおりです。

require "amqp"

class WelcomeController < ApplicationController
  def index    
    puts "[request] Sending a request..."

    WelcomeController.channel.default_exchange.publish("get.time",
      :routing_key => "amqpgem.examples.services.time",
      :message_id  => Kernel.rand(10101010).to_s,
      :reply_to    => WelcomeController.replies_queue.name)

    WelcomeController.replies_queue.subscribe do |metadata, payload|
      puts "[response] Response for #{metadata.correlation_id}: #{payload.inspect}"
      @message = payload.inspect
    end   
  end

  def self.channel
    @channel ||= AMQP::Channel.new(AMQP.connection)
  end

  def self.replies_queue
    @replies_queue ||= channel.queue("reply", :exclusive => true, :auto_delete => true)
  end
end

welcome#index異なるポートで両方のアプリケーションを起動し、ビュー にアクセスしたとき。@message結果がまだ返されていないため、ビューではnilです。結果は、ビューがレンダリングされてコンソールに表示されてから数ミリ秒後に到着します。

$ thin start
>> Using rack adapter
>> Thin web server (v1.5.0 codename Knife)
>> Maximum connections set to 1024
>> Listening on 0.0.0.0:3000, CTRL+C to stop
[request] Sending a request...
[response] Response for 3877031: "2012-11-27 22:04:28 -0600"

ここで驚くことでsubscribeはありません。明らかに同期呼び出しを対象としたものではありません。驚くべきことは、AMQPgemソースコードやオンラインのドキュメントで同期の代替手段が見つからないことです。それに代わる方法で、subscribe必要なRPC動作が得られますか?私が合法的に非同期呼び出しを使用したいシステムの他の部分があることを考えると、バニージェムはその仕事に適したツールのようには思えませんでした。別の見方をする必要がありますか?

サムストークスに応じて編集

:async/async.callbackをスローするためのポインターを提供してくれたSamに感謝します。私はこれまでこのテクニックを見たことがなく、これはまさに私が最初にこの実験で学ぼうとしていたようなものです。send_response.finishRails 3ではなくなりましたが、マイナーな変更を加えるだけで、少なくとも1つのリクエストに対して彼の例を機能させることができました。

render :text => @message
rendered_response = response.prepare!

後続のリクエストは。で失敗し!! Unexpected error while processing request: deadlock; recursive lockingます。これは、SamがActionControllerで同時リクエストを許可することについてのコメントで得ていたものかもしれませんが、引用された要点はRails 2でのみ機能します。development.rbを追加するconfig.allow_concurrency = trueと、Rails 3でこのエラーがなくなりますがThis queue already has default consumer.、AMQPから発生します。

このヤクは十分に剃られていると思います。;-)

興味深いことですが、これは単純なRPCでは明らかにやり過ぎです。このSinatraストリーミングの例のようなものは、クライアントと応答をやり取りするためのより適切なユースケースのようです。Tenderloveには、 AMQPで動作する可能性のあるRails4でイベントをストリーミングする今後の方法についてのブログ投稿もあります。

サムがHTTPの代替案についての議論で指摘しているように、REST / HTTPは、2つのRailsアプリを含む私のシステムのRPC部分にとって完全に理にかなっています。Clojureアプリへのより古典的な非同期イベント公開を含むシステムの他の部分があります。これらの場合、Railsアプリはファイアアンドフォーゲット方式でイベントを公開するだけでよいため、AMQPは、応答キューなしで元のコードを使用してそこで正常に機能します。

4

1 に答える 1

5

必要な動作を得ることができます-クライアントに単純なHTTPリクエストを作成させ、Webアプリはこれに非同期で応答します-しかし、さらに多くのトリックが必要です。非同期応答にはThinのサポートを使用する必要があります。

require "amqp"

class WelcomeController < ApplicationController
  def index
    puts "[request] Sending a request..."

    WelcomeController.channel.default_exchange.publish("get.time",
      :routing_key => "amqpgem.examples.services.time",
      :message_id  => Kernel.rand(10101010).to_s,
      :reply_to    => WelcomeController.replies_queue.name)

    WelcomeController.replies_queue.subscribe do |metadata, payload|
      puts "[response] Response for #{metadata.correlation_id}: #{payload.inspect}"
      @message = payload.inspect

      # Trigger Rails response rendering now we have the message.
      # Tested in Rails 2.3; may or may not work in Rails 3.x.
      rendered_response = send_response.finish

      # Pass the response to Thin and make it complete the request.
      # env['async.callback'] expects a Rack-style response triple:
      # [status, headers, body]
      request.env['async.callback'].call(rendered_response)
    end

    # This unwinds the call stack, skipping the normal Rails response
    # rendering, all the way back up to Thin, which catches it and
    # interprets as "I'll give you the response later by calling
    # env['async.callback']".
    throw :async
  end

  def self.channel
    @channel ||= AMQP::Channel.new(AMQP.connection)
  end

  def self.replies_queue
    @replies_queue ||= channel.queue("reply", :exclusive => true, :auto_delete => true)
  end
end

クライアントに関する限り、結果は、応答を返す前に同期呼び出しでブロックされているWebアプリと区別できません。しかし今、あなたのウェブアプリはそのような多くのリクエストを同時に処理することができます。

注意!

非同期レールは高度な技術です。あなたは自分が何をしているのかを知る必要があります。Railsの一部は、コールスタックを突然解体することに親切に対応していません。は、throwそれをキャッチして再スローすることを知らないラックミドルウェアをバイパスします(これはかなり古い部分的な解決策です)。ActiveSupportの開発モードクラスのリロードはthrow、応答を待たずに、の後にアプリのクラスをリロードします。これにより、コールバックがリロードされたクラスを参照している場合、非常に紛らわしい破損が発生する可能性があります。また、同時リクエストを許可するには、ActionControllerに適切に要求する必要があります。

要求/応答

また、リクエストとレスポンスを一致させる必要があります。現状では、リクエスト1が到着し、リクエスト1が応答を受け取る前にリクエスト2が到着した場合、どのリクエストがレスポンス1を受信するかは未定義です(キュー上のメッセージは、キューにサブスクライブしているコンシューマー間でラウンドロビンで分散されます)。

これを行うには、correlation_idを調べ(ちなみに、明示的に設定する必要があります。RabbitMQはそれを行いません!)、待っていた応答でない場合はメッセージを再キューイングします。私のアプローチは、開いているリクエストを追跡し、すべてのレスポンスをリッスンし、correlation_idに基づいて呼び出す適切なコールバックを検索する永続的なPublisherオブジェクトを作成することでした。

代替:HTTPを使用するだけ

ここでは、2つの異なる(そしてトリッキーな!)問題を実際に解決しています。Rails/ thinを説得してリクエストを非同期に処理することと、AMQPのパブリッシュ/サブスクライブモデルにリクエスト/レスポンスセマンティクスを実装することです。これは2つのRailsアプリ間での呼び出し用であるとおっしゃっていたとしたら、必要な要求/応答セマンティクスがすでに備わっているHTTPだけを使用してみませんか?そうすれば、最初の問題を解決するだけで済みます。em-http-requestなどの非ブロッキングHTTPクライアントライブラリを使用する場合でも、同時リクエスト処理を取得できます。

于 2012-11-28T23:00:42.520 に答える