問題タブ [apache-beam]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
0 に答える
519 参照

java - Flink バックエンドを使用する Apache Beam が、protobuf-java ライブラリ メソッドの呼び出しで NoSuchMethodError をスローする

Protocol Buffer を使用して Beam 関数間でデータを渡す単純なパイプラインをローカル クラスターで実行しようとしています。はcom.google.protobuf:protobuf-javaFatJar に含まれています。

実行すると、すべて正常に動作します。

しかし、flink クラスターで実行しようとすると失敗します。

プロジェクトを実行する準備ができました: https://github.com/orian/beam-flink-local-cluster

Beam のバージョンは 0.3-Snapshot (ブリーディング エッジ) で、バージョン 1.0.3 の Flink を使用しています。私のローカル クラスターはバージョン1.0.3を実行しています。Flink は protobuf-java 2.5.0 を使用します。


プログラムは次の例外で終了しました:

0 投票する
1 に答える
358 参照

nullpointerexception - Apache Beam のデータフロー SDK を使用して BigTable に書き込むときに NullPointerException がキャッチされる

Apache's BeamSDK バージョンを使用0.2.0-incubating-SNAPSHOT しており、ランナーを使用してデータをビッグテーブルにプルしようとしていますDataflow。残念ながら、シンクとしてNullPointerException使用しているデータフロー パイプラインを実行すると、エラーが発生します。BigTableIO.Writeすでにチェック済みBigtableOptionsで、必要に応じてパラメーターは問題ありません。

基本的に、私は作成し、パイプラインのある時点でPCollection<KV<ByteString, Iterable<Mutation>>>、目的のビッグテーブルに書き込むステップがあります。

パイプラインを実行すると、メソッドNullPointerExceptionで BigtableIO クラスを正確に指すが得られました。public void processElement(ProcessContext c)

bigtable に書き込む前に、このメソッドがすべての要素を処理していることを確認しましたが、このパイプラインを実行すると、なぜそのような例外が発生するのかわかりません。以下のコードによると、このメソッドはbigtableWriter属性を使用して各 を処理しますc.element()が、ブレークポイントを設定してデバッグすることさえできませんnull。この問題を解決するためのアドバイスや提案はありますか?

ありがとう。

0 投票する
4 に答える
4450 参照

python - Apache Beam データフローの出力 csv のヘッダーを追加するにはどうすればよいですか?

java sdk に、csv ファイルのヘッダーを書き込める関数があることに気付きました。 https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/TextIO.Write.html#withHeader-java.lang.String-

この機能は python skd に反映されていますか?

0 投票する
1 に答える
239 参照

python - groupBy がパイプラインのボトルネックになっているのはなぜですか?

Python apache-beam で記述されたパイプラインがあります。800,000 のタイムスタンプ付きデータを、1 秒ごとにオーバーラップする 2 秒のウィンドウにウィンドウ化します。私の要素には異なるキーがあるかもしれません。

groupBy を実行すると、完了するまでに 3 時間かかります。10 個のワーカーを使用してクラウド データフローにデプロイされています。ワーカーの数を増やしても、処理速度が大幅に向上することはありません。この変換がパイプラインのボトルネックになっているのはなぜですか?

0 投票する
1 に答える
2541 参照

google-bigquery - ParDo 関数内から BigQuery への書き込み

ParDo 関数内から操作を呼び出して、beam.io.Write(beam.io.BigQuerySink(..))キーごとに個別の BigQuery テーブルを生成したいと考えていますPCollection(私は Python SDK を使用しています)。残念ながら役に立たなかった2つの同様のスレッドを次に示します。

1) https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey

2)データフロー パイプラインから BQ に書き込むときの動的テーブル名

次のコードを実行すると、最初のキーの行が BigQuery に挿入され、パイプラインが次のエラーで失敗します。私が間違っていることについての提案、またはそれを修正する方法についての提案を本当に感謝します。

パイプライン コード:

エラーメッセージ:


編集(最初の回答の後):

自分のが である必要があることに気づきませんでしたPCollection

コードを次のように変更しました (これはおそらく非常に非効率的です)。

これはローカルでBlockingDataflowPipelineRunnerは正常に機能しますが、 :-(では機能しません

パイプラインは次のエラーで失敗します。

0 投票する
2 に答える
1603 参照

python - Python Apache Beamのウィンドウで要素を注文するにはどうすればよいですか?

java apache ビームにはクラス groupby.sortbytimestamp があることに気付きました python にはまだその機能が実装されていますか? そうでない場合、ウィンドウ内の要素をソートする方法は何ですか? ウィンドウ全体を DoFn で並べ替えることができると思いますが、もっと良い方法があるかどうか知りたいです。

0 投票する
1 に答える
929 参照

scala - Dataflow を使用して Cloud Storage に PubSub ストリームを書き込むときにエラーが発生しました

からSCIOを使用 spotifyして のジョブを書き込みます。たとえば、 g1e.g2Dataflowの 2 つの例に従って にストリームを書き込みますが、以下のコードでは次のエラーが発生します。PubSubGCS

エラー

コード

ウィンドウの概念を Bounded PCollection と混同している可能性があります。これを達成する方法はありますか、それともこれを実現するために何らかの変換を適用する必要がありますか?