0

RxJava と RxNetty を使用して、Apache Mesos の新しい HTTP スケジューラ API のクライアントを作成中です。

RxNetty との接続を正常に作成しObservable<Event>、結果のチャンク ストリームから を作成することができました。

現在、リソースのオファーを要求/拒否したり、タスクステータスの更新を確認したりするために、Mesos にコールバックを送信するために使用できるシンクをモデル化しようとしています。

Mesos に送信されるメッセージはです。シンクに入るすべてのメッセージに対してまたはCallを提供できる必要があります。これは、Mesosが送信された に対して同期検証を実行するためです。onCompletedonErrorCallCall

私は基本的に次のことを許可しようとしています。

final MesosSchedulerClient client = new MesosSchedulerClient();
final Observable<Event> events = client.openEventStream(subscribeCall);

final Observable<Observable<Call>> ackCalls = events
    .filter(event -> event.getType() == Event.Type.UPDATE && event.getUpdate().getStatus().hasUuid())
    .zipWith(frameworkIDObservable, (Event e, AtomicReference<FrameworkID>> fwId) -> {
        final TaskStatus status = e.getUpdate().getStatus();
        final Call ackCall = ackUpdate(fwId.get(), status.getUuid(), status.getAgentId(), status.getTaskId());
        return Observable.just(ackCall)
            .doOnComplete(() -> { ... })
            .doOnError((e) -> { ... });
    });

client.sink(ackCalls);

今、Subject を拡張し、Calland Action0foronCompletedAction1<Throwable>forを指定するカスタム オブジェクト [1] を考え出しましたonError。ただし、可能であれば、RxJava の既存の構造を使用したいと思います。私が思いついたものの使用例[2]。

ガイダンスをいただければ幸いです。

[1] https://github.com/BenWhitehead/mesos-rxjava/blob/sink-operation/mesos-rxjava-core/src/main/java/org/apache/mesos/rx/java/SinkOperation.java#L17

[2] https://github.com/BenWhitehead/mesos-rxjava/blob/sink-operation/mesos-rxjava-example/mesos-rxjava-example-framework/src/main/java/org/apache/mesos/rx /java/example/framework/sleepy/Main.java#L117-L124

4

1 に答える 1

0

私がたどり着いた解決策は、イベント ストリームを処理し、要求を mesos に送り返すカスタム サブスクライバーを作成することでした。

于 2015-11-12T19:59:03.353 に答える