1

AB、およびCの3 つのオブザーバブルがあるとします。3つすべてを同時に実行する必要があります(素人の場合は非同期で)が、次のとおりです。

  1. Aから何かを取得した場合は、それを発行します...他には何も発行しないでください。
  2. Aが何も発行せずに完了した場合、ルール 1 をBに適用します。
  3. Bが何も発行せずに完了した場合、 Cから項目を発行します。
  4. Cが何も発行せずに完了した場合、デフォルト項目を発行します。

私は昨日これを理解しようと何時間も費やしましたが、これを可能にする操作上の組み合わせがRxJavaには既にないようです。

左から右にカスケードする値を考えることができます。

A --> B --> C

また、カスケードはブロックされますが、それぞれが非同期で実行され、値がキャッシュされます。

A (何もない) --> B (何もない) --> C (何もない) --> デフォルト項目

明確にするために、Aは、他のオブザーバーから何かが発行される前に完了する必要があります。A、B、C が何も出力しない場合、B、C の同じロジックがデフォルトになります。

明らかにキャッシュが含まれており、オブザーバブルをリプレイしたくありません。キャッシュされた値を再生する必要があります。それは各ゲートで開催されます。

動作はconcat()と非常に似ていますが、チェーンの次の部分は、その前にエミッションがあった場合に解き放たれることはありません。

4

2 に答える 2

0
public class ConcatObservable<T> {

private final List<Observable<? extends T>> observables;

private ConcatObservable(List<Observable<? extends T>> observables) {
    this.observables = observables;
}

public static <T> ConcatObservable<T> from(Observable<? extends T>... observables) {
    return new ConcatObservable<T>(Arrays.asList(observables));
}

public Observable<T> asObservable() {
    return Observable.create(new Observable.OnSubscribe<T>() {
        @Override
        public void call(final Subscriber<? super T> subscriber) {
            List<Observable<? extends T>> cachedObservables = new ArrayList<Observable<? extends T>>();
            for (Observable<? extends T> observable : observables) {
                ConnectableObservable<? extends T> replayedObservable = observable.replay();
                cachedObservables.add(replayedObservable);
                subscriber.add(replayedObservable.connect());
            }
            Subscription s = Observable.concat(Observable.from(cachedObservables)).take(1).subscribe(subscriber);
            subscriber.add(s);
        }
    });
}
}

編集

これは近いですが、次のテストに失敗します。

@Test @SuppressWarnings("unchecked")
public void it_onlyEmitsFromFirstObservable() {
  Observable<String> A = Observable.from(Arrays.asList("A", "A", "A"));
  Observable<String> B = Observable.from(Arrays.asList("B", "B", "B"));
  Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));

  Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();

  TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
  observable.subscribe(testSubscriber);

  assertThat(testSubscriber.getOnNextEvents()).containsExactly("A", "A", "A");
}
于 2014-06-08T03:54:16.673 に答える
0

これが私が思いついたものです:

