CoGroupByKey の問題
データの説明。
2 つのデータセットがあります。
- レコード- 1 つ目は、
(key,day)
. テストには、2 ~ 3 個のキーと 5 ~ 10 日分のデータを使用します。私が狙うのは 1000 個以上のキーです。各レコードには、キー、μ 秒単位のタイムスタンプ、およびその他のデータが含まれています。 - 構成- 2 つ目はかなり小さいです。これは時間内のキーを記述します。たとえば、タプルのリストと考えることができます:
(key, start date, end date, description)
.
調査のために、データを長さのプレフィックス付きプロトコル バッファー バイナリ エンコード メッセージのファイルとしてエンコードしました。さらに、ファイルは gzip で圧縮されています。データは日付ごとに分割されます。各ファイルは約10MBです。
パイプライン
Apache Beam を使用してパイプラインを表現します。
- まず、両方のデータセットにキーを追加します。Records データセットの場合は
(key, day rounded timestamp)
. Config の場合、キーは です。ここで、day はと(key, day)
の間の各タイムスタンプ値です(真夜中を指す)。start date
end date
- データセットは、CoGroupByKey を使用してマージされます。
キータイプとして、リポジトリgithub.com/orian/tuple-coderorg.apache.flink.api.java.tuple.Tuple2
で使用します。Tuple2Coder
問題
Records データセットが 5 日間のように小さい場合、すべて問題ないようです (normal_run.log を確認してください)。
INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
パイプラインを 10 日以上実行すると、一部のレコードに構成がないことを示すエラーが発生します (wrong_run.log)。
INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0
次に、いくつかの追加のログ メッセージを追加しました。
(a.java:144) - 68643 items for KeyValue3 on: 1462665600000000
(a.java:140) - no items for KeyValue3 on: 1463184000000000
(a.java:123) - missing for KeyValue3 on: 1462924800000000
(a.java:142) - 753707 items for KeyValue3 on: 1462924800000000 marked as no-loc
(a.java:123) - missing for KeyValue3 on: 1462752000000000
(a.java:142) - 749901 items for KeyValue3 on: 1462752000000000 marked as no-loc
(a.java:144) - 754578 items for KeyValue3 on: 1462406400000000
(a.java:144) - 751574 items for KeyValue3 on: 1463011200000000
(a.java:123) - missing for KeyValue3 on: 1462665600000000
(a.java:142) - 754758 items for KeyValue3 on: 1462665600000000 marked as no-loc
(a.java:123) - missing for KeyValue3 on: 1463184000000000
(a.java:142) - 694372 items for KeyValue3 on: 1463184000000000 marked as no-loc
最初の行で 68643 アイテムが KeyValue3 と時間 1462665600000000 に対して処理されたことを確認できます。
その後の 9 行目では、操作が同じキーを再度処理しているように見えますが、これらのレコードに対して使用できる構成がなかったことが報告されています。
行 10 は、no-loc としてマークされていることを通知します。
2 行目は、KeyValue3 と時刻 1463184000000000 のアイテムがなかったことを示していますが、11 行目では、この (キー、日) ペアのアイテムが後で処理され、構成が不足していることを読み取ることができます。
いくつかの手がかり
探索の実行中に例外が発生しました (exception_thrown.log)。
05/26/2016 03:49:49 GroupReduce (GroupReduce at GroupByKey)(1/5) switched to FAILED
java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.sort.LargeRecordHandler.finishWriteAndSortKeys(LargeRecordHandler.java:263)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1409)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IllegalAccessError: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:122)
at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:297)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:706)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:973)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
回避策 (さらにテストした後、動作せず、Tuple2 のまま)
Tuple2 の使用から Protocol Buffer メッセージの使用に切り替えました。
message KeyDay {
optional ByteString key = 1;
optional int64 timestamp_usec = 2;
}
しかし、使用するTuple2.of()
のはより簡単でした: KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build()
.
protobuf.Message から派生したクラスであるキーに切り替えると、問題は 10 ~ 15 日間解消されました (したがって、Tuple2 の問題であったデータ サイズ)。