18

C++ サーバーと C# WPF UI の 2 つのアプリケーションがあります。C++ コードは、ZeroMQ メッセージング [PUB/SUB] サービスを介して (どこからでも誰からでも) 要求を受け取ります。私は自分の C# コードをバック テストに使用し、「バック テスト」を作成して実行しています。これらのバック テストは、多くの「単体テスト」で構成でき、それぞれが C++ サーバーから何千ものメッセージを送受信します。

現在、個々のバック テストはうまく機能しており、それぞれ数千のリクエストとキャプチャを含む N 個の単体テストを送信できます。私の問題は建築です。別のバック テスト (最初のテストに続いて) をディスパッチすると、ポーリング スレッドがキャンセルおよび破棄されないために、イベント サブスクリプションが 2 回行われるという問題が発生します。これにより、誤った出力が発生します。これは些細な問題のように思えるかもしれませんが (一部の人にとってはそうかもしれません)、現在の構成でこのポーリング タスクをキャンセルするのは面倒です。いくつかのコード...

私のメッセージブローカークラスはシンプルで、次のようになります

public class MessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
    private Task pollingTask;
    private NetMQContext context;
    private PublisherSocket pubSocket;

    private CancellationTokenSource source;
    private CancellationToken token;
    private ManualResetEvent pollerCancelled;

    public MessageBroker()
    {
        this.source = new CancellationTokenSource();
        this.token = source.Token;

        StartPolling();
        context = NetMQContext.Create();
        pubSocket = context.CreatePublisherSocket();
        pubSocket.Connect(PublisherAddress);
    }

    public void Dispatch(Taurus.FeedMux message)
    {
        pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
    }

    private void StartPolling()
    {
        pollerCancelled = new ManualResetEvent(false);
        pollingTask = Task.Run(() =>
        {
            try
            {
                using (var context = NetMQContext.Create())
                using (var subSocket = context.CreateSubscriberSocket())
                {
                    byte[] buffer = null;
                    subSocket.Options.ReceiveHighWatermark = 1000;
                    subSocket.Connect(SubscriberAddress);
                    subSocket.Subscribe(String.Empty);
                    while (true)
                    {
                        buffer = subSocket.Receive();
                        MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
                        if (this.token.IsCancellationRequested)
                            this.token.ThrowIfCancellationRequested();
                    }
                }
            }
            catch (OperationCanceledException)
            {
                pollerCancelled.Set();
            }
        }, this.token);
    }

    private void CancelPolling()
    {
        source.Cancel();
        pollerCancelled.WaitOne();
        pollerCancelled.Close();
    }

    public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }
    public string PublisherAddress { get { return "tcp://127.X.X.X:6500"; } }
    public string SubscriberAddress { get { return "tcp://127.X.X.X:6501"; } }

    private bool disposed = false;

    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                if (this.pollingTask != null)
                {
                    CancelPolling();
                    if (this.pollingTask.Status == TaskStatus.RanToCompletion ||
                         this.pollingTask.Status == TaskStatus.Faulted ||
                         this.pollingTask.Status == TaskStatus.Canceled)
                    {
                        this.pollingTask.Dispose();
                        this.pollingTask = null;
                    }
                }
                if (this.context != null)
                {
                    this.context.Dispose();
                    this.context = null;
                }
                if (this.pubSocket != null)
                {
                    this.pubSocket.Dispose();
                    this.pubSocket = null;
                }
                if (this.source != null)
                {
                  this.source.Dispose();
                  this.source = null;
                }
            }
            disposed = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    ~MessageBroker()
    {
        Dispose(false);
    }
}

バックテストの「エンジン」は、各バック テストを実行するために使用します。最初に、各テスト (ユニット テスト)Dictionaryを含むと、各テストの C++ アプリケーションにディスパッチするメッセージを作成します。Test

DispatchTests方法は、こちら

