0

Apache Flink (Scala API) を使い始めたばかりです。問題は次のとおりです。Flink サイトの例に基づいて、Kafka から Apache Flink にデータをストリーミングしようとしています。

val stream =
  env.addSource(new FlinkKafkaConsumer09("testing", new SimpleStringSchema() , properties))

すべてが正常に機能し、stream.print() ステートメントによって画面に次のように表示されます。

2018-05-16 10:22:44 午前|1|11|-71.16|40.27

データをロードするためにケースクラスを使用したいのですが、使用してみました

flatMap(p=>p.split("|")) 

ただし、一度に 1 文字ずつデータを分割するだけです。

基本的に期待される結果は、次のようにケース クラスの 5 つのフィールドにデータを入力できることです。

  field(0)=2018-05-16 10:22:44 AM
  field(1)=1
  field(2)=11
  field(3)=-71.16 
  field(4)=40.27

しかし、それは今やっています:

   field(0) = 2
   field(1) = 0
   field(3) = 1
   field(4) = 8 

等...

アドバイスをいただければ幸いです。

前もって感謝します

フランク

4

1 に答える 1