問題タブ [apache-beam]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - Flink バックエンドを使用する Apache Beam が、protobuf-java ライブラリ メソッドの呼び出しで NoSuchMethodError をスローする
Protocol Buffer を使用して Beam 関数間でデータを渡す単純なパイプラインをローカル クラスターで実行しようとしています。はcom.google.protobuf:protobuf-java
FatJar に含まれています。
実行すると、すべて正常に動作します。
しかし、flink クラスターで実行しようとすると失敗します。
プロジェクトを実行する準備ができました: https://github.com/orian/beam-flink-local-cluster
Beam のバージョンは 0.3-Snapshot (ブリーディング エッジ) で、バージョン 1.0.3 の Flink を使用しています。私のローカル クラスターはバージョン1.0.3を実行しています。Flink は protobuf-java 2.5.0 を使用します。
プログラムは次の例外で終了しました:
nullpointerexception - Apache Beam のデータフロー SDK を使用して BigTable に書き込むときに NullPointerException がキャッチされる
Apache's Beam
SDK バージョンを使用0.2.0-incubating-SNAPSHOT
しており、ランナーを使用してデータをビッグテーブルにプルしようとしていますDataflow
。残念ながら、シンクとしてNullPointerException
使用しているデータフロー パイプラインを実行すると、エラーが発生します。BigTableIO.Write
すでにチェック済みBigtableOptions
で、必要に応じてパラメーターは問題ありません。
基本的に、私は作成し、パイプラインのある時点でPCollection<KV<ByteString, Iterable<Mutation>>>
、目的のビッグテーブルに書き込むステップがあります。
パイプラインを実行すると、メソッドNullPointerException
で BigtableIO クラスを正確に指すが得られました。public void processElement(ProcessContext c)
bigtable に書き込む前に、このメソッドがすべての要素を処理していることを確認しましたが、このパイプラインを実行すると、なぜそのような例外が発生するのかわかりません。以下のコードによると、このメソッドはbigtableWriter
属性を使用して各 を処理しますc.element()
が、ブレークポイントを設定してデバッグすることさえできませんnull
。この問題を解決するためのアドバイスや提案はありますか?
ありがとう。
python - Apache Beam データフローの出力 csv のヘッダーを追加するにはどうすればよいですか?
java sdk に、csv ファイルのヘッダーを書き込める関数があることに気付きました。 https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/TextIO.Write.html#withHeader-java.lang.String-
この機能は python skd に反映されていますか?
python - groupBy がパイプラインのボトルネックになっているのはなぜですか?
Python apache-beam で記述されたパイプラインがあります。800,000 のタイムスタンプ付きデータを、1 秒ごとにオーバーラップする 2 秒のウィンドウにウィンドウ化します。私の要素には異なるキーがあるかもしれません。
groupBy を実行すると、完了するまでに 3 時間かかります。10 個のワーカーを使用してクラウド データフローにデプロイされています。ワーカーの数を増やしても、処理速度が大幅に向上することはありません。この変換がパイプラインのボトルネックになっているのはなぜですか?
google-bigquery - ParDo 関数内から BigQuery への書き込み
ParDo 関数内から操作を呼び出して、beam.io.Write(beam.io.BigQuerySink(..))
キーごとに個別の BigQuery テーブルを生成したいと考えていますPCollection
(私は Python SDK を使用しています)。残念ながら役に立たなかった2つの同様のスレッドを次に示します。
1) https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey
2)データフロー パイプラインから BQ に書き込むときの動的テーブル名
次のコードを実行すると、最初のキーの行が BigQuery に挿入され、パイプラインが次のエラーで失敗します。私が間違っていることについての提案、またはそれを修正する方法についての提案を本当に感謝します。
パイプライン コード:
エラーメッセージ:
編集(最初の回答の後):
自分の値が である必要があることに気づきませんでしたPCollection
。
コードを次のように変更しました (これはおそらく非常に非効率的です)。
これはローカルでBlockingDataflowPipelineRunner
は正常に機能しますが、 :-(では機能しません
パイプラインは次のエラーで失敗します。
python - Python Apache Beamのウィンドウで要素を注文するにはどうすればよいですか?
java apache ビームにはクラス groupby.sortbytimestamp があることに気付きました python にはまだその機能が実装されていますか? そうでない場合、ウィンドウ内の要素をソートする方法は何ですか? ウィンドウ全体を DoFn で並べ替えることができると思いますが、もっと良い方法があるかどうか知りたいです。