3

sensor_code: x, time: 1526978768, address: Y 各センサーコードで一意のアドレスをそれぞれ格納する KTable を作成したいように見える受信メッセージを含む Kafka ストリームがあり ます。

KTable

KTable<String, Long> numCount = streams
            .map(kvm1)
            .groupByKey(Serialized.with(stringSerde, stringSerde))
            .count()
            .groupBy(kvm2, Serialized.with(stringSerde, longSerde))
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("StateStore"));

どこkvm1kvm2は自分のものKeyValueMappersです。私の考えは、既存のキーを に置き換えて、 とをsensor_code=x, address=y実行することでした。次に、別のwhereが既存のものを変更してを含むようにすると、値はそのカウントになります。しかし、それが機能していないので、間違っているのかもしれません...文字列を探しているため、ロングとしてキャストしようとし、例外をスローします。カウントを にしたいですよね?groupByKey()count()groupBy(kvm2, Serialized.with(stringSerde, longSerde))kvm2keysensor_codeLong

KeyValueMapperそれぞれのヘルプ機能で使用する最初のものを次に示します。

    private static String getKeySensorIdAddress(String o) {
    String x = "sensor_id=\"x\", address=\"y\""; 
    try {
        WifiStringEvent event = mapper.readValue(o, WifiStringEvent.class);
        x = x.replace("x", event.getSensor_code());
        x = x.replace("y", event.getAddress());
        return x;
    } catch(Exception ex) {
        System.out.println("Error... " + ex);
        return "Error";
    }
}
        //KeyValueMapper1
KeyValueMapper<String, String, KeyValue<String, String>> kvm1 = 
    new KeyValueMapper<String, String, KeyValue<String, String>>() {
         public KeyValue<String, String> apply(String key, String value) {
             return new KeyValue<>(getKeySensorIdAddress(value), value);
         }
    };

2 番目KeyValueMapperとそのヘルプ関数を次に示します。

    private static String getKeySensorId(String o) {
    int a = o.indexOf(",");
    return o.substring(0,a);
}

        //KeyValueMapper2 
    KeyValueMapper<String, Long, KeyValue<String, Long>> kvm2 = 
    new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
         public KeyValue<String, Long> apply(String key, Long value) {
             return new KeyValue<>(getKeySensorId(key), value);
         }
    };

コードを実行しようとすると返される例外とエラーを次に示します。

[2018-05-29 15:28:40,119] エラー ストリーム スレッド [testUniqueAddresses-ed48daf8-fff0-42e4-bb5a-687584734b45-StreamThread-1] 次のエラーのため、ストリーム タスク 2_0 の処理に失敗しました: (org.apache. kafka.streams.processor.internals.AssignedStreamsTasks:105) java.lang.ClassCastException: java.lang.Long は org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java で java.lang.String にキャストできません:28) org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178) で org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66) で org .apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57) org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:198) at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117) at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process( KTableAggregate.java:95) org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:56) で

java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Stringエラーに注意してください。

このエラーが発生する理由と、それを修正する方法、またはコードを編集して目的の出力に到達する方法をアドバイスする方法はありますか?

よろしくお願いします!

編集: アプローチの1つを放棄したため、質問を大幅に見直しました。

4

1 に答える 1