1

CoGroupByKey の問題

データの説明。

2 つのデータセットがあります。

  • レコード- 1 つ目は、(key,day). テストには、2 ~ 3 個のキーと 5 ~ 10 日分のデータを使用します。私が狙うのは 1000 個以上のキーです。各レコードには、キー、μ 秒単位のタイムスタンプ、およびその他のデータが含まれています。
  • 構成- 2 つ目はかなり小さいです。これは時間内のキーを記述します。たとえば、タプルのリストと考えることができます: (key, start date, end date, description).

調査のために、データを長さのプレフィックス付きプロトコル バッファー バイナリ エンコード メッセージのファイルとしてエンコードしました。さらに、ファイルは gzip で圧縮されています。データは日付ごとに分割されます。各ファイルは約10MBです。

パイプライン

Apache Beam を使用してパイプラインを表現します。

  1. まず、両方のデータセットにキーを追加します。Records データセットの場合は(key, day rounded timestamp). Config の場合、キーは です。ここで、day はと(key, day)の間の各タイムスタンプ値です(真夜中を指す)。start dateend date
  2. データセットは、CoGroupByKey を使用してマージされます。

キータイプとして、リポジトリgithub.com/orian/tuple-coderorg.apache.flink.api.java.tuple.Tuple2で使用します。Tuple2Coder

問題

Records データセットが 5 日間のように小さい場合、すべて問題ないようです (normal_run.log を確認してください)。

INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
INFO [main] (FlinkPipelineRunner.java:127) - item count : 4322332
INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 0
INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0

パイプラインを 10 日以上実行すると、一部のレコードに構成がないことを示すエラーが発生します (wrong_run.log)。

INFO [main] (FlinkPipelineRunner.java:124) - Final aggregator values:
INFO [main] (FlinkPipelineRunner.java:127) - item count : 8577197
INFO [main] (FlinkPipelineRunner.java:127) - missing val1 : 6
INFO [main] (FlinkPipelineRunner.java:127) - multiple val1 : 0

次に、いくつかの追加のログ メッセージを追加しました。

(a.java:144) - 68643 items for KeyValue3 on: 1462665600000000
(a.java:140) - no items for KeyValue3 on: 1463184000000000
(a.java:123) - missing for KeyValue3 on: 1462924800000000
(a.java:142) - 753707 items for KeyValue3 on: 1462924800000000 marked as no-loc
(a.java:123) - missing for KeyValue3 on: 1462752000000000
(a.java:142) - 749901 items for KeyValue3 on: 1462752000000000 marked as no-loc
(a.java:144) - 754578 items for KeyValue3 on: 1462406400000000
(a.java:144) - 751574 items for KeyValue3 on: 1463011200000000
(a.java:123) - missing for KeyValue3 on: 1462665600000000
(a.java:142) - 754758 items for KeyValue3 on: 1462665600000000 marked as no-loc
(a.java:123) - missing for KeyValue3 on: 1463184000000000
(a.java:142) - 694372 items for KeyValue3 on: 1463184000000000 marked as no-loc

最初の行で 68643 アイテムが KeyValue3 と時間 1462665600000000 に対して処理されたことを確認できます。
その後の 9 行目では、操作が同じキーを再度処理しているように見えますが、これらのレコードに対して使用できる構成がなかったことが報告されています。
行 10 は、no-loc としてマークされていることを通知します。

2 行目は、KeyValue3 と時刻 1463184000000000 のアイテムがなかったことを示していますが、11 行目では、この (キー、日) ペアのアイテムが後で処理され、構成が不足していることを読み取ることができます。

いくつかの手がかり

探索の実行中に例外が発生しました (exception_thrown.log)。

05/26/2016 03:49:49 GroupReduce (GroupReduce at GroupByKey)(1/5) switched to FAILED
java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:455)
  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
  at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
  at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
  ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
  at org.apache.flink.runtime.operators.sort.LargeRecordHandler.finishWriteAndSortKeys(LargeRecordHandler.java:263)
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1409)
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IllegalAccessError: tried to access field com.esotericsoftware.kryo.io.Input.inputStream from class org.apache.flink.api.java.typeutils.runtime.NoFetchingInput
  at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.readBytes(NoFetchingInput.java:122)
  at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:297)
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
  at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:706)
  at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:144)
  at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
  at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:973)
  at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

回避策 (さらにテストした後、動作せず、Tuple2 のまま)

Tuple2 の使用から Protocol Buffer メッセージの使用に切り替えました。

message KeyDay {
  optional ByteString key = 1;
  optional int64 timestamp_usec = 2;
}

しかし、使用するTuple2.of()のはより簡単でした: KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().

protobuf.Message から派生したクラスであるキーに切り替えると、問題は 10 ~ 15 日間解消されました (したがって、Tuple2 の問題であったデータ サイズ)。

4

0 に答える 0