RabbitMQのサイトのRPCチュートリアルには、キュー内のユーザーへのメッセージを識別できる「相関ID」を渡す方法があります。
メッセージである種のIDを最初の3つのキューに使用してから、別のプロセスで3つのメッセージをある種のバケットにデキューすることをお勧めします。これらのバケットが3つのタスクの完了であると私が想定しているものを受け取ったら、処理のために最後のメッセージを4番目のキューに送信します。
1人のユーザーの各キューに複数の作業項目を送信する場合、特定のユーザーがキューに配置した項目の数を確認するために少し前処理を行う必要がある場合があります。これにより、4より前にデキューするプロセスは、キューに入れる前に予想される数を認識します。上。
私はC#でrabbitmqを実行しているので、疑似コードがphpスタイルではないことを残念に思います
// Client
byte[] body = new byte[size];
body[0] = uniqueUserId;
body[1] = howManyWorkItems;
body[2] = command;
// Setup your body here
Queue(body)
// Server
// Process queue 1, 2, 3
Dequeue(message)
switch(message.body[2])
{
// process however you see fit
}
processedMessages[message.body[0]]++;
if(processedMessages[message.body[0]] == message.body[1])
{
// Send to queue 4
Queue(newMessage)
}
アップデート#1への対応
クライアントを端末と考えるのではなく、クライアントをサーバー上のプロセスと考えると便利な場合があります。したがって、このようなサーバーにRPCクライアントをセットアップする場合、必要なのは、サーバーにユーザーの一意のIDの生成を処理させ、メッセージを適切なキューに送信することだけです。
public function call($uniqueUserId, $workItem) {
$this->response = null;
$this->corr_id = uniqid();
$msg = new AMQPMessage(
serialize(array($uniqueUserId, $workItem)),
array('correlation_id' => $this->corr_id,
'reply_to' => $this->callback_queue)
);
$this->channel->basic_publish($msg, '', 'rpc_queue');
while(!$this->response) {
$this->channel->wait();
}
// We assume that in the response we will get our id back
return deserialize($this->response);
}
$rpc = new Rpc();
// Get unique user information and work items here
// Pass even more information in here, like what queue to use or you could even loop over this to send all the work items to the queues they need.
$response = rpc->call($uniqueUserId, $workItem);
$responseBuckets[array[0]]++;
// Just like above code that sees if a bucket is full or not