3

jeromq 0.3.2 を使用して Java で記述されたマルチスレッド アプリケーションがあります。Windows サービスとして実行できるように Tanuki サービス ラッパーに組み込む作業を行っていますが、スレッドをきれいに停止するのに問題があります。プロキシのメソッドは、ガイドのSimple Pirateパターンrun()のバリエーションであり、次のようになります。

public void run()
{
    ZContext context = new ZContext();
    Socket frontend = context.createSocket(ZMQ.ROUTER);
    Socket backend = context.createSocket(ZMQ.ROUTER);
    frontend.bind("tcp://*:5555");
    backend.bind("inproc://workers");

    while (!Thread.currentThread().isInterrupted())
    {
        ZMQ.Poller poller = new ZMQ.Poller(2);
        poller.register(frontend, ZMQ.Poller.POLLIN);
        poller.register(backend, ZMQ.Poller.POLLIN);
        poller.poll();
        if (poller.pollin(0))
        {
            processFrontend(frontend, backend, context);
        }
        if (poller.pollin(1))
        {
            processBackend(frontend, backend);
        }
    }

    // additonal code to close worker threads
}

制御ラッパーがアプリケーションを停止したい場合、このループをきれいに終了するにはどうすればよいですか?

現在接続されているクライアントがない場合、ループは への呼び出しでブロックされるpoller.poll()ため、ラッパーがinterrupt()スレッドで呼び出しても無視されます。現在メッセージを送信しているクライアントがある場合、 を呼び出すinterrupt()と、poller.poll()メソッドはzmq.ZError$IOException: java.nio.channels.ClosedByInterruptException

私も使用しようとしました:

PollItem[] items = {
    new PollItem(frontend, Poller.POLLIN),
    new PollItem(backend, Poller.POLLIN)
};
int rc = ZMQ.poll(items, 2, -1);

if (rc == -1)
    break;

if (items[0].isReadable())
{
    processFrontend(frontend, backend, context);
}

if (items[1].isReadable())
{
    processBackend(frontend, backend);
}

しかし、 への呼び出しZMQ.pollは同じ動作を示します。次の 2 つの方法があります。

  • タイムアウトをオンに設定し、メソッドの内容を IOException の try/catch に ZMQ.pollラップします。run()
  • processFrontendRunnable にメソッドを追加します。このメソッドはフロントエンドに接続し、読み込まれてコードがループから抜け出す特別なメッセージを送信します。

最初のものは少し汚れているように見え、2番目のものは少しもろいように感じます. 私が考慮すべき他のアプローチはありますか?スレッド割り込みによりきれいに反応する、使用できる他のポーリング方法はありますか?

4

1 に答える 1

3

ループを別のスレッドに抽出し、追加の inprocPULLソケットを開くことをお勧めします。これにより、ループは 3 つのソケットからメッセージをポーリングします:frontendと。メイン スレッドが割り込み信号を受信した場合、プロキシ ループにコマンドを送信する必要があります。backendcontrolSTOP

まさにこのパターンが私のプロジェクトに実装されています: https://github.com/thriftzmq/thriftzmq-java/blob/master/thriftzmq/src/main/java/org/thriftzmq/ProxyLoop.java

メインスレッドでは、ループ内からループにコマンドを追加しshutdownHookて送信できます。STOP

 Runtime.getRuntime().addShutdownHook(new Thread() {
    @Override
    public void run() {
        try {
            //Connect to controlSocket and send STOP command
            ZMQ.Socket peer = context.socket(ZMQ.PUSH);
            peer.connect(CONTROL_ENDPOINT);
            peer.send(STOP_MESSAGE, 0);
            peer.close();
        } catch (Exception ex) {
            logger.error("Failed to stop service", ex);
            System.exit(1);
        }
   }
});

PS guava-library のプリミティブを使用してサービスのライフサイクルを管理すると、作業がずっと楽になります。

于 2014-02-27T12:48:09.380 に答える