10

私の要件は、1 日に何百万ものレコードをストリーミングすることであり、外部構成パラメーターに大きく依存しています。たとえば、ユーザーは Web アプリケーションで必要な設定をいつでも変更でき、変更が行われた後は、新しいアプリケーション構成パラメーターを使用してストリーミングを実行する必要があります。これらはアプリ レベルの構成であり、各データを通過させてフィルター処理する必要があるいくつかの動的除外パラメーターもあります。

flink には、すべてのタスク マネージャーとサブタスクで共有されるグローバルな状態がないことがわかりました。一元化されたキャッシュを持つことはオプションですが、パラメーターごとにキャッシュから読み取る必要があり、レイテンシーが増加します。この種のシナリオを処理するためのより良いアプローチと、他のアプリケーションがそれをどのように処理しているかについてアドバイスしてください。ありがとう。

4

1 に答える 1

8

実行中のストリーミング アプリケーションの構成を更新することは、一般的な要件です。CoFlatMapFunctionFlink の DataStream API では、 2 つの入力ストリームを処理する、いわゆる を使用してこれを行うことができます。ストリームの 1 つをデータ ストリーム、もう 1 つを制御ストリームにすることができます。

次の例は、特定の長さを超える文字列を除外するユーザー関数を動的に適応させる方法を示しています。

val data: DataStream[String] = ???
val control: DataStream[Int] = ???

val filtered: DataStream[String] = data
  // broadcast all control messages to the following CoFlatMap subtasks
  .connect(control.broadcast)
  // process data and control messages
  .flatMap(new DynLengthFilter)


class DynLengthFilter extends CoFlatMapFunction[String, Int, String] with Checkpointed[Int] {

  var length = 0

  // filter strings by length
  override def flatMap1(value: String, out: Collector[String]): Unit = {
    if (value.length < length) {
      out.collect(value)
    }
  }

  // receive new filter length
  override def flatMap2(value: Int, out: Collector[String]): Unit = {
    length = value
  }

  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Int = length

  override def restoreState(state: Int): Unit = {
    length = state
  }
}

DynLengthFilterユーザー関数は、フィルター長のインターフェイスを実装しますCheckpointed。障害が発生した場合、この情報は自動的に復元されます。

于 2016-09-26T09:53:58.580 に答える