Apache Flink (Scala API) を使い始めたばかりです。問題は次のとおりです。Flink サイトの例に基づいて、Kafka から Apache Flink にデータをストリーミングしようとしています。
val stream =
env.addSource(new FlinkKafkaConsumer09("testing", new SimpleStringSchema() , properties))
すべてが正常に機能し、stream.print() ステートメントによって画面に次のように表示されます。
2018-05-16 10:22:44 午前|1|11|-71.16|40.27
データをロードするためにケースクラスを使用したいのですが、使用してみました
flatMap(p=>p.split("|"))
ただし、一度に 1 文字ずつデータを分割するだけです。
基本的に期待される結果は、次のようにケース クラスの 5 つのフィールドにデータを入力できることです。
field(0)=2018-05-16 10:22:44 AM
field(1)=1
field(2)=11
field(3)=-71.16
field(4)=40.27
しかし、それは今やっています:
field(0) = 2
field(1) = 0
field(3) = 1
field(4) = 8
等...
アドバイスをいただければ幸いです。
前もって感謝します
フランク