4

Spring Reactor Stream API (rxjava に類似) を使用して、2 つのダウンストリーム サービスによって提供される応答をラップするサービスで応答オブジェクトを構築するのに苦労しています。

以下は、accept()私の Channel Consumer のメソッドです。罪のない人を保護するために、いくつかの名前が変更されています..

@オーバーライド
public void accept(最終的な NetChannel チャネル) {
    log.info("FullHttpRequest の NetChannel を消費しています");
    試す {
        // 最初の Stream は、XML ドキュメントを含む単一の HTTP リクエストです。
        ストリーム requestStream = channel.in();

        requestStream.filter((x) -> {return true;})
                .dispatchOn(ディスパッチャ)
                .map(新しい FooRequestFunction()) // 1)
                .flatMap(新しい BarRequestStreamFunction()) // 2)
                .flatMap(new DownstreamRequestZipFunction()) // 3)
                .toList() // 4)
                .onComplete(新しい ResponsesConsumer(チャネル)); // 5)

    キャッチ(例外e){
        log.error("チャネル処理中に例外がスローされました", e);
    }
}

したがって、 はFooRequest多数の をラップしBarRequests、それぞれに 1 つの関連付けられた Classify 要求と 1 つの関連付けられた Validate 要求があります。1) に変換するFooRequest、2)FooRequestを一連の に変換するBarRequests、3) それぞれに対して 2 つのダウンストリーム リクエストを実行するBarRequest、4) すべてのBarResponseオブジェクトを全体的な応答に集約する、5) クライアントに応答を送り返す。

私が問題に遭遇するポイントは、toList()決して実行されないように見えるメソッドです。を含む何かを試みるたびに、Promiseそれは常に壊れているように見えますが、これも例外ではありません.

FooRequestFunctionBarRequestStreamFunctionかなり単純で、問題なく動作するようです。それらのメソッド シグネチャは次のとおりです。

// FooRequestFunction
public FooRequest apply(final FullHttpRequest request);

と:

// BarRequestStreamFunction
public Stream<BarRequest> apply(FooRequest dsoRequests);

DownstreamRequestZipFunction次のようになります。

@オーバーライド
public Stream apply(BarRequest t) {
    ストリーム classifyRes = ストリーム
            .just(t)
            .flatMap(新しい ClassifyDownstreamRequestFunction());

    ストリーム validateRes = ストリーム
            .just(t)
            .flatMap(新しい ValidateDownstreamRequestFunction());

    return Streams.zip(classifyRes, validateRes, (tuple) -> {
        BarResponse 応答 = 新しい BarResponse();
        response.setClassifyRes(tuple.getT1());
        response.setValidateRes(tuple.getT2());
        応答を返します。
    });
}

両方のダウンストリーム リクエスト関数が結果を返す限り、これは問題なく機能するようです。

最後に、連鎖呼び出しの最後にある Consumer には、次の署名があります。

// ResponsesConsumer
public void accept(Promise<List<BarResponse>> responses)

これが行うのは、応答の約束を await() し、それらの応答すべてをチャネルに書き戻された単一の XML ドキュメントに集約することです。ロギングが発生しないため、実行がこのメソッドまで到達しないことがわかります。すべてが .toList() で停止しているようです。

このセットアップが実行されたように見える理由toList()や、その後何かを知っている人はいますか?

編集:わかりました、もう少し情報があります。デバッグを容易にするためにアプリケーション内の各スレッドに命名規則を指定した後、accept() メソッドを実行するスレッドが「shared-1」で待機状態になり、そこにとどまっていることがわかります。これは、基になる Dispatcher がリングバッファ ディスパッチャであり、シングル スレッドであるという事実に関係している可能性があります。

アプローチが少し異なるようにコードを変更し、マルチスレッド ディスパッチャーを使用して、 を使用しPromiseないようにしましたが、連鎖した一連の呼び出しの末尾が実行されない状態が残っています。下記参照:

