
Here is about the simplest use case of the Kafka Streams DSL I could come up with: read CSV sensor data, group it by timestamp, and write it out. The following code does not compile:

public static void main(String[] args) {

    StreamsConfig streamingConfig = new StreamsConfig(getProperties());

    Serde<String> stringSerde = Serdes.String();

    CSVDeserializer<SensorData> sensorDataDeserializer = new CSVDeserializer<>(SensorData.class);
    JsonSerializer<SensorData> sensorDataSerializer = new JsonSerializer<>();
    Serde sensorDataSerde = Serdes.serdeFrom(sensorDataSerializer, sensorDataDeserializer);
    JsonDeserializer<SensorData> sensorDataJsonDeserializer = new JsonDeserializer<>(SensorData.class);
    Serde sensorDataJSONSerde = Serdes.serdeFrom(sensorDataSerializer, sensorDataJsonDeserializer);

    StringSerializer stringSerializer = new StringSerializer();
    StringDeserializer stringDeserializer = new StringDeserializer();
    WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
    WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
    Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer, windowedDeserializer);

    JsonSerializer<SensorDataAccumulator> accSerializer = new JsonSerializer<>();
    JsonDeserializer accDeserializer = new JsonDeserializer<>(SensorDataAccumulator.class);
    Serde<SensorDataAccumulator> accSerde = Serdes.serdeFrom(accSerializer, accDeserializer);


    KStreamBuilder kStreamBuilder = new KStreamBuilder();
    KStream<String, SensorData> initialStream = kStreamBuilder.stream(stringSerde, sensorDataSerde, "e40_orig");

    final KStream<String, SensorData> sensorDataKStream = initialStream
            .filter((k, v) -> (v != null))
            .map((k, v) -> new KeyValue<>(v.getMeasurementDateTime().toString(), v));

    sensorDataKStream
            .filter((k, v) -> (v != null))
            .groupBy((k, v) -> k, stringSerde, sensorDataJSONSerde)
            .aggregate(SensorDataAccumulator::new,
                    (k, v, list) -> list.add(v), // ==> compile error here; CHANGED THIS --> ((SensorDataAccumulator) list).add((SensorData) v),
                    TimeWindows.of(10000),
                    accSerde, "acc")
            .to(windowedSerde, accSerde, "out");

    KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamingConfig);
    kafkaStreams.start();
}

because of

Error:(90, 45) java: cannot find symbol
  symbol:   method add(java.lang.Object)
  location: variable list of type java.lang.Object

Strange, because:

public class SensorDataAccumulator {

    ArrayList list = new ArrayList<SensorData>();

    public SensorDataAccumulator add(SensorData s) {
        list.add(s);
        return this;
    }
}
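My best guess at the compile error: the raw Serde declarations above (sensorDataSerde, sensorDataJSONSerde, accDeserializer) make the groupBy/aggregate chain raw, so the aggregator's list parameter is inferred as plain Object. Here is a sketch of fully parameterized declarations that I would expect to restore the inference (untested, and assuming CSVDeserializer implements Deserializer<SensorData>):

    // Sketch: give every Serde its type parameter so the
    // groupBy/aggregate generics are not erased to Object.
    Serde<SensorData> sensorDataSerde = Serdes.serdeFrom(sensorDataSerializer, sensorDataDeserializer);
    Serde<SensorData> sensorDataJSONSerde = Serdes.serdeFrom(sensorDataSerializer, sensorDataJsonDeserializer);

    JsonDeserializer<SensorDataAccumulator> accDeserializer =
            new JsonDeserializer<>(SensorDataAccumulator.class);
    Serde<SensorDataAccumulator> accSerde = Serdes.serdeFrom(accSerializer, accDeserializer);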

Casting as in the comment gives the following runtime exception (right before outputting the windowed accumulation):

[2017-01-02 13:00:45,614] INFO task [1_0] Initializing processor nodes of the topology (org.apache.kafka.streams.processor.internals.StreamTask:123)
[2017-01-02 13:01:04,173] WARN Error while fetching metadata with correlation id 779 : {out=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient:600)
[2017-01-02 13:01:04,662] INFO stream-thread [StreamThread-1] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread:268)
[2017-01-02 13:01:04,663] INFO stream-thread [StreamThread-1] Committing consumer offsets of task 0_0 (org.apache.kafka.streams.processor.internals.StreamThread:358)
[2017-01-02 13:01:04,666] INFO stream-thread [StreamThread-1] Committing consumer offsets of task 1_0 (org.apache.kafka.streams.processor.internals.StreamThread:358)
[2017-01-02 13:01:04,668] INFO stream-thread [StreamThread-1] Closing a task 0_0 (org.apache.kafka.streams.processor.internals.StreamThread:751)
[2017-01-02 13:01:04,668] INFO stream-thread [StreamThread-1] Closing a task 1_0 (org.apache.kafka.streams.processor.internals.StreamThread:751)
[2017-01-02 13:01:04,668] INFO stream-thread [StreamThread-1] Flushing state stores of task 0_0 (org.apache.kafka.streams.processor.internals.StreamThread:368)
[2017-01-02 13:01:04,669] INFO stream-thread [StreamThread-1] Flushing state stores of task 1_0 (org.apache.kafka.streams.processor.internals.StreamThread:368)
Exception in thread "StreamThread-1" java.lang.NoSuchMethodError: org.rocksdb.RocksIterator.close()V
    at org.apache.kafka.streams.state.internals.RocksDBStore$RocksDbIterator.close(RocksDBStore.java:468)
    at org.apache.kafka.streams.state.internals.RocksDBStore.closeOpenIterators(RocksDBStore.java:411)
    at org.apache.kafka.streams.state.internals.RocksDBStore.close(RocksDBStore.java:397)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.close(RocksDBWindowStore.java:276)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.close(MeteredWindowStore.java:109)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.close(CachingWindowStore.java:125)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:349)
    at org.apache.kafka.streams.processor.internals.AbstractTask.closeStateManager(AbstractTask.java:120)
    at org.apache.kafka.streams.processor.internals.StreamThread$2.apply(StreamThread.java:348)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:328)
    at org.apache.kafka.streams.processor.internals.StreamThread.closeAllStateManagers(StreamThread.java:344)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:305)
    at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:269)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:252)
[2017-01-02 13:01:05,316] INFO stream-thread [StreamThread-1] Closing the state manager of task 0_0 (org.apache.kafka.streams.processor.internals.StreamThread:347)
[2017-01-02 13:01:05,316] INFO stream-thread [StreamThread-1] Closing the state manager of task 1_0 (org.apache.kafka.streams.processor.internals.StreamThread:347) 

Debugging the add method of SensorDataAccumulator should give a clue:

(screenshot: the debugger shows the list member holding LinkedTreeMap instances instead of SensorData)

So, if I understand correctly, I declare ArrayList list = new ArrayList<SensorData>();, yet somewhere along the way the member is actually turned into a LinkedTreeMap. The type checker lost me here...
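A standalone Gson round trip seems to reproduce this. The Holder and Item classes in this sketch are made up; the point is the raw list field, just like in SensorDataAccumulator:

import com.google.gson.Gson;
import java.util.ArrayList;

public class RawListRoundTrip {

    static class Item { int x = 42; }

    static class Holder {
        ArrayList list = new ArrayList();   // raw element type, as in SensorDataAccumulator
    }

    public static void main(String[] args) {
        Gson gson = new Gson();

        Holder before = new Holder();
        before.list.add(new Item());

        // Serialize and deserialize again, as accSerde does for every record.
        Holder after = gson.fromJson(gson.toJson(before), Holder.class);

        // Without an element type, Gson rebuilds each JSON object as
        // com.google.gson.internal.LinkedTreeMap, not as Item.
        System.out.println(after.list.get(0).getClass().getName());
    }
}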

LinkedTreeMap is the underlying data structure GSON uses in my JsonDeserializer and JsonSerializer classes, so I'm adding those below for completeness.

At the moment I don't see what I'm doing wrong or where to fix it. Should I use different serializers, a different data structure? A different language ;) ?

Any input is welcome.

public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public byte[] serialize(String topic, T t) {
        return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
    }

    @Override
    public void close() {

    }
} 

public class JsonDeserializer<T> implements Deserializer<T> {

    private Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    public JsonDeserializer() {
    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {
        if(deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null;
        }

        // Decode with an explicit charset to match the serializer.
        return gson.fromJson(new String(bytes, Charset.forName("UTF-8")), deserializedClass);
    }

    @Override
    public void close() {

    }
}
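For what it's worth: if the raw list field really is the culprit, parameterizing it should give Gson the element type straight from the field declaration — an untested sketch:

public class SensorDataAccumulator {

    // Parameterized field: Gson can now read SensorData from the declared
    // type and rebuild the elements instead of falling back to LinkedTreeMap.
    ArrayList<SensorData> list = new ArrayList<>();

    public SensorDataAccumulator add(SensorData s) {
        list.add(s);
        return this;
    }
}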