問題タブ [flink-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
966 参照

java - Flink Streaming java.lang.Exception: タスクの呼び出し可能なクラスをロードできませんでした

基本的な Flink ストリーミング ジョブ (Java で) をローカルで実行しようとしています。Eclipse を使用してアプリケーションを実行すると、魅力的に機能します。しかし、Flink コマンドライン インターフェイスを使用して実行すると、次の例外が発生します。

Flink-Kafka 統合の例を data- artisans で実行しています。

0 投票する
1 に答える
2491 参照

java - Apache Flink ストリーミング ウィンドウのワードカウント

socketTextStream から単語をカウントする次のコードがあります。累積ワード カウントと時間ウィンドウ ワード カウントの両方が必要です。プログラムには、cumulateCounts が常にウィンドウ カウントと同じであるという問題があります。この問題が発生する理由 ウィンドウ化されたカウントに基づいて累積カウントを計算する正しい方法は何ですか?

0 投票する
1 に答える
1784 参照

apache-flink - Flink: CoFlatMapFunction で状態を共有する

で少し行き詰まりましたCoFlatMapFunction。ウィンドウの前に配置すると問題なく動作するように見えDataStreamますが、ウィンドウの「適用」関数の後に配置すると失敗します。

私は 2 つのストリームをテストしていました。データを継続的に取り込むメインの「機能」と、リクエストに応じてモデルを変更するflatMap1コントロール ストリーム「モデル」です。flatMap2

で b0/b1 を正しく設定して確認できますが、初期化時に 0 に設定されたように b0 と b1flatMap2flatMap1常に表示されます。

ここで明らかな何かが欠けていますか?

0 投票する
1 に答える
1995 参照

apache-flink - Flink InvalidTypesException: TypeVariable 'K' のタイプ 'class' を特定できませんでした

Flink 0.10.0 が最近リリースされました。0.9.1 からいくつかのコードを移行する必要があります。しかし、次のエラーが発生しました:

org.apache.flink.api.common.functions.InvalidTypesException:「クラス fi.aalto.dmg.frame.FlinkPairWorkloadOperator」の TypeVariable 'K' の型を特定できませんでした。これは、型消去の問題である可能性が最も高いです。型抽出は現在、戻り型のすべての変数が入力型から推定できる場合にのみ、ジェネリック変数を持つ型をサポートしています。

コードは次のとおりです。

InvalidTypesException がどのように発生するかを理解するために、この例外もスローする別の例がありますが、それについてはわかりません。このデモでは、プログラムは scala.Tuple2 で動作しますが、Flink Tuple2 では動作しません。

0 投票する
1 に答える
852 参照

apache-flink - Flink ファンアウト flatMap

Flink 0.10.0 DataStream を使用しています。これが私の要件です。

  • ソース システムは、メッセージをブロードキャストするカスタム システムです。私のカスタム SourceFunction 実装では、メッセージをリッスンするためにコールバックを実装しています。
  • 各コールバックは、異なるタイプのメッセージを受け取ります。
  • コールバックで受信したオブジェクトをデコード/変換して、SinkFunction に送信したいと考えています。これは FlatMapFunction などでできると思います。
  • さまざまなコールバックがあるため、リッスンするそれぞれのデコード ロジックは異なります。IN タイプが異なるため、それらすべてに対して単一の FlatMapFunction を持つことはできないと思います。

次のトポロジを持つシステムを設計するにはどうすればよいですか。

ソース
|- FlatMap_1 (コールバック 1 によって受信されたメッセージ タイプの処理) -> シンク
|- FlatMap_2 (コールバック 2 によって受信されたメッセージ タイプの処理) -> シンク
|- FlatMap_3 (コールバック 3 によって受信されたメッセージ タイプの処理) -> シンク
など。

一方の出力を他方に送信したくありません。これは基本的にファンアウトです。つまり、それぞれが並行して機能するようにし、受信したメッセージをどのオペレーターに送信するかをソースが判断できるようにしたいと考えています。

ドキュメントと例を確認しましたが、これに一致する例を見つけることができません。これについて助けていただければ幸いです。

0 投票する
2 に答える
764 参照

java - flink - クラスターを使用していないクラスター

私は、すべてが1台のマシンに割り当てられた最新のものまで、タスク(ステップ?ジョブ?)をかなり均等に分散していた3ノードクラスターをセットアップしました。

トポロジ (まだ flink にこの用語を使用していますか?):

kafka (3 topics on different feeds) -> flatmap -> union -> map

ここに画像の説明を入力

このセットアップについて、クラスター マネージャーにすべてを 1 台のマシンに配置するように指示する何かがありますか?

また、画像の「設定されていない」値は何ですか? 私が逃したいくつかのステップ?または、実装される予定の UI 機能はありますか?

0 投票する
2 に答える
2758 参照

scala - Apache Flink Streaming 0.10.0 で writeAsText に OVERWRITE を指定するには?

私はscalaにメソッドを持っています

counts.writeAsText(path_to_file)

ファイルが既に存在する場合は例外をスローし、指定することを提案します File or directory already exists. Existing files and directories are not overwritten in NO_OVERWRITE mode. Use OVERWRITE mode to overwrite existing files and directories.。しかし、私は DataStream クラスで を受け入れるメソッドを見つけていませんorg.apache.flink.core.fs.FileSystem.WriteMode。ミリ秒の Long を受け入れる署名のみがあります。