6

String キー HashMap を値として持つ状態ストアを作成する必要があります。以下の2つの方法を試しました。

// First method
StateStoreSupplier avgStoreNew = Stores.create("AvgsNew")
          .withKeys(Serdes.String())
          .withValues(HashMap.class)
          .persistent()
          .build();

// Second method
HashMap<String ,Double> h = new HashMap<String ,Double>();

StateStoreSupplier avgStore1 = Stores.create("Avgs")
          .withKeys(Serdes.String())
          .withValues(Serdes.serdeFrom(h.getClass()))
          .persistent()
          .build();

コードはエラーなしで正常にコンパイルされますが、実行時エラーが発生します

io.confluent.examples.streams.WordCountProcessorAPIException in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer

状態ストアを作成する正しい方法を教えてもらえますか?

4

1 に答える 1

6

状態ストアを作成する場合は、使用するタイプのシリアライザーデシリアライザークラスを提供する必要があります。Kafka Stream には、シリアライザーとデシリアライザーを単一のクラスにラップするSerdeと呼ばれる単一の抽象化があります。

使用する場合は、.withValues(Class<K> keyClass)それを保持する必要があります

@param keyClass キーのクラス。これは、Kafka が組み込みの serdes を持っているタイプの 1 つである必要があります。

組み込みのSerdesforがないためHashMap、最初に 1 つ実装し (おそらく と呼ばれるHashMapSerde)、このクラスをメソッドに渡す必要があります.withValues(Serde<K> keySerde)。さらに、実際のシリアライザーとデシリアライザーも実装する必要がありHashMapます。HashMap のジェネリック型がわかっている場合は、それらを指定する必要があります (これにより、シリアライザーとデシリアライザーの実装がはるかに簡単になります。

このようなもの (単なるスケッチ; ジェネリック型は省略):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;

public class HashMapSerde implements Serde<HashMap> {

    void configure(Map<String, ?> configs, boolean isKey) {
        /* put your code here */
    }

    void close() {
        /* put your code here */
    }

    Serializer<HashMap> serializer() {
        return new Serializer<HashMap>() {
            public void configure(Map<String, ?> configs, boolean isKey) {
                /* put your code here */
            }

            public byte[] serialize(String topic, T data) {
                /* put your code here */
            }

            public void close() {
                /* put your code here */
            }
        };
    }

    Deserializer<HashMap> deserializer() {
        return new Deserializer<HashMap>() {
            public void configure(Map<String, ?> configs, boolean isKey) {
                /* put your code here */
            }

            public T deserialize(String topic, byte[] data) {
                /* put your code here */
            }

            public void close() {
                /* put your code here */
            }
        };
    }
}

(デ) シリアライザーと を実装する方法の例を見たい場合Serdeは、https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/をご覧ください。 common/serializationおよびhttps://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java

于 2016-08-29T10:44:53.570 に答える