@オーバーライド
public void accept(最終的な NetChannel チャネル) {
    log.info("FullHttpRequest の NetChannel を消費しています");
    試す {
        // 最初の Stream は、XML ドキュメントを含む単一の HTTP リクエストです。
        ストリーム requestStream = channel.in();

        requestStream.filter((x) -> {return true;})
                .dispatchOn(ディスパッチャ)
                .map(新しい FooRequestFunction()) // 1)
                .flatMap(新しい BarRequestStreamFunction()) // 2)
                .flatMap(new DownstreamRequestZipFunction()) // 3)
                .reduce(new ArrayList(), (list,resp) -> {log.info("Reducing"); list.add(resp); リストを返す;}) // 4)
                .consumeOn((x)->{log.info("消費");}, (x)->{log.error("エラー");}, (x)->{log.info("完了" );}、 発車係); // 5)

    キャッチ(例外e){
        log.error("チャネル処理中に例外がスローされました", e);
    }
}

上記では、toList() を reduce() の呼び出しに置き換え、すべてを 1 つの にまとめましたList<BarResponse>。これが実行され、ログに記録されていることがわかります。ただし、consume()、consumeOn()などを試した後、最後の呼び出しで何をしても、実行されることはなく、上記の最後の呼び出しがログに記録されることもありません。

VisualVM を見ると、ディスパッチャー スレッドはすべて、ブロッキング キューに関連付けられた同じオブジェクト モニターで待機していることがわかります。つまり、すべてのスレッドが作業の到着を待機しています。末尾の consumerOn() 呼び出しが完全に無視されているようです。

ここで何が間違っていますか?私は何を理解していませんか?

編集 2: 以下の Johns の返信を考えると、問題はサーバーのセットアップにあると思われます。おそらくリアクター リリース 2.0.0.M2 専用で、メインApplicationクラスで次のように構成されます。

@豆
    public NetServer httpServer(
            最終的な環境環境、
            最終的な MetricRegistry メトリック、
            最終 ChannelConsumer コンシューマ) は InterruptedException をスローします {

        NetServer サーバー = 新しい TcpServerSpec(
                NettyTcpServer.class)
                .env(env)
                .オプション(
                        new NettyServerSocketOptions()
                            .pipelineConfigurer((ChannelPipeline パイプライン) -> パイプライン
                                .addLast(新しい HttpServerCodec())
                                .addLast(新しい HttpObjectAggregator(MAX_CONTENT_LENGTH))))
                .consume(消費者)
                。得る();

        server.start().await();

        サーバーを返します。
    }

これにはディスパッチャが設定されておらず、内部では LMAX ディスラプタを使用しているようNettyEventLoopDispatcherです。NettyEventLoopDispatcher代わりのディスパッチャとして設定して使用する方法が明確ではありません。

4

1 に答える 1

0

consumeOn()呼び出しは冗長です (「実際の」コードでこの例とは異なるものを使用している場合を除きDispatcherます)。NettyEventLoopDispatcher出力を書き込むと、内部的にとにかくに切り替わります。

TcpServerこのフローが a の外で機能し、これが期待どおりに機能することを確認するために簡単なチェックを行いました。

@Test
public void testStream() throws InterruptedException {
    Stream<String> s1 = Streams.just("Hello World!");

    s1.filter(s -> s.startsWith("Hello"))
      .dispatchOn(Environment.sharedDispatcher())
      .map(s -> s.toUpperCase())
      .flatMap(s -> Streams.just(s, s))
      .flatMap(s -> Streams.just(s, s))
      .reduce(new ArrayList<>(), (l, s) -> {
          l.add(s);
          return l;
      })
      .consume(l -> {
          System.out.println("thread: " + Thread.currentThread() + ", l: " + l);
      });

    Thread.sleep(500);
}

.dispatchOn()呼び出しをコメントアウトするだけで、フローが Netty スレッドで機能するかどうかを知ることは興味深いでしょう。

于 2015-03-11T18:25:37.097 に答える