private void DispatchTests(ConcurrentDictionary<Test, List<Taurus.FeedMux>> feedMuxCollection)
{
    broker = new MessageBroker();
    broker.MessageRecieved = new Progress<Taurus.FeedMux>(OnMessageRecieved);
    testCompleted = new ManualResetEvent(false);

    try
    {
        // Loop through the tests. 
        foreach (var kvp in feedMuxCollection)
        {
            testCompleted.Reset();
            Test t = kvp.Key;
            t.Bets = new List<Taurus.Bet>();
            foreach (Taurus.FeedMux mux in kvp.Value)
            {
                token.ThrowIfCancellationRequested();
                broker.Dispatch(mux);
            }
            broker.Dispatch(new Taurus.FeedMux()
            {
                type = Taurus.FeedMux.Type.PING,
                ping = new Taurus.Ping() { event_id = t.EventID }
            });
            testCompleted.WaitOne(); // Wait until all messages are received for this test. 
        }
        testCompleted.Close();
    }
    finally
    {
        broker.Dispose(); // Dispose the broker.
    }
}

最後のPINGメッセージは、終了したことを C++ に伝えるためのものです。次に、強制的に待機させて、C++ コードからすべての戻り値を受け取る前に次の [ユニット] テストがディスパッチされないようにしますManualResetEvent

C++ が PING メッセージを受信すると、メッセージをそのまま送り返します。受信したメッセージを 経由で処理し、ユニット テストを続行できるようにOnMessageRecievedを設定するように PING から指示されます。ManualResetEvent.Set()"次の方"...

private async void OnMessageRecieved(Taurus.FeedMux mux)
{
    string errorMsg = String.Empty;
    if (mux.type == Taurus.FeedMux.Type.MSG)
    {
        // Do stuff.
    }
    else if (mux.type == Taurus.FeedMux.Type.PING)
    {
        // Do stuff.

        // We are finished reciving messages for this "unit test"
        testCompleted.Set(); 
    }
}

私の問題はbroker.Dispose()、最後に上記がヒットしないことです。バックグラウンド スレッドで実行される finally ブロックが実行されるとは限りません

上記の取り消し線のテキストは、私がコードをいじったためです。子が完了する前に親スレッドを停止していました。ただし、まだ問題があります...

Nowbroker.Dispose()が正しく呼び出され、呼び出されます。このメソッドでは、複数のサブスクリプションを回避するためbroker.Dispose()に、ポーラー スレッドをキャンセルし、正しく破棄しようとします。Task

スレッドをキャンセルするには、CancelPolling()メソッドを使用します

private void CancelPolling()
{
    source.Cancel();
    pollerCancelled.WaitOne(); <- Blocks here waiting for cancellation.
    pollerCancelled.Close();
}

しかし、StartPolling()方法では

while (true)
{
    buffer = subSocket.Receive();
    MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
    if (this.token.IsCancellationRequested)
        this.token.ThrowIfCancellationRequested();
}

ThrowIfCancellationRequested()が呼び出されることはなく、スレッドがキャンセルされることもないため、適切に破棄されることはありません。ポーラー スレッドがsubSocket.Receive()メソッドによってブロックされています。

今、私が望むものを達成する方法が明確ではありません。メッセージをポーリングするために使用されるスレッド以外のスレッドでbroker.Dispose()/を呼び出す必要があり、キャンセルを強制する方法もあります。PollerCancel()スレッドの中止は、私がどうしてもやりたいことではありません。

基本的に、次のバック テストを実行する前に適切に破棄したいのですが、brokerこれを正しく処理し、ポーリングを分割して別のアプリケーション ドメインで実行するにはどうすればよいですか?

ハンドラー内で破棄しようとしOnMessageRecivedましたが、これは明らかにポーラーと同じスレッドで実行され、追加のスレッドを呼び出さずにブロックする方法ではありません。

私が望むものを達成するための最良の方法は何ですか?私が従うことができるこの種のケースのパターンはありますか?

