19

ZeroMq ガイドを読んだところ、次のことがわかりました。

スレッド間で ØMQ ソケットを共有してはなりません。ØMQ ソケットはスレッドセーフではありません。技術的には可能ですが、セマフォ、ロック、またはミューテックスが必要です。これにより、アプリケーションが遅くなり、壊れやすくなります。スレッド間でソケットを共有することがリモートで妥当な唯一の場所は、ソケットでガベージ コレクションのような魔法を行う必要がある言語バインディングです。

そして後で:

注意:ソケットを作成したスレッド以外では、ソケットを使用したり閉じたりしないでください。

また、ZeroMQContextがスレッドセーフであることも理解しました。

クラスが別のクラスのイベントに登録する場合、.Net では、このイベントは、リスナーが作成されたスレッドとは異なるスレッドから呼び出される可能性があります。

イベントハンドラー内からZeroMQ-Socketsを介して何かをディスパッチできるオプションは2つしかないと思います。

  • eventhandler-invoking-thread を、ZeroMQ-Socketが作成されたスレッドに同期します。
  • スレッドセーフな ZeroMQ- を使用して、新しい ZeroMQ- を作成するSocket/イベントSocketハンドラ内のスレッドの既存の ZeroMQ- を取得するContext

0MQ-Guide が最初のものを思いとどまらせているようで、スレッドごとに新しい ZeroMq-Socket を作成することはパフォーマンス/方法ではないと思います。

私の質問:
イベントハンドラー内から 0MQ を介してメッセージを発行するための正しいパターン (意図されている方法) は何ですか?

また、ガイドの作成者は、次のように書いたときに、.Net の ZeroMQ-Binding を念頭に置いていましたか?

スレッド間でソケットを共有することがリモートで妥当な唯一の場所は、ソケットでガベージ コレクションのような魔法を行う必要がある言語バインディングです。?

私の問題/質問を強調するためのサンプルコードを次に示します。

public class ExampleClass
{
    public event EventHandler<ByteEventArgs> SomethinIsCalledFromAnotherThread;
}

public class ByteEventArgs : EventArgs
{
    public byte[] BytesToSend;
}


public class Dispatcher
{
    ZMQ.Context ctx;

    public Dispatcher(ZMQ.Context mqcontext, ExampleClass exampleClassInstance)
    {
        this.ctx = mqcontext;
        exampleClassInstance.SomethinIsCalledFromAnotherThread += new EventHandler<ByteEventArgs>(exampleClass_SomethinIsCalledFromAnotherThread);
    }

    void exampleClass_SomethinIsCalledFromAnotherThread(object sender, ByteEventArgs e)
    {
        // this method might be called by a different thread. So I have to get a new socket etc?
        using (var socket = ctx.Socket(ZMQ.SocketType.PUSH))
        {
            // init socket etc..... and finally: 
            socket.Send(e.BytesToSend);
        }
        // isn't that too much overhead?
    }
}
4

3 に答える 3

27

多くの 0MQ ソケットを、確実にスレッドと同じ数だけ作成できます。あるスレッドでソケットを作成し、それを別のスレッドで使用する場合、2 つの操作の間に完全なメモリ バリアを実行する必要があります。ソケットオブジェクトはスレッドセーフではないため、それ以外の場合、libzmq で奇妙なランダムエラーが発生します。

いくつかの従来のパターンがありますが、これらが具体的に .NET にどのようにマップされるかはわかりません。

  1. それらを使用するスレッドでソケットを作成します。1 つのプロセスに密にバインドされているスレッド間でコンテキストを共有し、密にバインドされていないスレッドに個別のコンテンツを作成します。高レベル C API (czmq) では、これらは接続スレッドおよび接続解除スレッドと呼ばれます。
  2. 親スレッドでソケットを作成し、スレッドの作成時に接続されたスレッドに渡します。スレッド作成呼び出しは、完全なメモリ バリアを実行します。それ以降は、子スレッドでのみソケットを使用してください。「使用」とは、recv、send、setsockopt、getsockopt、および close を意味します。
  3. あるスレッドでソケットを作成し、別のスレッドで使用して、使用ごとに独自の完全なメモリ バリアを実行します。これは非常にデリケートであり、「フル メモリ バリア」が何であるかを知らない場合は、これを行うべきではありません。
于 2011-04-30T14:05:01.623 に答える
7

.net Framework v4以降では、同時収集を使用してこの問題を解決できます。つまり、生産者/消費者パターン。複数のスレッド(ハンドラー)がデータをスレッドセーフキューに送信でき、1つのスレッドだけがキューからデータを消費してソケットを使用して送信します。

アイデアは次のとおりです。

sendQueue = new BlockingCollection<MyStuff>(new ConcurrentQueue<MyStuff>());
// concurrent queue can accept from multiple threads/handlers safely
MyHandler += (MyStuff stuffToSend) => sendQueue.Add(stuffToSend);

// start single-threaded data send loop
Task.Factory.StartNew(() => {
    using(var socket = context.Socket()) {
        MyStuff stuffToSend;
        // this enumerable will be blocking until CompleteAdding is called
        foreach(var stuff in sendQueue.GetConsumingEnumerable())
            socket.Send(stuff.Serialize());
    }
});

// break out of the send loop when done
OnMyAppExit += sendQueue.CompleteAdding;
于 2012-11-30T20:58:42.317 に答える
3

インプロセストランスポートを確認することを忘れないでください。スレッド間通信にinproc://ソケットを使用し、他のプロセス/サーバーと通信するためにソケットを開く1つのスレッドがあると便利な場合があります。

スレッドごとに少なくとも1つのソケットが必要ですが、inprocのソケットにはIPネットワーク層はまったく含まれていません。

于 2011-07-01T01:47:30.310 に答える