1

rabbitmq を使用して RPC を構築しようとしています。

rabbitmq http://www.rabbitmq.com/tutorials/tutorial-six-ruby.htmlを使用して RPC を構築するためのチュートリアルによると、クライアントごとに 1 つの応答キューを使用し、correlation_id を使用して応答と要求をマッピングできます。correlation_id の使用方法について混乱していますか?

これが私が実行している問題です。2 つの異なる相関 ID を持つ同じ応答キューを使用して、1 つのクライアントから 2 つの rpc 呼び出しを同期的に作成したいと考えています。ただし、チュートリアルで読んだことから、各クライアントが rpc 呼び出しを順番に行っていると想定しているように見えるため、これが正しいユースケースであるかどうかはわかりません。(この場合、なぜここで correlation_id が必要なのか、さらに混乱します)。

これは、私が達成しようとしているコード例です (rpc_server.rb はチュートリアルと同じです)。うまくいけば、私の質問がより明確になります。

thr1 に設定すると、correlation_id が thr2 によって上書きされるため、以下のコード ブロックは機能しません。

とにかくそれを修正して機能させることはあるのだろうか?@reply_queue.subscribe ブロックを初期化から外して別の call_id を渡そうとしても、thr1 が終了するのを待っている間に @reply-queue がロックされるように見えるため、まだ機能しません。

質問が不明な場合はお知らせください。ご回答いただきありがとうございます。

#!/usr/bin/env ruby
# encoding: utf-8

require "bunny"
require "thread"

conn = Bunny.new(:automatically_recover => false)
conn.start

ch   = conn.create_channel

class FibonacciClient
  attr_reader :reply_queue
  attr_accessor :response, :call_id
  attr_reader :lock, :condition

  def initialize(ch, server_queue)
    @ch             = ch
    @x              = ch.default_exchange

    @server_queue   = server_queue
    @reply_queue    = ch.queue("", :exclusive => true)

    @lock      = Mutex.new
    @condition = ConditionVariable.new
    that       = self

    @reply_queue.subscribe do |delivery_info, properties, payload|
      if properties[:correlation_id] == that.call_id
        that.response = payload.to_i
        that.lock.synchronize{that.condition.signal}
      end
    end
  end

  def call(n)
    self.call_id = self.generate_uuid

    @x.publish(n.to_s,
      :routing_key    => @server_queue,
      :correlation_id => call_id,
      :reply_to       => @reply_queue.name)

    lock.synchronize{condition.wait(lock)}
    response
  end

  protected

  def generate_uuid
    # very naive but good enough for code
    # examples
    "#{rand}#{rand}#{rand}"
  end
end

client   = FibonacciClient.new(ch, "rpc_queue")
thr1 = Thread.new{
    response1 = client.call(30)
    puts response1
}

thr2 = Thread.new{
    response2 = client.call(40)
    puts response2
}

ch.close
conn.close
4

1 に答える 1

0

ここで同じ問題。

'use strict';

const config = require('./config')
const amqp = require('amqplib').connect('amqp://' + config.username + ':' + config.password + '@ali3')
const co = require('co')

const args = process.argv.slice(2)

if (args.length == 0) {
    console.log("Usage: rpc_client.js num");
    process.exit(1)
}

function generateUuid() {
    return Math.random().toString() +
        Math.random().toString() +
        Math.random().toString()
}

function* init(){
    let conn = yield amqp
    let ch = yield conn.createChannel()
    let cbQueue = yield ch.assertQueue('', {exclusive: true})
    return {"conn": conn, "channel": ch, "cbQueue": cbQueue}
}

function* sender(initConfig, msg, resHandler) {
    try {
        let ch = initConfig.channel
        let conn = initConfig.conn
        let cbQueue = initConfig.cbQueue

        const corr = generateUuid()
        console.log(' [x] [%s] Requesting fib(%d)',corr, msg)
        ch.consume(cbQueue.queue, (resultMsg) => {
            resHandler(resultMsg, corr, conn)
        })
        ch.sendToQueue('rpc_queue', new Buffer(msg.toString()), {"correlationId": corr, "replyTo": cbQueue.queue})
    }
    catch (ex) {
        console.warn("ex:", ex)
    }
}

function responseHandler(res, corr, conn) {
    console.log("corr: %s - %s", corr, res.content.toString());//important debug info
    if (res.properties.correlationId == corr)
    {
        console.log(' [.] Got %s', res.content.toString());
        //setTimeout(  () =>  {
        //    conn.close()
        //    process.exit(0)
        //}, 500);
    }
};

function onerror(err) {
    console.error(err.stack);
}

co(function*() {
    let num = parseInt(args[0])
    let initConfig = yield init();
    //let initConfig2 = yield init();
    yield [
        sender(initConfig, num.toString(), responseHandler),
        sender(initConfig, (num+3).toString(), responseHandler)
    ]

}, onerror)

dengwei@RMBAP:~/projects/github/rabbitmq-demo/rpc$ node rpc_client_gen.js 5 [x] [0.64227353665046390.20330130192451180.5467283953912556] Requesting fib(5) [x] [0.461023105075582860.22911424539051950.9930733679793775] Requesting fib(8) corr: 0.64227353665046390 .20330130192451180.5467283953912556 - 5 [.] 得 5 補正: 0.461023105075582860.22911424539051950.9930733679793775 - 21 [.] 得 21

ここでの重要なデバッグから、メッセージがサーバーからコールバック キューに送信され、クライアントによって消費されたことがわかります。uuid が最初のメッセージに設定されていることを確認してください。それで :

if (res.properties.correlationId == corr)

このコード行は結果を無視します。

単一のメッセージを送信する前に毎回初期化しかできないようです。

于 2016-07-20T14:39:20.197 に答える