17

さて、ここで何が起こっているのかの概要があります:

    M <-- Message with unique id of 1234
    |
    +-Start Queue
    |
    |
    | <-- Exchange
   /|\
  / | \
 /  |  \ <-- bind to multiple queues
Q1  Q2  Q3
\   |   / <-- start of the problem is here
 \  |  / 
  \ | /
   \|/
    |
    Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start
    |
    C <-- Consumer 

したがって、複数のキューにプッシュする交換があります。各キューにはタスクがあり、すべてのタスクが完了すると、キュー4を開始できます。

したがって、一意のIDが1234のメッセージが交換に送信され、交換はそれをすべてのタスクキュー(Q1、Q2、Q3など)にルーティングします。メッセージID 1234のすべてのタスクが完了したら、メッセージに対してQ4を実行します。 ID1234。

どうすればこれを実装できますか?

Symfony2、RabbitMQBundle、RabbitMQ3.xを使用する

資力:

更新#1

わかりました、これが私が探しているものだと思います:

並列処理を使用するRPCですが、メッセージをグループ化し、どのキューを識別するために、相関IDを一意のIDに設定するにはどうすればよいですか?

4

5 に答える 5

7

これを実装する必要があります:http : //www.eaipatterns.com/Aggregator.htmlが、SymfonyのRabbitMQBundleはそれをサポートしていないため、基盤となるphp-amqplibを使用する必要があります。

バンドルからの通常のコンシューマーコールバックはAMQPMessageを取得します。そこから、チャネルにアクセスして、「パイプとフィルター」の実装で次に来る交換に手動で公開できます。

于 2012-12-13T21:50:16.927 に答える
5

RabbitMQのサイトのRPCチュートリアルには、キュー内のユーザーへのメッセージを識別できる「相関ID」を渡す方法があります。

メッセージである種のIDを最初の3つのキューに使用してから、別のプロセスで3つのメッセージをある種のバケットにデキューすることをお勧めします。これらのバケットが3つのタスクの完了であると私が想定しているものを受け取ったら、処理のために最後のメッセージを4番目のキューに送信します。

1人のユーザーの各キューに複数の作業項目を送信する場合、特定のユーザーがキューに配置した項目の数を確認するために少し前処理を行う必要がある場合があります。これにより、4より前にデキューするプロセスは、キューに入れる前に予想される数を認識します。上。


私はC#でrabbitmqを実行しているので、疑似コードがphpスタイルではないことを残念に思います

// Client
byte[] body = new byte[size];
body[0] = uniqueUserId;
body[1] = howManyWorkItems;
body[2] = command;

// Setup your body here

Queue(body)

// Server
// Process queue 1, 2, 3
Dequeue(message)

switch(message.body[2])
{
    // process however you see fit
}

processedMessages[message.body[0]]++;

if(processedMessages[message.body[0]] == message.body[1])
{
    // Send to queue 4
    Queue(newMessage)
}

アップデート#1への対応

クライアントを端末と考えるのではなく、クライアントをサーバー上のプロセスと考えると便利な場合があります。したがって、このようなサーバーにRPCクライアントをセットアップする場合、必要なのは、サーバーにユーザーの一意のIDの生成を処理させ、メッセージを適切なキューに送信することだけです。

    public function call($uniqueUserId, $workItem) {
        $this->response = null;
        $this->corr_id = uniqid();

        $msg = new AMQPMessage(
            serialize(array($uniqueUserId, $workItem)),
            array('correlation_id' => $this->corr_id,
            'reply_to' => $this->callback_queue)
        );

        $this->channel->basic_publish($msg, '', 'rpc_queue');
        while(!$this->response) {
            $this->channel->wait();
        }

        // We assume that in the response we will get our id back
        return deserialize($this->response);
    }


$rpc = new Rpc();

// Get unique user information and work items here

// Pass even more information in here, like what queue to use or you could even loop over this to send all the work items to the queues they need.
$response = rpc->call($uniqueUserId, $workItem);

$responseBuckets[array[0]]++;

// Just like above code that sees if a bucket is full or not
于 2012-12-13T14:36:44.927 に答える
2

ここで何を達成しようとしているのか、少しわかりません。ただし、キューからすべてのメッセージがクリアされたら、キュー4に公開する別のエクスチェンジに公開するように、設計を多少変更する可能性があります。

于 2012-12-13T14:30:51.817 に答える
2

RPCベースの回答に加えて、EIPアグリゲーターパターンに基づく別の回答を追加したいと思います。

次のアイデアは次のとおりです。すべてが非同期であり、RPCやその他の同期はありません。すべてのタスクは、それが完了した場合でも送信します。アグリゲーターはそのイベントにサブスクライブされます。基本的にタスクをカウントし、カウンターが期待される数(この場合は3)に達するとtask4メッセージを送信します。シンプルにするために、カウンターのストレージとしてファイルシステムを選択します。そこでデータベースを使用できます。

