0

文字列の解析が必要なアプリケーションがあり、RxScala を使用しています。私のコード:

import java.util.concurrent.TimeUnit
import rx.lang.scala.Subject
import rx.lang.scala.schedulers.NewThreadScheduler
import rx.lang.scala.subjects.{SerializedSubject, PublishSubject}
import scala.concurrent.duration.Duration

object RxScala extends App {
  val subject: Subject[String] = SerializedSubject(PublishSubject())

  val processLines = (lines: Seq[String]) => {
    // long action
  }

  subject
    .subscribeOn(NewThreadScheduler())
    .tumblingBuffer(Duration(2, TimeUnit.SECONDS), 100)
    .subscribe(processLines)

  for(i <- 1 to 100000) {
    subject.onNext("Line " + i)
  }
}

行を追加してから処理できるようになるため、問題があります。

たとえば200行のバッファを作成し、バッファがいっぱいの場合は、バッファがいっぱいでないときに新しいレコードを無視します。

Add 100 records (A)
Add 100 records (B)
Program start processLines (A) // buffer have (B) elements
Add 100 records (C) // buffer have (B, C) elements and it is full
Add 100 records (D) // elements are ignored
ProcessLines is finished 
Program start processLines (B) // buffer have (C) elements
Add 100 records (E) // buffer have (C, E) elements

RxScala にはこれを行う方法がありますか?

4

1 に答える 1

0
subject.
    tumblingBuffer(Duration(2, TimeUnit.SECONDS), 100).
    onBackpressureDrop.
    observeOn(NewThreadScheduler()).
    subscribe(processLines)

http://reactivex.io/documentation/operators/backpressure.html

onBackpressureDrop は、ダウンストリーム サブスクライバーからの保留中の要求がない限り、ソース Observable からの放出をドロップします。その場合、要求を満たすのに十分なアイテムを放出します。

于 2015-07-16T12:40:20.900 に答える