2

このトライデントの仕事があるとします:

TridentState wordCounts =
      topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(
            MemcachedState.opaque(serverLocations),
            new Count(),
            new Fields("count")
        )                

Spark Streaming で同じことを達成するにはどうすればよいですか? 私は見ましupdateStateByKeyたが、これは状態を内部的に保持しているように見えました(Memcachedのような外部状態に保持するのではなく)&無期限に。saveAsTextFileまた、そのバッチで更新されたキー値のみを発行するのではなく 、たとえば保存すると、各バッチですべてをダンプしようとするようです。

を使用して外部状態と簡単にやり取りできることはわかってforeachRDDいますが、その場合、レコードを正確に 1 回処理するにはどうすればよいでしょうか?

4

1 に答える 1

2

いくつかの調査の後、これが私が見つけたものです:

  • Spark Streaming は、出力操作の少なくとも 1 回のセマンティクスのみをサポートするため、Trident の 1 回限りのセマンティクスを提供することはできません (少なくとも、それを行うコードを自分で作成しない限り)。
  • updateStateByKey 1 回限りのセマンティクスを提供できますが、これは、その出力が前の出力を完全に置き換えるという事実に依存しています (各チェックポイントで状態全体を出力します)。これにより、重要でない量の状態では使用できなくなります。さらに、既存のデータで状態を初期化する方法はありません。ジョブを再起動すると、状態がリセットされます (少なくともそれは私の理解です)。この機能は 1.3.0 で追加する予定です。

結論として、トランザクション更新が必要な場合は、Trident を使用するのが安全な方法のようです。

于 2015-01-10T12:24:06.877 に答える