2

RabbitMQ (Spring AMQP または Java クライアントを直接使用) で着信メッセージを合体またはチャンクする最良の方法を理解しようとしています。

言い換えれば、私は100個の着信メッセージを取り、それらを1つに結合し、信頼できる(正しくACK編集された方法で)別のキューに再送信したいと考えています。これは、EIPではアグリゲーターパターンと呼ばれていると思います。

Spring Integrationがアグリゲーター ソリューションを提供することは知っていますが、実装はフェイル セーフではないように見えます (つまり、結合されたメッセージを構築するためにメッセージを確認して消費する必要があるように見えるため、これを行っている間にシャットダウンすると、メッセージが失われますか? )。

4

2 に答える 2

3

Spring Integrationライブラリについて直接コメントすることはできないため、RabbitMQに関して一般的に説明します。

アグリゲーターのSpringIntegration実装に100%確信がなく、自分で実装しようとしている場合はtx、RabbitMQの内部でトランザクションを使用するものを使用しないことをお勧めします。

RabbitMQのトランザクションは低速であり、トラフィック/スループットの高いシステムを構築している場合は、パフォーマンスの問題が確実に発生します。

むしろ、RabbitMQに実装されているAMQPの拡張機能であるPublisherConfirmsを確認することをお勧めします。これは、新しいhttp://www.rabbitmq.com/blog/2011/02/10/introducing-publisher-confirms/での紹介です。

パフォーマンスを正しくするには、プリフェッチ設定を微調整する必要があります。http: //www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/をご覧ください。詳細については。

上記のすべてはあなたにあなたの問題を解決するのを助けるためにあなたにいくつかの背景を与えます。実装はかなり簡単です。

コンシューマーを作成するときは、ACKが必要になるように設定する必要があります。

  1. n個のメッセージをデキューします。デキューするときに、各メッセージのDeliveryTagをメモする必要があります(これはメッセージのACKを確認するために使用されます)
  2. メッセージを新しいメッセージに集約します
  3. 新しいメッセージを公開する
  4. デキューされた各メッセージをACKする

注意すべき点の1つは、消費者が3を過ぎてから、4が完了する前に死亡した場合、ACKされなかったメッセージは、復活したときに再処理されるということです。

于 2013-02-26T09:49:07.017 に答える
2

この属性を 100 に設定する<amqp-inbound-channel-adapter/> tx-sizeと、コンテナーは 100 メッセージごとに ack するため、メッセージの損失を防ぐことができます。

ただし、集約されたメッセージの送信 (100 回目の受信時) をトランザクションに対応させて、受信メッセージの確認応答の前にブローカーがメッセージを持っていることを確認したい場合があります。

于 2013-02-25T16:45:19.150 に答える