プロデューサーはよりシンプルに見えます。それはただ発砲して忘れます

<?php
use Enqueue\Client\Message;
use Enqueue\Client\ProducerInterface;
use Enqueue\Util\UUID;
use Symfony\Component\DependencyInjection\ContainerInterface;

/** @var ContainerInterface $container */

/** @var ProducerInterface $producer */
$producer = $container->get('enqueue.client.producer');

$message = new Message('the task data');
$message->setCorrelationId(UUID::generate());

$producer->sendCommand('task1', clone $message);
$producer->sendCommand('task2', clone $message);
$producer->sendCommand('task3', clone $message);

タスクプロセッサは、ジョブが完了するとイベントを送信する必要があります。

<?php
use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Client\Message;
use Enqueue\Client\ProducerInterface;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;

class Task1Processor implements PsrProcessor, CommandSubscriberInterface
{
    private $producer;

    public function __construct(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    public function process(PsrMessage $message, PsrContext $context)
    {
        // do the job

        // same for other
        $eventMessage = new Message('the event data');
        $eventMessage->setCorrelationId($message->getCorrelationId());

        $this->producer->sendEvent('task_is_done', $eventMessage);

        return self::ACK;
    }

    public static function getSubscribedCommand()
    {
        return 'task1';
    }
}

そして、アグリゲータープロセッサ:

<?php

use Enqueue\Client\TopicSubscriberInterface;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;
use Symfony\Component\Filesystem\LockHandler;

class AggregatorProcessor implements PsrProcessor, TopicSubscriberInterface
{
    private $producer;
    private $rootDir;

    /**
     * @param ProducerInterface $producer
     * @param string $rootDir
     */
    public function __construct(ProducerInterface $producer, $rootDir)
    {
        $this->producer = $producer;
        $this->rootDir = $rootDir;
    }

    public function process(PsrMessage $message, PsrContext $context)
    {
        $expectedNumberOfTasks = 3;

        if (false == $cId = $message->getCorrelationId()) {
            return self::REJECT;
        }

        try {
            $lockHandler = new LockHandler($cId, $this->rootDir.'/var/tasks');
            $lockHandler->lock(true);

            $currentNumberOfProcessedTasks = 0;
            if (file_exists($this->rootDir.'/var/tasks/'.$cId)) {
                $currentNumberOfProcessedTasks = file_get_contents($this->rootDir.'/var/tasks/'.$cId);

                if ($currentNumberOfProcessedTasks +1 == $expectedNumberOfTasks) {
                    unlink($this->rootDir.'/var/tasks/'.$cId);

                    $this->producer->sendCommand('task4', 'the task data');

                    return self::ACK;
                }
            }

            file_put_contents($this->rootDir.'/var/tasks/'.$cId, ++$currentNumberOfProcessedTasks);

            return self::ACK;
        } finally {
            $lockHandler->release();
        }
    }

    public static function getSubscribedTopics()
    {
        return 'task_is_done';
    }
}
于 2017-06-22T12:26:02.600 に答える
0

enqueue-bundleを使用してそれを行う方法を紹介します。

したがって、composerを使用してインストールし、他のバンドルとして登録します。次に、以下を構成します。

// app/config/config.yml

enqueue:
  transport:
    default: 'amnqp://'
  client: ~

このアプローチはRPCに基づいています。方法は次のとおりです。

<?php
use Enqueue\Client\ProducerInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;

/** @var ContainerInterface $container */

/** @var ProducerInterface $producer */
$producer = $container->get('enqueue.client.producer');

$promises = new SplObjectStorage();

$promises->attach($producer->sendCommand('task1', 'the task data', true));
$promises->attach($producer->sendCommand('task2', 'the task data', true));
$promises->attach($producer->sendCommand('task3', 'the task data', true));

while (count($promises)) {
    foreach ($promises as $promise) {
        if ($replyMessage = $promise->receiveNoWait()) {
            // you may want to check the response here
            $promises->detach($promise);
        }
    }
}

$producer->sendCommand('task4', 'the task data');

コンシューマプロセッサは次のようになります。

use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Consumption\Result;
use Enqueue\Psr\PsrContext;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrProcessor;

class Task1Processor implements PsrProcessor, CommandSubscriberInterface
{
    public function process(PsrMessage $message, PsrContext $context)
    {
        // do task job

        return Result::reply($context->createMessage('the reply data'));
    }

    public static function getSubscribedCommand()
    {
        // you can simply return 'task1'; if you do not need a custom queue, and you are fine to use what enqueue chooses. 

        return [
          'processorName' => 'task1',
          'queueName' => 'Q1',
          'queueNameHardcoded' => true,
          'exclusive' => true,
        ];
    }
}

タグを使用してサービスとしてコンテナに追加し、enqueue.client.processorコマンドを実行しますbin/console enqueue:consume --setup-broker -vvv

これがプレーンなPHPバージョンです。

于 2017-06-22T11:51:43.537 に答える