0

以下のコードを使用して、VM から Kafka のテスト トピック (ホスト OS で 192.168.0.12 IP で実行) にデータ ストリームを送信しています。

public class WriteToKafka {

    public  static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use ingestion time => TimeCharacteristic == EventTime + IngestionTimeExtractor
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<JoinedStreamEvent> joinedStreamEventDataStream = env
                .addSource(new JoinedStreamGenerator()).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

        Properties properties = new Properties();

        properties.setProperty("bootstrap.servers", "192.168.0.12:9092");
        properties.setProperty("zookeeper.connect", "192.168.0.12:2181");
        properties.setProperty("group.id", "test");

        DataStreamSource<JoinedStreamEvent> stream = env.addSource(new JoinedStreamGenerator());
        stream.addSink(new FlinkKafkaProducer09<JoinedStreamEvent>("test", new TypeInformationSerializationSchema<>(stream.getType(),env.getConfig()), properties));

        env.execute();
    }

JoinedStreamEvent基本的DataSream<Tuple3<Integer,Integer,Integer>>に2つのストリームに参加respirationRateStreamし、 heartRateStream

 public JoinedStreamEvent(Integer patient_id, Integer heartRate, Integer respirationRate) {
        Patient_id = patient_id;
        HeartRate = heartRate;
        RespirationRate = respirationRate;

ホスト OS で実行されている別の Flink プログラムが kafka からデータ ストリームを読み取ろうとしています。ホスト OS で kafka と Zookeper が実行されているため、ここでは localhost を使用しています。

public class ReadFromKafka {


    public static void main(String[] args) throws Exception {
        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");



       DataStream<String> message = env.addSource(new FlinkKafkaConsumer09<String>("test", new SimpleStringSchema(), properties));

       /* DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
                new , properties));*/

        message.print();


        env.execute();


    } //main
} //ReadFromKafka

私はこのような出力を得ています

ここに画像の説明を入力

type のデシリアライザーを実装する必要があると思いますJoinedStreamEventJoinedStreamEvent誰かが私にどのように書くべきか、タイプのデシリアライザーを教えてくださいDataSream<Tuple3<Integer, Integer, Integer>>

他に何かする必要がある場合はお知らせください。

PS - 私は次のデシリアライザーを書くことを考えましたが、それは正しくないと思います

DataStream<JoinedStreamEvent> message = env.addSource(new FlinkKafkaConsumer09<JoinedStreamEvent>("test",
                new TypeInformationSerializationSchema<JoinedStreamEvent>() , properties));
4

1 に答える 1