問題タブ [apache-beam]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
0 に答える
582 参照

apache-flink - アイテムが正しくグループ化されていない - CoGroupByKey

CoGroupByKey の問題

データの説明。

2 つのデータセットがあります。

  • レコード- 1 つ目は、(key,day). テストには、2 ~ 3 個のキーと 5 ~ 10 日分のデータを使用します。私が狙うのは 1000 個以上のキーです。各レコードには、キー、μ 秒単位のタイムスタンプ、およびその他のデータが含まれています。
  • 構成- 2 つ目はかなり小さいです。これは時間内のキーを記述します。たとえば、タプルのリストと考えることができます: (key, start date, end date, description).

調査のために、データを長さのプレフィックス付きプロトコル バッファー バイナリ エンコード メッセージのファイルとしてエンコードしました。さらに、ファイルは gzip で圧縮されています。データは日付ごとに分割されます。各ファイルは約10MBです。

パイプライン

Apache Beam を使用してパイプラインを表現します。

  1. まず、両方のデータセットにキーを追加します。Records データセットの場合は(key, day rounded timestamp). Config の場合、キーは です。ここで、day はと(key, day)の間の各タイムスタンプ値です(真夜中を指す)。start dateend date
  2. データセットは、CoGroupByKey を使用してマージされます。

キータイプとして、リポジトリgithub.com/orian/tuple-coderorg.apache.flink.api.java.tuple.Tuple2で使用します。Tuple2Coder

問題

Records データセットが 5 日間のように小さい場合、すべて問題ないようです (normal_run.log を確認してください)。

パイプラインを 10 日以上実行すると、一部のレコードに構成がないことを示すエラーが発生します (wrong_run.log)。

次に、いくつかの追加のログ メッセージを追加しました。

最初の行で 68643 アイテムが KeyValue3 と時間 1462665600000000 に対して処理されたことを確認できます。
その後の 9 行目では、操作が同じキーを再度処理しているように見えますが、これらのレコードに対して使用できる構成がなかったことが報告されています。
行 10 は、no-loc としてマークされていることを通知します。

2 行目は、KeyValue3 と時刻 1463184000000000 のアイテムがなかったことを示していますが、11 行目では、この (キー、日) ペアのアイテムが後で処理され、構成が不足していることを読み取ることができます。

いくつかの手がかり

探索の実行中に例外が発生しました (exception_thrown.log)。

回避策 (さらにテストした後、動作せず、Tuple2 のまま)

Tuple2 の使用から Protocol Buffer メッセージの使用に切り替えました。

しかし、使用するTuple2.of()のはより簡単でした: KeyDay.newBuilder().setKey(...).setTimestampUsec(...).build().

protobuf.Message から派生したクラスであるキーに切り替えると、問題は 10 ~ 15 日間解消されました (したがって、Tuple2 の問題であったデータ サイズ)。

0 投票する
1 に答える
239 参照

java - Google データフローでパーソナライズされた WindowFn を作成する方法

WindowFn入力エントリのタイムスタンプではなく、別のフィールドに基づいて入力要素に Windows を割り当てるような方法で別のものを作成したいと思います。WindowFnGoogle DataFlow SDK の事前定義された は、ウィンドウを割り当てる基準としてタイムスタンプを使用することを知っています。

より具体的には、SlidingWindowsタイムスタンプをウィンドウ割り当て基準と見なす代わりに、別のフィールドをその基準と見なしたいと思います。

カスタマイズした を作成するにはどうすればよいWindowFnですか? 自作するときのポイントはWindowFn

ありがとう。

0 投票する
1 に答える
110 参照

google-cloud-dataflow - 障害時にデータフロー プロセスが回復しない

AZ 全体が機能停止で失われるという最近のインシデントに続いて、Dataflow フェイルオーバー手順をよりよく理解したいと思います。

データフロー ジョブ (ストリーミング、PubSub から BigQuery へ) のワーカー ノードを手動で削除したところ、それらは正常に再作成/再起動されましたが、データフロー プロセス自体は復旧していませんでした。

すべてのステータスが OK であるにもかかわらず、データ項目が流れていませんでした。

