2 つの入力ストリームを持ち、各ストリームから項目を取得して両方を同時に処理する演算子 (結合など) を実装したいと考えています。さらに、両方の入力のいずれかにデータがない場合、オペレーターはブロックして待機します。
これを行う必要がある場合、どのクラスが関係していますか? それについてのチュートリアルははるかに優れています。どんな提案でもいただければ幸いです!
2 つの入力ストリームを持ち、各ストリームから項目を取得して両方を同時に処理する演算子 (結合など) を実装したいと考えています。さらに、両方の入力のいずれかにデータがない場合、オペレーターはブロックして待機します。
これを行う必要がある場合、どのクラスが関係していますか? それについてのチュートリアルははるかに優れています。どんな提案でもいただければ幸いです!
DataStream
2つを接続して適用する必要がありますTwoInputStreamOperator
。すでに定義済みの演算子がたくさんあります。あなたの場合、 aCoFlatMapFunction
が良い選択です:
DataStream input1 = ...
DataStream input2 = ...
input1.connect(input2).flatMap(new MyOwnCoFlatMapFunction());
詳細はこちら: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#co-operators
ただし、このオペレーターは、希望どおりにブロックすることはできません。したがって、次のパターンを適用する必要があります。左または右から入力を受け取るたびに、反対側からの入力が利用できない場合は、入力をバッファリングする必要があります。
MyOwnCoFlatMapFunction implements CoFlatMapFunction {
List<IN> leftInput = new LinkedList<IN>();
List<IN> rightInput = new LinkedList<IN>();
void flatMap1(IN1 value, Collector<OUT> out) throws Exception {
if(rightInput.size() > 0) {
IN right = rightInput.remove();
// process left input (value) and right input (right) together
} else {
leftInput.add(value);
}
}
// reverse pattern for flatMap2 here
}
ただし、ストリーム処理ではブロッキングは危険であることに注意する必要があります。入力ストリームの日付レートが異なる場合、このアプローチは機能しません (!)。低速のストリームが高速のストリームを抑制し、高速のストリームに背圧がかかるためです。あなたのユースケースはわかりませんが、何か「間違っている」ようです。なぜ時間通りに参加できないのですか?