2

いくつかの flink DataSet を結合しようとしています。それらはSeqに含まれています。以下は、問題を生成するコードです

case class clickZap ( date: LocalDateTime, stbId:String, channelId :Int , nozap:Boolean)
val afterLastz: DataSet[clickZap]= ... 

val ma_range: IndexedSeq[DataSet[(Int, Option[(java.time.LocalDateTime, String, Int, Boolean)])]]  = for (i  <- Range (0,min_n))
      yield afterLastz.reduceGroup(it =>(i, maxBeforezTCZ(it,at plusMinutes(i))))
//val ma_all =  ma_range.slice(1, min_n).foldLeft(ma_range.head)(_ union _)
val ma_all = ma_range.head union(ma_range.tail.head)

私が得るものは

スレッド「メイン」org.apache.flink.api.common.InvalidProgramException での例外: 異なるタイプの入力を結合できません。Input1=scala.Tuple2(_1: 整数、_2: オプション[scala.Tuple4(_1: GenericType [java.time.LocalDateTime]、_2: 文字列、_3: 整数、_4: ブール値)])、input2=scala.Tuple2( _1: 整数、_2: オプション[scala.Tuple4(_1: GenericType[java.time.LocalDateTime]、_2: 文字列、_3: 整数、_4: Boolean)])

私は何が欠けていますか?種類が違うじゃないですか。ユニオン演算子は安価であるはずなので、問題を回避するのは魅力的ではないようです。最初の 2 行のコードは、DataSet 内のデータの型が同じであることを示す引数として指定しました。flink バージョン 0.9.0 と 0.9.1 を使用しました

4

1 に答える 1

2

問題は、Flink 自身のタイピング システムのバグです。OptionTypeInfoScala を表す は、適切なメソッドOptionを定義していませんでした。equalsその結果、2 つOptionTypeInfosが等しいとは検出されませんでした。

JIRA の問題を作成し、問題を修正するためにプル リクエストを開きました。プル リクエストは 2 日でマージされるはずです。その後、最新0.10-SNAPSHOTバージョンを使用すると、問題は修正されるはずです。

于 2015-09-07T23:17:04.107 に答える