2

プロデューサー側でフロー制御の状況を処理しようとしています。最大キューサイズが設定された qpid-broker にキューがあります。また、キューに flow_stop_count と flow_resume_count を設定します。

現在、プロデューサは、この flow_stop_count に達するまでメッセージを継続的に生成し続けます。このカウントに違反すると、Exception リスナーによって処理される例外がスローされます。しばらくすると、キューのコンシューマーが追いつき、flow_resume_count に到達します。問題は、プロデューサーがこのイベントをどのように知るかです。

プロデューサーのサンプルコードは次のとおりです。

    connection connection = connectionFactory.createConnection();
    connection.setExceptionListenr(new MyExceptionListerner());
    connection.start();
    Session session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
    Queue queue = (Queue)context.lookup("Test");
    MessageProducer producer = session.createProducer(queue);
    while(notStopped){
        while(suspend){//---------------------------how to resume this flag???
            Thread.sleep(1000);
        }
        TextMessage message = session.createTextMessage();
        message.setText("TestMessage");
        producer.send(message);
    }
    session.close();
    connection.close();

および例外リスナーの場合

    private class MyExceptionListener implements ExceptionListener {
    public void onException(JMSException e) {
        System.out.println("got exception:" + e.getMessage());
        suspend=true;
    }
}

現在、exceptionlistener は例外の一般的なリスナーであるため、それを介してプロデューサー フローを一時停止することはお勧めできません。

私が必要としているのは、メッセージを送信する前に確認するために使用できるproduer.isFlowStopped()のようなプロデューサー レベルのメソッドです。そのような機能は qpid api に存在しますか?

qpid Web サイトには、これを実行できることを示唆するドキュメントがいくつかあります。しかし、これが行われている例はどこにも見つかりませんでした。

この種のシナリオを処理する標準的な方法はありますか。

4

3 に答える 3

2

私が Apache QPid のドキュメントから読んだところによると、flow_resume_count と flow_stop_count によってプロデューサーがブロックされ始めるようです。

したがって、唯一のオプションは、メッセージが再び流れ始めるまで定期的にポーリングすることです。

ここから抽出します

プロデューサが満杯のキューに送信すると、ブローカはクライアントにこれ以上メッセージを送信しないように指示して応答します。これの影響は、ブローカがフロー制御命令を取り消すまで、今後の送信の試みがブロックされることです。

ブロックしている間、クライアントは、フロー制御を待ってブロックされているという事実を定期的にログに記録します。

WARN AMQSession - ブローカによるフロー制御が実施されました WARN AMQSession - ブローカによるフロー制御により、メッセージ送信が 5 秒遅れます WARN AMQSession - ブローカによるフロー制御により、メッセージ送信が 10 秒遅れます呼び出しコードに。

エラー AMQSession - ブローカ強制フロー制御での待機がタイムアウトしたため、メッセージの送信に失敗しました。

このドキュメントから、プロデューサーを管理するソフトウェアが自己管理しなければならないことが暗示されます。したがって、基本的に、キューがいっぱいであるという例外を受け取った場合は、バックオフし、おそらくポーリングしてメッセージの送信を再試行する必要があります。

于 2012-01-21T21:11:08.787 に答える
1

プロデューサ フロー制御は、まだ JMS クライアントに実装されていません。https://issues.apache.org/jira/browse/QPID-3388を参照してください。

于 2012-06-20T18:43:24.277 に答える
1

キューの容量 (キューがいっぱいであると見なされるバイト単位のサイズ) および flowResumeCapacity (プロデューサーがアンフローされるキューのサイズ) プロパティを設定してみることができます。

サイズが容量値を超えると、send() はブロックされます。

アイデアを得るために、レポでこのテストケースファイルを見ることができます。

于 2012-01-21T20:55:03.527 に答える