実行中のストリーミング アプリケーションの構成を更新することは、一般的な要件です。CoFlatMapFunction
Flink の DataStream API では、 2 つの入力ストリームを処理する、いわゆる を使用してこれを行うことができます。ストリームの 1 つをデータ ストリーム、もう 1 つを制御ストリームにすることができます。
次の例は、特定の長さを超える文字列を除外するユーザー関数を動的に適応させる方法を示しています。
val data: DataStream[String] = ???
val control: DataStream[Int] = ???
val filtered: DataStream[String] = data
// broadcast all control messages to the following CoFlatMap subtasks
.connect(control.broadcast)
// process data and control messages
.flatMap(new DynLengthFilter)
class DynLengthFilter extends CoFlatMapFunction[String, Int, String] with Checkpointed[Int] {
var length = 0
// filter strings by length
override def flatMap1(value: String, out: Collector[String]): Unit = {
if (value.length < length) {
out.collect(value)
}
}
// receive new filter length
override def flatMap2(value: Int, out: Collector[String]): Unit = {
length = value
}
override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Int = length
override def restoreState(state: Int): Unit = {
length = state
}
}
DynLengthFilter
ユーザー関数は、フィルター長のインターフェイスを実装しますCheckpointed
。障害が発生した場合、この情報は自動的に復元されます。