16

メッセージを読み取らずに、AMQP キューからメッセージを選択的に削除したいと考えています。

シナリオは次のとおりです。

送信側は、タイプ X の新しい情報が到着したという事実に基づいて、タイプ X のメッセージを期限切れにしたいと考えています。サブスクライバーがタイプ X の最新のメッセージをまだ消費していない可能性が非常に高いため、パブリッシャーは以前の X タイプのメッセージを削除し、最新のメッセージをキューに入れる必要があります。操作全体はサブスクライバーに対して透過的である必要があります。実際、サブスクライバーは STOMP のような単純なものを使用してメッセージを取得する必要があります。

AMQP を使用してそれを行う方法は? それとも、別のメッセージング プロトコルの方が便利なのでしょうか?

複雑なインフラは避けたい。必要なメッセージング全体は上記のように単純です: 1 つのキュー、1 つのサブスクライバー、1 つのパブリッシャーですが、パブリッシャーは特定の基準でメッセージをアドホックに削除する機能を備えている必要があります。

パブリッシャーのクライアントは Ruby を使用しますが、実際には、プロトコルでそれを行う方法を見つけ次第、どの言語にも対応します。

4

4 に答える 4

10

メッセージ キューではなく、キー値データベースが必要です。たとえば、Redis や Tokyo Tyrant を使用して、ネットワークからアクセス可能な単純なキー値データベースを取得できます。または、単に memcache を使用します。

各メッセージ タイプがキーです。同じキーで新しいメッセージを書き込むと、以前の値が上書きされるため、このデータベースのリーダーは古い情報を取得できなくなります。

この時点で必要なのは、キーを読み取る順序を確立するためのメッセージ キューのみです (それが重要な場合)。それ以外の場合は、データベースを継続的にスキャンしてください。データベースを継続的にスキャンする場合は、データベースをリーダーの近くに配置して、ネットワーク トラフィックを減らすことをお勧めします。

私はおそらくこのようなことをするでしょう key: typecode value: lastUpdated, important data

次に、That way を含むメッセージを送信します typecode, lastUpdated。リーダーは、そのキーの lastupdated とデータベースから最後に読み取ったキーを比較し、既に最新であるため読み取りをスキップできます。

本当に AMQP でこれを行う必要がある場合は、RabbitMQ とカスタム交換タイプ、具体的には Last Value Cache Exchange を使用してください。サンプルコードはこちらhttps://github.com/squaremo/rabbitmq-lvc-plugin

于 2011-06-01T04:30:00.487 に答える
8

現在、RabbitMQ (より一般的には AMQP) でこれを自動的に行うことはできません。しかし、ここに簡単な回避策があります。

X、Y、Z の 3 種類のメッセージを送信するとします。私があなたの質問を正しく理解していれば、X メッセージが到着したときに、まだ配信されていない他のすべての X メッセージをブローカーに忘れさせたいと考えています。

これは、RabbitMQ で行うのは非常に簡単です。

  • プロデューサは、X、Y、および Z の 3 つのキューを宣言します (これらは、ルーティング キーとして名前を使用してデフォルトの交換に自動的にバインドされます。これは、まさに私たちが望んでいたことです)。
  • メッセージをパブリッシュするとき、プロデューサは最初に関連するキューをパージします (したがって、X メッセージをパブリッシュしている場合は、最初に X キューをパージします); これにより、古いメッセージが効果的に削除されます。
  • コンシューマーは、必要なキューから消費するだけです (X メッセージの場合は X、Y メッセージの場合は Y など)。その観点からは、basic.get を実行して次の関連メッセージを取得するだけです。

これは、2 つのプロデューサがほぼ同時に同じタイプのメッセージを送信した場合の競合状態を意味します。その結果、キューは同時に 2 つ (またはそれ以上) のメッセージを持つことができますが、メッセージの数はプロデューサーの数によって上限があり、余分なメッセージは次のパブリッシュでパージされるためです。 、これは大した問題ではありません。

要約すると、このソリューションには、最適なソリューションから 1 つだけ余分な手順があります。つまり、タイプ X のメッセージを公開する前に、キュー X をパージします。

この構成のセットアップにヘルプが必要な場合は、rabbitmq-discuss メーリング リストでアドバイスを求めるのに最適な場所です。

于 2010-08-08T22:06:26.460 に答える