1

ActiveMQ に対して書いている高スループット プロデューサーのパフォーマンスを向上させようとしています。このuseAsyncSend によると、次のようになります。

パフォーマンスを大幅に向上させる非同期送信の使用を強制します。ただし、メッセージが送信されたかどうかに関係なく send() メソッドがすぐに返されるため、メッセージが失われる可能性があります。

ただし、単純なテストケースに違いがあるとは思えません。

この非常に基本的なアプリケーションを使用して:

const string QueueName = "....";
const string Uri = "....";

static readonly Stopwatch TotalRuntime = new Stopwatch();

static void Main(string[] args)
{
    TotalRuntime.Start();
    SendMessage();
    Console.ReadLine();
}

static void SendMessage()
{
    var session = CreateSession();
    var destination = session.GetQueue(QueueName);
    var producer = session.CreateProducer(destination);

    Console.WriteLine("Ready to send 700 messages");
    Console.ReadLine();

    var body = new byte[600*1024];

    Parallel.For(0, 700, i => SendMessage(producer, i, body, session));         
}

static void SendMessage(IMessageProducer producer, int i, byte[] body, ISession session)
{
     var message = session.CreateBytesMessage(body);

     var sw = new Stopwatch();
     sw.Start();
     producer.Send(message);
     sw.Stop();

     Console.WriteLine("Running for {0}ms: Sent message {1} blocked for {2}ms", 
            TotalRuntime.ElapsedMilliseconds, 
            i, 
            sw.ElapsedMilliseconds);
}       

static ISession CreateSession()
{
     var connectionFactory = new ConnectionFactory(Uri)
                                    {
                                        AsyncSend = true,
                                        CopyMessageOnSend = false
                                    };
     var connection = connectionFactory.CreateConnection();
     connection.Start();
     var session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
     return session;
}

次の出力が得られます。

Ready to send 700 messages

Running for 2430ms: Sent message 696 blocked for 12ms
Running for 4275ms: Sent message 348 blocked for 1858ms
Running for 5106ms: Sent message 609 blocked for 2689ms
Running for 5924ms: Sent message 1 blocked for 2535ms
Running for 6749ms: Sent message 88 blocked for 1860ms
Running for 7537ms: Sent message 610 blocked for 2429ms
Running for 8340ms: Sent message 175 blocked for 2451ms
Running for 9163ms: Sent message 89 blocked for 2413ms
.....

これは、各メッセージの送信に約 800 ミリ秒かかり、呼び出しが約 2.5 秒間session.Send()ブロックされることを示しています。ドキュメントにそう書いてあるのに

「send() メソッドはすぐに戻ります」

また、これらの数は、並列 for を通常の for ループに変更するか、または to に変更しても基本的に同じであるためAsyncSend = trueAlwaysSyncSend = true非同期スイッチがまったく機能しているとは思えません...

送信を非同期にするためにここで何が欠けているかを誰かが見ることができますか?


さらにテストした後:

ANTS パフォーマンス プロファイラーによると、ランタイムの大部分が同期の待機に費やされています。問題は、さまざまなトランスポート クラスがモニターを介して内部的にブロックしていることにあるようです。特に、一度に 1 つのスレッドしかアクセスできない MutexTransportの OneWay メソッドにハングアップしているようです。

Send の呼び出しは、前のメッセージが完了するまでブロックされるように見えます。これが、最初のメッセージが 12 ミリ秒ブロックされ、次のメッセージが 1858 ミリ秒かかったことを出力が示している理由を説明しています。メッセージごとの接続パターンを実装することで複数のトランスポートを使用できます。これにより、問題が改善され、メッセージ送信が並行して機能しますが、単一のメッセージを送信する時間が大幅に増加し、非常に多くのリソースを使用するようには見えません。正しい解決策。

これらすべてを 1.5.6 で再テストしましたが、違いは見られませんでした。

4

1 に答える 1

0

いつものように、最新バージョン (この記事の執筆時点では 1.5.6) に更新することをお勧めします。ブローカでプロデューサ フロー制御が有効になっており、キュー サイズの制限に達した場合、送信がブロックされる可能性があります。サポートを受ける良い方法の 1 つは、テスト ケースを作成し、Jira 課題を介して NMS.ActiveMQ サイトに送信して、テスト コードを使用して調査できるようにすることです。1.5.1 以降、多くの修正が行われているため、新しいバージョンを試してみることをお勧めします。

于 2012-08-03T21:55:24.413 に答える