9

Pub/Sub モデルで ZeroMQ (clrzmq .net bindings (x86) via nuget) を使用する C# Winform アプリケーションを作成しようとしています。

多くの検索の結果、コードが while ステートメントを使用して新しいメッセージを無期限に処理するスタンドアロンの C# の例しか見つかりません。これらの例を使用しようとすると、どこにコードを配置すればよいかわかりません。また、GUI やその他すべてがブロックされるだけです。

別のスレッドを使用しないとできないかどうかはわかりませんが、余分なスレッドをコーディングしなくても、ZeroMQ の非同期動作が機能するという印象を受けました。おそらく、zeromq コードをどこに置くべきかわからないか、別のスレッドが本当に必要なのかもしれません。

誰かが、コードを実際にデフォルトの C# winform アプリケーションに挿入する場所の指示とともに、単純な pub/sub の例を提供できれば、非常に高く評価されます。

4

1 に答える 1

7

プロジェクトでclrzmq ZeroMq ラッパーを使用していると仮定しています。私が知る限り、clrzmq を使用して単純なループで非ブロックのメッセージを受信することはできません。特定の時間 (受信メソッドにタイムアウトを指定することにより)、またはメッセージを受信するまで、無期限にブロックされます。 .

ただし、ソケットを定期的にポーリングし、着信メッセージをQueue. 次に、たとえば単純な WinForms を使用してTimer、その (shared) から保留中のメッセージを定期的にデキューできますQueue。スレッド化されたサブスクライバーの実際の例を次に示します。

public class ZeroMqSubscriber
{
    private readonly ZmqContext _zmqContext;
    private readonly ZmqSocket _zmqSocket;
    private readonly Thread _workerThread;
    private readonly ManualResetEvent _stopEvent = new ManualResetEvent(false);
    private readonly object _locker = new object();
    private readonly Queue<string> _queue = new Queue<string>();

    public ZeroMqSubscriber(string endPoint)
    {
        _zmqContext = ZmqContext.Create();
        _zmqSocket = _zmqContext.CreateSocket(SocketType.SUB);
        _zmqSocket.Connect(endPoint);
        _zmqSocket.SubscribeAll();

        _workerThread = new Thread(ReceiveData);
        _workerThread.Start();
    }

    public string[] GetMessages()
    {
        lock (_locker)
        {
            var messages = _queue.ToArray();
            _queue.Clear();
            return messages;
        }
    }

    public void Stop()
    {
        _stopEvent.Set();
        _workerThread.Join();
    }

    private void ReceiveData()
    {
         try
         {
             while (!_stopEvent.WaitOne(0))
             {
                 var message = _zmqSocket.Receive(Encoding.UTF8, 
                               new TimeSpan(0, 0, 0, 1));
                 if (string.IsNullOrEmpty(message))
                     continue;

                 lock (_locker)
                     _queue.Enqueue(message);
             }
         }
         finally
         {
             _zmqSocket.Dispose();
             _zmqContext.Dispose();
         }
    }
}

からForm定期的に Queue をポーリングするだけです (この例では を使用しForms Timer、単にメッセージ データを に追加しますTextbox)。

private readonly ZeroMqSubscriber _zeroMqSubscriber = 
        new ZeroMqSubscriber("tcp://127.0.0.1:5000");

void ReceiveTimerTick(object sender, EventArgs e)
{
    var messages = _zeroMqSubscriber.GetMessages();
    foreach (var message in messages)
        _textbox.AppendText(message + Environment.NewLine);
}
于 2013-02-10T11:56:09.340 に答える