1

私は SnappyData について学びました (そして、それに関するいくつかのビデオを見ました)。主に、パフォーマンスが通常のスパーク ジョブよりも何倍も高速である可能性があると述べている場合に興味深いと思われます。

次のコード (スニペット) は、SnappyData 機能を活用して、ジョブのパフォーマンスを改善し、同じ動作を提供できますか?

Dataset<EventData> ds = spark
  .readStream()
  .format("kafka")
  (...)
  .as(Encoders.bean(EventData.class)); 

KeyValueGroupedDataset<String, EventData> kvDataset = ds.groupByKey(new MapFunction<EventData, String>() {
  public String call(EventData value) throws Exception {
    return value.getId();
  }
}, Encoders.STRING());

Dataset<EventData> processedDataset = kvDataset.mapGroupsWithState(new MapGroupsWithStateFunction<String, EventData, EventData, EventData>(){
  public EventData call(String key, Iterator<EventData> values, GroupState<EventData> state) throws Exception {

    /* state control code */

    EventData processed = EventHandler.validate(key,values);

    return processed;

}}, Encoders.bean(EventData.class), Encoders.bean(EventData.class));

StreamingQuery query = processedDataset.writeStream()
  .outputMode("update")
  .format("console")
  .start();
4

1 に答える 1