状態ストアを作成する場合は、使用するタイプのシリアライザーとデシリアライザークラスを提供する必要があります。Kafka Stream には、シリアライザーとデシリアライザーを単一のクラスにラップするSerdeと呼ばれる単一の抽象化があります。
使用する場合は、.withValues(Class<K> keyClass)
それを保持する必要があります
@param keyClass キーのクラス。これは、Kafka が組み込みの serdes を持っているタイプの 1 つである必要があります。
組み込みのSerdes
forがないためHashMap
、最初に 1 つ実装し (おそらく と呼ばれるHashMapSerde
)、このクラスをメソッドに渡す必要があります.withValues(Serde<K> keySerde)
。さらに、実際のシリアライザーとデシリアライザーも実装する必要がありHashMap
ます。HashMap のジェネリック型がわかっている場合は、それらを指定する必要があります (これにより、シリアライザーとデシリアライザーの実装がはるかに簡単になります。
このようなもの (単なるスケッチ; ジェネリック型は省略):
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
public class HashMapSerde implements Serde<HashMap> {
void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}
void close() {
/* put your code here */
}
Serializer<HashMap> serializer() {
return new Serializer<HashMap>() {
public void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}
public byte[] serialize(String topic, T data) {
/* put your code here */
}
public void close() {
/* put your code here */
}
};
}
Deserializer<HashMap> deserializer() {
return new Deserializer<HashMap>() {
public void configure(Map<String, ?> configs, boolean isKey) {
/* put your code here */
}
public T deserialize(String topic, byte[] data) {
/* put your code here */
}
public void close() {
/* put your code here */
}
};
}
}
(デ) シリアライザーと を実装する方法の例を見たい場合Serde
は、https://github.com/apache/kafka/tree/trunk/clients/src/main/java/org/apache/kafka/をご覧ください。 common/serializationおよびhttps://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java