問題タブ [apache-flink]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-spark - Spark または Flink を使用する場合、HDFS での位置認識はどのように達成されますか?
Spark または Flink の実行エンジン (マスター スケジューラ) が各ブロックに適したワーカーをどのように見つけ出すのか疑問に思っています。
namenode はブロックの正確な位置を彼らに伝えることができますが、このタスクは Spark と Flink のジョブ マネージャーによって行われますか、それとも YARN の出番なのでしょうか?
apache-spark - Flink - ステートフルな計算
Apache Flink を使用して、次の問題の解決策を見つけるのに苦労しています。
ローカル フォルダー内のファイルによって表されるベクトルのストリームがあります。を使用して新しいテキスト ファイルをDataStream<String> text = env.readFileStream(...)
見つけた後、入力を に変換 (flatMap) します。IntegerSingleOutputStreamOperator<Tuple2<String, Integer>, ?>
はスコアリング関数からのスコアです。
スコアを使用して、トップ k ベクトルを含むグローバル HashMap を保持したいと考えています。ステートフル変換を使用して問題に取り組みました。
私が抱えている最初の問題は、HashMap がシンクごとのデータを保持することです (つまり、ワーカーのスレッドごとに、データの HashMap が 1 つ)。それをグローバルコレクションにするにはどうすればよいですか
Apache Spark を使用して、私はそれを可能にしました。
JavaPairDStream<String, Integer> stateDstream = tuples.updateStateByKey(updateFunction);
で変換を行いstateDstream
ます。FLink を使用して同じ機能を得る方法はありますか?
前もって感謝します!
streaming - クラスター内の Apache Flink ストリーミングは、ワーカーでジョブを分割しません
私の目的は、Kafka をソースとして使用し、Flink をストリーム処理エンジンとして使用して、高スループットのクラスターをセットアップすることです。これが私がやったことです。
マスターとワーカーで次の構成の 2 ノード クラスターをセットアップしました。
マスター flink-conf.yaml
ワーカー flink-conf.yaml
マスター ノードのslaves
ファイルは次のようになります。
両方のノードの flink セットアップは、同じ名前のフォルダーにあります。実行して、マスターでクラスターを起動します
これにより、Worker ノードでタスク マネージャーが起動します。
私の入力ソースは Kafka です。これがスニペットです。
ここに私のシンク機能があります
これが私の pom.xml の Flink 依存関係です。
次に、マスターでこのコマンドを使用してパッケージ化されたjarを実行します
ただし、メッセージを Kafka トピックに挿入すると、Kafka トピックから着信するすべてのメッセージを (SinkFunction
実装の呼び出しメソッドのデバッグ メッセージを介して) マスター ノードだけで説明できます。
ジョブ マネージャーの UI では、次のように 2 つのタスク マネージャーが表示されます。
- ワーカー ノードがタスクを取得しないのはなぜですか?
- いくつかの構成がありませんか?
java - Flink ジョブでオブジェクトを初期化するために使用される HDFS 上のファイル
私は Flink でこの奇妙な問題を抱えています: 仕事で、事前にコンパイルされたリソース ファイルでオブジェクトを初期化する必要があります。そして、ジョブが開始されてから、初回は問題なく実行されます。しかし、最初とまったく同じように Web インターフェイスで再度アクセスすると、オブジェクトを初期化できず、エラーは次のようになります。
java.nio.file.NoSuchFileException: hdfs:/.../.../my_file
ファイルがそこにあり、オブジェクトが初めて正常に初期化されるため、混乱します。関連するコードは次のとおりです。
リソースがローカル システム上にあり、同じジョブでローカル サーバーを実行している場合、問題は発生しません。それで、誰も手がかりを持っていますか?
編集:完全なトレース
twitter-streaming-api - Apache Flink を使用して Twitter ストリーミング API に接続する際の IOExcpetion
Apache Flink Streaming API を使用して Twitter のツイートを読み取る小さな Scala プログラムを作成しました。
実行すると、次の問題が発生します。
プログラムは接続を再確立しようとします。したがって、この 4 行のログ メッセージは引き続き出力されます。
これに関する奇妙な点は、Apache Flink プロジェクトで提供されている例を実行すると、すべてが正常に機能することです (マスターの最新バージョンを GitHub から取得しました)。私も同じプロパティファイルを使用しています。そのサンプル クラスを自分のプロジェクトにコピーすると、上記の問題状態も発生します。
Flink アーキタイプを使用して、独自のプロジェクトを作成しました。バージョン 0.9.1 と 0.10-SNAPSHOT で試しました。依存関係flink-scala
、flink-streaming-scala
、flink-clients
およびflink-connector-twitter
は、対応するバージョンで使用されます。
誰かが同様の問題を経験したことがあり、私を正しい軌道に乗せることができますか?
scala - apache flink のユニオン型の混乱?
いくつかの flink DataSet を結合しようとしています。それらはSeqに含まれています。以下は、問題を生成するコードです
私が得るものは
スレッド「メイン」org.apache.flink.api.common.InvalidProgramException での例外: 異なるタイプの入力を結合できません。Input1=scala.Tuple2(_1: 整数、_2: オプション[scala.Tuple4(_1: GenericType [java.time.LocalDateTime]、_2: 文字列、_3: 整数、_4: ブール値)])、input2=scala.Tuple2( _1: 整数、_2: オプション[scala.Tuple4(_1: GenericType[java.time.LocalDateTime]、_2: 文字列、_3: 整数、_4: Boolean)])
私は何が欠けていますか?種類が違うじゃないですか。ユニオン演算子は安価であるはずなので、問題を回避するのは魅力的ではないようです。最初の 2 行のコードは、DataSet 内のデータの型が同じであることを示す引数として指定しました。flink バージョン 0.9.0 と 0.9.1 を使用しました