問題タブ [apache-flink]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
85 参照

apache-spark - Spark または Flink を使用する場合、HDFS での位置認識はどのように達成されますか?

Spark または Flink の実行エンジン (マスター スケジューラ) が各ブロックに適したワーカーをどのように見つけ出すのか疑問に思っています。

namenode はブロックの正確な位置を彼らに伝えることができますが、このタスクは Spark と Flink のジョブ マネージャーによって行われますか、それとも YARN の出番なのでしょうか?

0 投票する
0 に答える
327 参照

apache-spark - Flink - ステートフルな計算

Apache Flink を使用して、次の問題の解決策を見つけるのに苦労しています。

ローカル フォルダー内のファイルによって表されるベクトルのストリームがあります。を使用して新しいテキスト ファイルをDataStream<String> text = env.readFileStream(...)見つけた後、入力を に変換 (flatMap) します。IntegerSingleOutputStreamOperator<Tuple2<String, Integer>, ?>はスコアリング関数からのスコアです。

スコアを使用して、トップ k ベクトルを含むグローバル HashMap を保持したいと考えています。ステートフル変換を使用して問題に取り組みました。

  1. 私が抱えている最初の問題は、HashMap がシンクごとのデータを保持することです (つまり、ワーカーのスレッドごとに、データの HashMap が 1 つ)。それをグローバルコレクションにするにはどうすればよいですか

  2. Apache Spark を使用して、私はそれを可能にしました。 JavaPairDStream<String, Integer> stateDstream = tuples.updateStateByKey(updateFunction);

で変換を行いstateDstreamます。FLink を使用して同じ機能を得る方法はありますか?

前もって感謝します!

0 投票する
1 に答える
2833 参照

streaming - クラスター内の Apache Flink ストリーミングは、ワーカーでジョブを分割しません

私の目的は、Kafka をソースとして使用し、Flink をストリーム処理エンジンとして使用して、高スループットのクラスターをセットアップすることです。これが私がやったことです。

マスターとワーカーで次の構成の 2 ノード クラスターをセットアップしました。

マスター flink-conf.yaml

ワーカー flink-conf.yaml

マスター ノードのslavesファイルは次のようになります。

両方のノードの flink セットアップは、同じ名前のフォルダーにあります。実行して、マスターでクラスターを起動します

これにより、Worker ノードでタスク マネージャーが起動します。

私の入力ソースは Kafka です。これがスニペットです。

ここに私のシンク機能があります

これが私の pom.xml の Flink 依存関係です。

次に、マスターでこのコマンドを使用してパッケージ化されたjarを実行します

ただし、メッセージを Kafka トピックに挿入すると、Kafka トピックから着信するすべてのメッセージを (SinkFunction実装の呼び出しメソッドのデバッグ メッセージを介して) マスター ノードだけで説明できます。

ジョブ マネージャーの UI では、次のように 2 つのタスク マネージャーが表示されます。 ジョブ マネージャー ダッシュボード - タスク マネージャー

また、ダッシュボードは次のようになります: 質問:ここに画像の説明を入力

  1. ワーカー ノードがタスクを取得しないのはなぜですか?
  2. いくつかの構成がありませんか?
0 投票する
0 に答える
316 参照

java - Flink ジョブでオブジェクトを初期化するために使用される HDFS 上のファイル

私は Flink でこの奇妙な問題を抱えています: 仕事で、事前にコンパイルされたリソース ファイルでオブジェクトを初期化する必要があります。そして、ジョブが開始されてから、初回は問題なく実行されます。しかし、最初とまったく同じように Web インターフェイスで再度アクセスすると、オブジェクトを初期化できず、エラーは次のようになります。

java.nio.file.NoSuchFileException: hdfs:/.../.../my_file

ファイルがそこにあり、オブジェクトが初めて正常に初期化されるため、混乱します。関連するコードは次のとおりです。

リソースがローカル システム上にあり、同じジョブでローカル サーバーを実行している場合、問題は発生しません。それで、誰も手がかりを持っていますか?

編集:完全なトレース

0 投票する
2 に答える
824 参照

twitter-streaming-api - Apache Flink を使用して Twitter ストリーミング API に接続する際の IOExcpetion

Apache Flink Streaming API を使用して Twitter のツイートを読み取る小さな Scala プログラムを作成しました。

実行すると、次の問題が発生します。

プログラムは接続を再確立しようとします。したがって、この 4 行のログ メッセージは引き続き出力されます。

これに関する奇妙な点は、Apache Flink プロジェクトで提供されているを実行すると、すべてが正常に機能することです (マスターの最新バージョンを GitHub から取得しました)。私も同じプロパティファイルを使用しています。そのサンプル クラスを自分のプロジェクトにコピーすると、上記の問題状態も発生します。

Flink アーキタイプを使用して、独自のプロジェクトを作成しました。バージョン 0.9.1 と 0.10-SNAPSHOT で試しました。依存関係flink-scalaflink-streaming-scalaflink-clientsおよびflink-connector-twitterは、対応するバージョンで使用されます。

誰かが同様の問題を経験したことがあり、私を正しい軌道に乗せることができますか?

0 投票する
1 に答える
596 参照

scala - apache flink のユニオン型の混乱?

いくつかの flink DataSet を結合しようとしています。それらはSeqに含まれています。以下は、問題を生成するコードです

私が得るものは

スレッド「メイン」org.apache.flink.api.common.InvalidProgramException での例外: 異なるタイプの入力を結合できません。Input1=scala.Tuple2(_1: 整数、_2: オプション[scala.Tuple4(_1: GenericType [java.time.LocalDateTime]、_2: 文字列、_3: 整数、_4: ブール値)])、input2=scala.Tuple2( _1: 整数、_2: オプション[scala.Tuple4(_1: GenericType[java.time.LocalDateTime]、_2: 文字列、_3: 整数、_4: Boolean)])

私は何が欠けていますか?種類が違うじゃないですか。ユニオン演算子は安価であるはずなので、問題を回避するのは魅力的ではないようです。最初の 2 行のコードは、DataSet 内のデータの型が同じであることを示す引数として指定しました。flink バージョン 0.9.0 と 0.9.1 を使用しました