問題タブ [event-stream-processing]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
error-handling - Apache Flink のエラー処理と条件付き処理
私は Flink を初めて使用し、開始するためにサイト/サンプル/ブログを調べました。演算子の正しい使い方に苦労しています。基本的に私は2つの質問があります
質問 1: Flink は宣言型の例外処理をサポートしていますか? parse/validate/... エラーを処理する必要がありますか?
- org.apache.flink.runtime.operators.sort.ExceptionHandler などを使用してエラーを処理できますか?
- またはRich/FlatMap機能が私の最良の選択肢ですか?Rich/FlatMap が唯一のオプションである場合、Rich/FlatMap 関数内でストリームへのハンドルを取得して、エラー処理のためにシンクをアタッチできるようにする方法はありますか?
質問 2: 条件付きで異なるシンクを接続できますか?
- キー付き分割ストリームの特定のフィールドに基づいて、別のシンクを選択する必要があります。ストリームを再度分割するか、Rich/FlatMap を使用して処理しますか?
Flink 1.3.2 を使用しています。これが私の仕事の関連部分です
ここで正しい演算子を使用していますか?
更新 1:
@alpinegizmo で提案されているように ProcessFunction を使用しようとしましたが、入力を解析/検証するまで持っていないキー付きストリームに依存しているため、機能しませんでした。「InvalidProgramException: Field expression must be equal to '*' or '_' for non-composite types.」というメッセージが表示されます。
これは、最初の解析/検証入力でキー付きストリームがまだないという一般的なユースケースです。どのように解決しますか?
ご理解とご協力をお願いいたします。
python - Docker イメージの nginx バッファリング フラスコ イベント ストリーム
Python/flask を使用した REST API バックエンドがあり、イベント ストリームで応答をストリーミングしたいと考えています。すべてが nginx/uwsgi ( https://hub.docker.com/r/tiangolo/uwsgi-nginx-flask/ )を使用して docker コンテナー内で実行されます。
API は、イベント ストリームになるまで正常に動作します。サーバーが計算を終了し、すべてが一緒に送信されるまで、どの種類のクライアントも何も受信しないため、何か(おそらくnginx)が「収量」をバッファリングしているようです。
次のような追加の構成 (nginx_streaming.conf) ファイルを使用して、nginx の設定を (docker イメージの指示に従って) 調整しようとしました。
ドッカーファイル:
しかし、私はnginxの設定にあまり詳しくなく、ここで何をしているのか確信が持てません^^これは少なくとも機能しません..何か提案はありますか?
私のサーバー側の実装:
java - DataStream での Flink sql クエリ (Apache Flink Java)
私はApache flinkに完全に慣れていません。手を汚そうとしているだけです。次のシナリオがあります。
- イベントのデータストリーム
- イベントのデータストリーム
- ルールのデータストリーム
- ruleID に基づいてこれら 2 つのデータストリームを結合
今私は tuple3 のようなデータストリームを持っています<ruleId, Rule, Event>
。これらのルールは、イベントで実行したい SQL クエリです。
私は動的テーブルと Flink SQL の概念を経験していました。さらに処理する方法がわかりません。誰かがこれで私を助けてくれますか?
analytics - Kinesis Analytics を使用して、イベントと関連する欠落イベントを分析し、時間を分けて分析しますか?
「接続」または「切断」できるさまざまなデバイスのイベントのストリームがあります。
つまり、イベントには次の構造があります。
- タイムスタンプ
- デバイスID
- イベント (「接続」または「切断」)
デバイスが切断され、(デバイス固有の構成可能な) 期間内 (たとえば 1 時間) に接続されなかったときに、すぐにアクションをトリガーしたいと考えています。「切断された」イベントごとに1回だけトリガーしたい。
これは AWS Kinesis Analytics を使用して実行できますか? もしそうなら、クエリはどのようになりますか? そうでない場合、他のツールを使用して実行できますか、それともカスタムビルドする必要がありますか?
java - 「偽の」ストリーム データを生成します。カフカ - フリンク
ストリーム データを生成しようとしています。これは、整数型の 2 つの値を別の時間範囲でタイムスタンプと共に受け取り、Kafka をコネクタとして受け取る状況をシミュレートしようとしています。
Flink 環境をコンシューマーとして使用していますが、プロデューサーにとって最適なソリューションがどれかわかりません。(可能であれば、Java 構文は Scala より優れています)
Kafka から直接データを生成する必要がありますか? はいの場合、それを行うための最良の方法は何ですか? それとも、プロデューサーとして Flink からデータを生成し、それを Kafka に送信し、最後に Flink で再度使用する方が良いでしょうか? どうすればフリンクからそれを行うことができますか? または、ストリーム データを生成して Kafka に渡す別の簡単な方法があるかもしれません。
はいの場合は、それを達成するための軌道に乗せてください。
apache-beam - Apache Beam でローリング ウィンドウを作成するには? スライドまたは固定ではなく、回転ウィンドウ
過去 10 分間の特定のメトリックの平均を 1 分ごとに計算し、それを過去 20 分間の同じメトリックの平均と比較したいとします。2 つのウィンドウ (10 のスライディング ウィンドウと 20 のスライディング ウィンドウではありません) または固定期間の 2 つのウィンドウが必要です。毎分 (それぞれ 10 分と 20 分の期間) 1 分ずつロールフォワードし続ける必要がある 2 つのウィンドウが必要です。または、最新のスライディング ウィンドウ以外をすべて破棄できれば、問題は解決する可能性があります。それ以外の場合、複数のスライド ウィンドウは非常にコストがかかります。
ここで助けていただけますか?カスタム WindowFn() 関数は非常に役立ちます