4

これが多くの Rx ルールに違反していることはわかっていますが、私はRxJava-JDBCが本当に好きで、チームメイトもそうしています。リレーショナル データベースは私たちの活動の中核であり、Rx も同様です。

ただし、として出力するのObservable<ResultSet>ではなく、プルベースの Java 8Stream<ResultSet>または Kotlinだけを使用したい場合がありSequence<ResultSet>ます。しかし、RxJava-JDBC ライブラリーはObservable<ResultSet>.

Observable<ResultSet>したがって、拡張関数を使用して を に変換し、中間のコレクションや呼び出しSequence<ResultSet>を行わない方法があるかどうか疑問に思っています。toBlocking()以下はこれまでのところすべてですが、プッシュおよびプルベースのシステムに接続しようとして頭が回転しています。呼び出しResultSetごとにステートフルであるため、どちらもバッファリングできません。onNext()これは不可能な作業ですか?

import rx.Observable
import rx.Subscriber
import java.sql.ResultSet

fun Observable<ResultSet>.asSequence() = object: Iterator<ResultSet>, Subscriber<ResultSet>() {

    private var isComplete = false

    override fun onCompleted() {
        isComplete = true
    }

    override fun onError(e: Throwable?) {
        throw UnsupportedOperationException()
    }

    override fun onNext(rs: ResultSet?) {
        throw UnsupportedOperationException()
    }


    override fun hasNext(): Boolean {
        throw UnsupportedOperationException()
    }

    override fun next(): ResultSet {
        throw UnsupportedOperationException()
    }

}.asSequence()
4

2 に答える 2

4

それがあなたが望むものを達成する最も簡単な方法かどうかはわかりませんが、このコードを試すことができます. ブロッキング キューを作成し、からこのキューにすべてのイベントを発行することで、Observableをに変換します。キューからイベントをプルし、何もない場合はブロックします。次に、受信した現在のイベントに応じて自身の状態を変更します。IteratorObservableIterable

class ObservableIterator<T>(
    observable: Observable<T>,
    scheduler: Scheduler
) : Iterator<T>, Closeable {

  private val queue = LinkedBlockingQueue<Notification<T>>()
  private var cached: Notification<T>? = null
  private var completed: Boolean = false

  private val subscription =
      observable
          .materialize()
          .subscribeOn(scheduler)
          .subscribe({ queue.put(it) })

  override fun hasNext(): Boolean {
    cacheNext()
    return !completed
  }

  override fun next(): T {
    cacheNext()
    val notification = cached ?: throw NoSuchElementException()
    check(notification.isOnNext)
    cached = null
    return notification.value
  }

  private fun cacheNext() {
    if (completed) {
      return
    }

    if (cached == null) {
      queue.take().let { notification ->
        if (notification.isOnError) {
          completed = true
          throw RuntimeException(notification.throwable)
        } else if (notification.isOnCompleted) {
          completed = true
        } else {
          cached = notification
        }
      }
    }
  }

  override fun close() {
    subscription.unsubscribe()
    completed = true
    cached = null
  }
}
于 2016-05-24T20:35:18.550 に答える
3

次のヘルパー関数を使用できます。

fun <T> Observable<T>.asSequence() = Sequence { toBlocking().getIterator() }

オブザーバブルは、返されたシーケンスがイテレータに対して呼び出されるときにサブスクライブされます。

オブザーバブルがサブスクライブされたのと同じスレッドで要素を発行する場合 (Observable.justたとえば)、返される機会を得る前にイテレーターのバッファーにデータを入力します。この場合、次の呼び出しを使用して別のスレッドへのサブスクリプションを指示する必要がある場合がありますsubscribeOn

observable.subscribeOn(scheduler).asSequence()

ただし、toBlocking().getIterator()すべての結果をバッファリングするわけではありませんが、イテレータによってタイムリーに消費されない場合、結果の一部をバッファリングできます。次が到着ResultSetしたときに何らかの理由で a が期限切れになった場合、これは問題になる可能性があります。ResultSet

于 2016-05-25T03:21:51.057 に答える