**
 * Works like {@link rx.Observable#concat} but concatenated Observables
 * are all run immediately on their given {@link rx.Scheduler}.
 *
 * This Observable is blocking in the sense that items are emitted in order
 * like {@link rx.Observable#concat} but since each Observable is run on
 * an (possibly) asynchronous scheduler, items emitted further down the chain
 * of Observables are held until items further up the chain are (possibly) emitted.
 *
 * This Observable also short-circuits and does not emit items further down
 * the chain of Observables when an Observable higher up the chain emits items.
 *
 * For example:
 *
 * Given Observable A, B, and C
 *
 * If A emits item(s) emit them... do not emit anything else.
 * If A completes without emitting anything, apply previous rule to B.
 * If B completes without emitting anything, emit items from C (if any)
 *
 * @param <T>
 */
public class ConcatObservable<T> {
  private final List<Observable<? extends T>> observables;

  private ConcatObservable(List<Observable<? extends T>> observables) {
    this.observables = observables;
  }

  public static <T> ConcatObservable<T> from(Observable<? extends T>... observables) {
    return new ConcatObservable<T>(Arrays.asList(observables));
  }

  public Observable<T> asObservable() {
    final List<Subscription> subscriptions = new CopyOnWriteArrayList<Subscription>();

    return Observable.create(new Observable.OnSubscribe<T>() {
      @Override public void call(final Subscriber<? super T> subscriber) {
        List<Observable<? extends T>> cachedObservables = new ArrayList<Observable<? extends T>>();
        for (Observable<? extends T> observable : observables) {

          // tell it to cache values
          final ReplaySubject<T> subject = ReplaySubject.create();
          cachedObservables.add(subject);

          // run it with nobody listening
          Subscription subscription = observable.subscribe(new Observer<T>() {
            @Override public void onCompleted() {
              subject.onCompleted();
            }

            @Override public void onError(Throwable e) {
              subject.onError(e);
            }

            @Override public void onNext(T item) {
              subject.onNext(item);
            }
          });
          subscriptions.add(subscription);
        }

        final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

        // for the cached ones, already running
        for (Observable<? extends T> observable : cachedObservables) {

          final AtomicBoolean shouldExit = new AtomicBoolean(false);
          final CountDownLatch latch = new CountDownLatch(1);
          Subscription subscription = observable.subscribe(new Observer<T>() {
            @Override public void onCompleted() {
              latch.countDown();
            }

            @Override public void onError(Throwable e) {
              error.set(e);
              shouldExit.set(true);
              latch.countDown();
            }

            @Override public void onNext(T item) {
              subscriber.onNext(item);
              shouldExit.set(true);
            }
          });

          // Track each subscription
          subscriptions.add(subscription);

          try {
            // Wait for this one to stop emitting, or error
            latch.await();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for subscription to complete", e);
          }

          // This one had an item(s), so we don't bother with the rest
          if (shouldExit.get()) {
            break;
          }
        }

        // Release inner subscriptions
        for (Subscription subscription : subscriptions) {
          subscription.unsubscribe();
        }

        // Obey the Observable contract...
        Throwable throwable = error.get();
        if (throwable != null) {
          subscriber.onError(throwable);
        } else {
          subscriber.onCompleted();
        }
      }
    });
  }
}

そして、対応するテストは次のとおりです。

public class ConcatObservableTest {

  @Test @SuppressWarnings("unchecked")
  public void it_onlyEmitsFromFirstObservable() {
    Observable<String> A = Observable.from(Arrays.asList("A", "A", "A"));
    Observable<String> B = Observable.from(Arrays.asList("B", "B", "B"));
    Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));

    Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();

    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    observable.subscribe(testSubscriber);

    assertThat(testSubscriber.getOnNextEvents()).containsExactly("A", "A", "A");
  }

  @Test @SuppressWarnings("unchecked")
  public void it_onlyEmitsFromSecondObservable() {
    Observable<String> A = Observable.empty();
    Observable<String> B = Observable.from(Arrays.asList("B", "B", "B"));
    Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));

    Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();

    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    observable.subscribe(testSubscriber);

    assertThat(testSubscriber.getOnNextEvents()).containsExactly("B", "B", "B");
  }

  @Test @SuppressWarnings("unchecked")
  public void it_onlyEmitsFromLastObservable() {
    Observable<String> A = Observable.empty();
    Observable<String> B = Observable.empty();
    Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));

    Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();

    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    observable.subscribe(testSubscriber);

    assertThat(testSubscriber.getOnNextEvents()).containsExactly("C", "C", "C");
  }

  @Test @SuppressWarnings("unchecked")
  public void it_shouldStartAllObservables() {
    TestObservable<String> letters = TestObservable.createTestObservable("A", "B", "C");
    TestObservable<String> numbers = TestObservable.createDelayedTestObservable(100, "1", "2", "3");
    TestObservable<String> animals = TestObservable.createDelayedTestObservable(200, "zebra", "donkey", "unicorn");

    Observable<String> observable = ConcatObservable.from(letters, numbers, animals).asObservable();

    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    observable.subscribe(testSubscriber);

    assertThat(letters.isCalled()).isTrue();
    assertThat(numbers.isCalled()).isTrue();
    assertThat(animals.isCalled()).isTrue();
  }

  static class TestObservable<T> extends Observable<T> {
    private final TestOnSubscribe<T> onSubscribeFunc;

    private TestObservable(TestOnSubscribe<T> f) {
      super(f);
      onSubscribeFunc = f;
    }

    public boolean isCalled() {
      return onSubscribeFunc.isCalled();
    }

    @SuppressWarnings("unchecked")
    public static <T> TestObservable<T> createTestObservable(final T... items) {
      return createDelayedTestObservable(0, items);
    }

    @SuppressWarnings("unchecked")
    public static <T> TestObservable<T> createDelayedTestObservable(final long delay, final T... items) {
      return new TestObservable<T>(new TestOnSubscribe<T>(delay, items));
    }

    private static class TestOnSubscribe<T> implements OnSubscribe<T> {
      private final long delay;
      private final T[] items;
      private boolean isCalled;

      private TestOnSubscribe(long delay, T... items) {
        this.delay = delay;
        this.items = items;
      }

      @Override public void call(Subscriber<? super T> subscriber) {
        isCalled = true;

        for (T item : items) {
          if (delay > 0) {
            sleep(delay);
          }
          subscriber.onNext(item);
        }
        subscriber.onCompleted();
      }

      public boolean isCalled() {
        return isCalled;
      }

      private void sleep(long time) {
        try {
          Thread.sleep(time);
        } catch (InterruptedException e) { }
      }
    }
  }
}
于 2014-06-09T21:44:24.440 に答える