Kafka Streams バージョン 0.10.0.1 を使用しており、ストリームで最小値を見つけようとしています。
着信メッセージは kafka-streams-topic というトピックから来ており、キーがあり、値は次のような JSON ペイロードです。
{"value":2334}
これは単純なペイロードですが、この JSON の最小値を見つけたいと考えています。
発信メッセージは単なる数字です: 2334
キーもメッセージの一部です。
したがって、受信トピックが次のようになった場合:
key=1, value={"value":1000}
min-topic という名前の発信トピックは、
key=1,value=1000
別のメッセージが届きます:
key=1, value={"value":100}
これは同じキーであるため、最初のメッセージよりも小さいため、key=1 value=100 のメッセージを生成したいと考えています。
今、私たちが得たとしましょう:
key=2 value=99
次の場所に新しいメッセージが作成されます。
key=2 and value=99 but the key=1 and associated value shouldn't change.
さらに、次のメッセージが表示された場合:
key=1 value=2000
このメッセージは現在の値の 100 より大きいため、メッセージは生成されません。
これは機能しますが、これが API の意図に準拠しているかどうか疑問に思っています。
public class MinProcessor implements Processor<String,String> {
private ProcessorContext context;
private KeyValueStore<String, Long> kvStore;
private Gson gson = new Gson();
@Override
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(1000);
kvStore = (KeyValueStore) context.getStateStore("Counts");
}
@Override
public void process(String key, String value) {
Long incomingPotentialMin = ((Double)gson.fromJson(value, Map.class).get("value")).longValue();
Long minForKey = kvStore.get(key);
System.out.printf("key: %s incomingPotentialMin: %s minForKey: %s \n", key, incomingPotentialMin, minForKey);
if (minForKey == null || incomingPotentialMin < minForKey) {
kvStore.put(key, incomingPotentialMin);
context.forward(key, incomingPotentialMin.toString());
context.commit();
}
}
@Override
public void punctuate(long timestamp) {}
@Override
public void close() {
kvStore.close();
}
}
実際にプロセッサを実行するコードは次のとおりです。
public class MinLauncher {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
StateStoreSupplier countStore = Stores.create("Counts")
.withKeys(Serdes.String())
.withValues(Serdes.Long())
.persistent()
.build();
builder.addSource("source", "kafka-streams-topic")
.addProcessor("process", () -> new MinProcessor(), "source")
.addStateStore(countStore, "process")
.addSink("sink", "min-topic", "process");
KafkaStreams streams = new KafkaStreams(builder, KafkaStreamsProperties.properties("kafka-streams-min-poc"));
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}