2

Vert.x と RxJava を使用してオブザーバブルに圧縮しようとしています。何かを誤解しているのか、それとも単なるバグなのかわかりません。これがコードです。

public class BusVerticle extends Verticle {

    public void start() {

        final RxVertx rxVertx = new RxVertx(vertx);

        Observable<RxMessage<JsonObject>> bus = rxVertx.eventBus().registerHandler("busName");

        Observable<RxHttpClientResponse> httpResponse = bus.mapMany(new Func1<RxMessage<JsonObject>, Observable<RxHttpClientResponse>>() {
            public Observable<RxHttpClientResponse> call(RxMessage<JsonObject> rxMessage) {
                RxHttpClient rxHttpClient = rxVertx.createHttpClient();
                rxHttpClient.coreHttpClient().setHost("localhost").setPort(80);
                return rxHttpClient.getNow("/uri");
            }
        });

        Observable<RxMessage<JsonObject>> zipObservable = Observable.zip(bus, httpResponse, new Func2<RxMessage<JsonObject>, RxHttpClientResponse, RxMessage<JsonObject>>() {
            public RxMessage<JsonObject> call(RxMessage<JsonObject> rxMessage, RxHttpClientResponse rxHttpClientResponse) {
                return rxMessage;
            }
        });

        zipObservable.subscribe(new Action1<RxMessage<JsonObject>>() {
            public void call(RxMessage<JsonObject> rxMessage) {
                rxMessage.reply();
            }
        });
    }
}

受信したメッセージからの情報を使用して HTTP 要求を作成し、HTTP 応答からの情報を使用してメッセージに返信するために、イベント バスと HTTP 応答の両方のオブザーバブルを圧縮したいと考えています。

送信先のメッセージに対する応答がありません。

前もって感謝します!

4

1 に答える 1

2

回避策で解決しました。ある種の混合溶液。

public class BusVerticle extends Verticle {

public void start() {
    final RxVertx rxVertx = new RxVertx(vertx);

    vertx.eventBus().registerHandler("busName", new Handler<Message<JsonObject>>() {
        public void handle(final Message<JsonObject> message) {
            RxHttpClient rxHttpClient = rxVertx.createHttpClient();
            rxHttpClient.coreHttpClient().setHost("localhost").setPort(80);
            Observable<RxHttpClientResponse> httpRequest = rxHttpClient.getNow("/uri");
            httpRequest.subscribe(new Action1<RxHttpClientResponse>() {
                public void call(RxHttpClientResponse response) {
                    container.logger().error(response.statusCode());
                    message.reply(new JsonObject().putString("status", "ok"));
                }
            });
        }
    });
}

}

于 2014-01-15T10:29:37.523 に答える