1

Kafka Streams バージョン 0.10.0.1 を使用しており、ストリームで最小値を見つけようとしています。

着信メッセージは kafka-streams-topic というトピックから来ており、キーがあり、値は次のような JSON ペイロードです。

{"value":2334}

これは単純なペイロードですが、この JSON の最小値を見つけたいと考えています。

発信メッセージは単なる数字です: 2334

キーもメッセージの一部です。

したがって、受信トピックが次のようになった場合:

key=1, value={"value":1000}

min-topic という名前の発信トピックは、

key=1,value=1000

別のメッセージが届きます:

key=1, value={"value":100}

これは同じキーであるため、最初のメッセージよりも小さいため、key=1 value=100 のメッセージを生成したいと考えています。

今、私たちが得たとしましょう:

key=2 value=99

次の場所に新しいメッセージが作成されます。

key=2 and value=99 but the key=1 and associated value shouldn't change.

さらに、次のメッセージが表示された場合:

key=1 value=2000

このメッセージは現在の値の 100 より大きいため、メッセージは生成されません。

これは機能しますが、これが API の意図に準拠しているかどうか疑問に思っています。

public class MinProcessor implements Processor<String,String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;
    private Gson gson = new Gson();

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.schedule(1000);
        kvStore = (KeyValueStore) context.getStateStore("Counts");
    }

    @Override
    public void process(String key, String value) {
        Long incomingPotentialMin = ((Double)gson.fromJson(value, Map.class).get("value")).longValue();
        Long minForKey = kvStore.get(key);
        System.out.printf("key: %s incomingPotentialMin: %s minForKey: %s \n", key, incomingPotentialMin, minForKey);

        if (minForKey == null || incomingPotentialMin < minForKey) {
            kvStore.put(key, incomingPotentialMin);
            context.forward(key, incomingPotentialMin.toString());
            context.commit();
        }
    }

    @Override
    public void punctuate(long timestamp) {}

    @Override
    public void close() {
        kvStore.close();
    }
}

実際にプロセッサを実行するコードは次のとおりです。

public class MinLauncher {

    public static void main(String[] args) {    
        TopologyBuilder builder = new TopologyBuilder();

        StateStoreSupplier countStore = Stores.create("Counts")
                .withKeys(Serdes.String())
                .withValues(Serdes.Long())
                .persistent()
                .build();

        builder.addSource("source", "kafka-streams-topic")
                .addProcessor("process", () -> new MinProcessor(), "source")
                .addStateStore(countStore, "process")
                .addSink("sink", "min-topic", "process");

        KafkaStreams streams = new KafkaStreams(builder, KafkaStreamsProperties.properties("kafka-streams-min-poc"));
        streams.cleanUp();
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
4

1 に答える 1