フローを再開する唯一の方法は、ジョブをキャンセルして再送信することでした。

手動による削除が有効なテストではないことは理解していますが、人的エラーの要因を軽視することはできません.

ワークフローが自動的に再開されるべきであると私は理解していますが、ここでは観察されたケースではありません。

何が恋しいですか?

0 投票する
1 に答える
482 参照

google-cloud-dataflow - キーごとの値の数を制限する

現在、データフロー プロセスがありますがGroupByKeyDoPargroup-by がキーごとに取得する値が多すぎるため、これに対する適切な解決策があるかどうかを知りたいと考えていました。私が言えることから、ウィンドウごとに値の最大数を設定する方法はありません。

現在、次の 3 つのオプションを検討しています。

  1. より小さなウィンドウ - イベントが時間内にクラスター化される可能性があるため、これにはまだ問題があると考えられます。
  2. キーを分割するためにすべてのキーにランダムな値を追加する - これも理想的ではありません。これは、入ってくるイベントが少なくなると、キーごとの値が少なすぎるためです。また、イベントの数が指数関数的に増加する場合、パーティションの数を調整することはできません。
  3. 派手なトリガーまたはコンバイナーの使用 - おそらく最良の解決策ですが、これを行う方法がわかりません。

これを行うための標準的な方法またはベストプラクティスはありますか?

0 投票する
1 に答える
548 参照

python - 最新の python apache_beam cloud datafow sdk を使用してクラウド データストアから読み取るためのカスタム ソースを作成する

最近、cloud dataflow python sdk が利用可能になり、それを使用することにしました。残念ながら、クラウド データストアからの読み取りのサポートはまだ提供されていないため、カスタム ソースの記述に頼る必要があります。これにより、約束どおりに動的分割、進行状況の見積もりなどの利点を利用できるようになります。ドキュメントを徹底的に調べましたが、プロセス全体をスピードアップできるように断片をまとめることができません。

より明確にするために、私の最初のアプローチは次のとおりです。

  1. クラウド データストアのクエリ
  2. ParDo 関数を作成し、返されたクエリをそれに渡します。

しかし、これでは 20 万件以上のエントリを繰り返すのに 13 分かかりました。

そこで、エンティティを効率的に読み取るカスタム ソースを作成することにしました。しかし、ピースをまとめるという私の理解が不足しているため、それを達成することはできません. データストアから読み取るためのカスタム ソースを作成する方法を教えてください。

編集: 最初のアプローチの場合、私の要点へのリンクは次のとおりです

ありがとうございました。

0 投票する
1 に答える
768 参照

google-cloud-dataflow - CoGroupByKey は、discardingFiredPanes でどのように機能しますか?

つまり、各 pcollection のすべての新しい要素で起動するトリガーを持つ GlobalWindow があり、discardingFiredPanes に設定されている場合、CoGroupByKey は、rhs が発火したときに lhs null で発火しますか、それとも lhs の最後の値で発火しますか?

これらの 2 つのデータセットを考えると

p1: |id|x1| |1 |10| |1 |11| |1 |12| p2: |id|x2| |1 |20| |1 |21| |1 |22|

私は期待すべきですか:

|id| x1 | x2 | |1 |[10]|null| |1 |null|[20]| |1 |[11]|null| |1 |null|[21]| |1 |[12]|null| |1 |null|[22]|

また:

|id| x1 | x2 | |1 |[10]|null| |1 |[10]|[20]| |1 |[11]|[20]| |1 |[11]|[21]| |1 |[12]|[21]| |1 |[12]|[22]|

0 投票する
3 に答える
3867 参照

python - Python Apache Beam で gzip ファイルを開く

現在、Apache Beam を使用して Python で gzip ファイルから読み取ることは可能ですか? 私のパイプラインは、次のコード行で gcs から gzip ファイルをプルしています。

しかし、私はこのエラーが発生しています:

Python ビーム ソース コードで、シンクへの書き込み時に圧縮ファイルが処理されているように見えることに気付きました。 https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/fileio.py#L445

より詳細なトレースバック: