RxJava と RxNetty を使用して、Apache Mesos の新しい HTTP スケジューラ API のクライアントを作成中です。
RxNetty との接続を正常に作成しObservable<Event>
、結果のチャンク ストリームから を作成することができました。
現在、リソースのオファーを要求/拒否したり、タスクステータスの更新を確認したりするために、Mesos にコールバックを送信するために使用できるシンクをモデル化しようとしています。
Mesos に送信されるメッセージはです。シンクに入るすべてのメッセージに対してまたはCall
を提供できる必要があります。これは、Mesosが送信された に対して同期検証を実行するためです。onCompleted
onError
Call
Call
私は基本的に次のことを許可しようとしています。
final MesosSchedulerClient client = new MesosSchedulerClient();
final Observable<Event> events = client.openEventStream(subscribeCall);
final Observable<Observable<Call>> ackCalls = events
.filter(event -> event.getType() == Event.Type.UPDATE && event.getUpdate().getStatus().hasUuid())
.zipWith(frameworkIDObservable, (Event e, AtomicReference<FrameworkID>> fwId) -> {
final TaskStatus status = e.getUpdate().getStatus();
final Call ackCall = ackUpdate(fwId.get(), status.getUuid(), status.getAgentId(), status.getTaskId());
return Observable.just(ackCall)
.doOnComplete(() -> { ... })
.doOnError((e) -> { ... });
});
client.sink(ackCalls);
今、Subject を拡張し、Call
and Action0
foronCompleted
とAction1<Throwable>
forを指定するカスタム オブジェクト [1] を考え出しましたonError
。ただし、可能であれば、RxJava の既存の構造を使用したいと思います。私が思いついたものの使用例[2]。
ガイダンスをいただければ幸いです。