御時間ありがとうございます。

4

2 に答える 2

1

主題に関するより高いレベルの見解

テスト フレームワークの作成に専念するあなたの焦点と努力は、あなたの意志が厳格でプロフェッショナル グレードのアプローチを開発することを目指していることを示しています。そのような勇敢な取り組みに敬意を表し、敬意を表したいと思います。

テストは、テスト対象のシステムが定義された期待を満たしているという合理的な定量的証拠を提供するための重要な活動ですが、これの成功は、テスト環境が実際の展開の条件をどれだけ満たしているかにかかっています。

別の異なるベースでのテストは、テストされた環境とは主に異なる環境で実際の展開が期待どおりに実行されることを証明しないことに同意するかもしれません。


要素ごとの制御か、単に状態ごとの制御か、それが問題です。

あなたの努力 (少なくとも OP が投稿された時点で) は、次のテストバッテリーが開始する前に、インスタンスを所定の位置に保持し、Poller インスタンスの内部状態をリセットしようとするコードアーキテクチャに集中しています。

私の見解では、プロのテストを目指して努力する場合、テストには従うべきいくつかの原則があります。

  • テストの再現性の原則(テストの再実行は同じ結果を提供するため、結果のみを提供する準テストを回避します-「宝くじ」)

  • 非干渉テストの原則(テストの再実行は、テスト シナリオによって制御されない、「外部」干渉の対象とならないものとします)

そうは言っても、注目すべき定量的ポートフォリオ最適化研究でノーベル賞を受賞したハリー・マーコウィッツに触発されたメモをいくつか紹介しましょう。

むしろ、要素の完全なライフサイクルを制御するために一歩後退する

CACI Simulations, Inc. (Harry Markowitz の会社の 1 つ) は、90 年代初頭に主力ソフトウェア フレームワーク COMET III を開発しました。COMET III は、大規模で複雑な設計プロトタイピングおよび大規模コンピューティングで動作するプロセスのパフォーマンス シミュレーションのための非常に強力なシミュレーション エンジンです。ネットワーキング/通信ネットワーク。

COMET III からの最大の印象は、構成可能なテスト前の「ウォームアップ」プリロードを含むテスト シナリオを生成する機能であり、これにより、テストされる要素が機械的「疲労」が意味するものと同様の状態になります。拷問試験実験や、水素拡散脆弱性が原子力発電所の冶金学者にとって何を意味するのか。

はい、アルゴリズム、ノードバッファ、メモリ割り当て、パイプライン / 負荷分散 / グリッド処理アーキテクチャの選択、障害回復オーバーヘッド、ガベージ コレクション ポリシー、および制限付きリソース共有アルゴリズムについて、低レベルの詳細に入ると仕事と影響 (実際の作業負荷パターン「プレッシャー」の下で) エンドツーエンドのパフォーマンス/レイテンシー、この機能は単に不可欠です。

これは、個々のインスタンスに関連する単純な状態ごとの制御は、テストの再現性とテストの分離/非介入動作のいずれかの手段を提供しないため、十分ではないことを意味します。簡単に言えば、Poller インスタンスを「リセット」する方法を見つけたとしても、テスト前のウォームアップが可能で、テストの再現性が保証された現実的なテストにはなりません。

一歩下がって、抽象化とテスト シナリオ コントロールの上位レイヤーが必要です。

これはOPの問題にどのように適用されますか?

  • 単なる国家統制の代わりに
  • マルチレイヤ アーキテクチャ / コントロール プレーン / 個別のシグナリングを作成する

この目標をサポートする ZeroMQ の方法

  • 重要なパターンとして上部構造を作成する
  • テスト シナリオ内で使用されるインスタンスの完全なライフサイクル コントロールを使用する
  • ZeroMQ の格言を守る: ゼロ共有、ゼロブロッキング、...
  • Multi-Context() の利点
于 2015-05-01T10:15:00.240 に答える