このトライデントの仕事があるとします:
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(
MemcachedState.opaque(serverLocations),
new Count(),
new Fields("count")
)
Spark Streaming で同じことを達成するにはどうすればよいですか? 私は見ましupdateStateByKey
たが、これは状態を内部的に保持しているように見えました(Memcachedのような外部状態に保持するのではなく)&無期限に。saveAsTextFile
また、そのバッチで更新されたキー値のみを発行するのではなく 、たとえば保存すると、各バッチですべてをダンプしようとするようです。
を使用して外部状態と簡単にやり取りできることはわかってforeachRDD
いますが、その場合、レコードを正確に 1 回処理するにはどうすればよいでしょうか?