私は Flink を初めて使用し、開始するためにサイト/サンプル/ブログを調べました。演算子の正しい使い方に苦労しています。基本的に私は2つの質問があります
質問 1: Flink は宣言型の例外処理をサポートしていますか? parse/validate/... エラーを処理する必要がありますか?
- org.apache.flink.runtime.operators.sort.ExceptionHandler などを使用してエラーを処理できますか?
- またはRich/FlatMap機能が私の最良の選択肢ですか?Rich/FlatMap が唯一のオプションである場合、Rich/FlatMap 関数内でストリームへのハンドルを取得して、エラー処理のためにシンクをアタッチできるようにする方法はありますか?
質問 2: 条件付きで異なるシンクを接続できますか?
- キー付き分割ストリームの特定のフィールドに基づいて、別のシンクを選択する必要があります。ストリームを再度分割するか、Rich/FlatMap を使用して処理しますか?
Flink 1.3.2 を使用しています。これが私の仕事の関連部分です
.....
.....
DataStream<String> eventTextStream = env.addSource(messageSource)
KeyedStream<EventPojo, Tuple> eventPojoStream = eventTextStream
// parse, transform or enrich
.flatMap(new MyParseTransformEnrichFunction())
.assignTimestampsAndWatermarks(new EventAscendingTimestampExtractor())
.keyBy("eventId");
// split stream based on eventType as different reduce and windowing functions need to be applied
SplitStream<EventPojo> splitStream = eventPojoStream
.split(new EventStreamSplitFunction());
// need to apply reduce function
DataStream<EventPojo> event1TypeStream = splitStream.select("event1Type");
// need to apply reduce function
DataStream<EventPojo> event2TypeStream = splitStream.select("event2Type");
// need to apply time based windowing function
DataStream<EventPojo> event3TypeStream = splitStream.select("event3Type");
....
....
env.execute("Event Processing");
ここで正しい演算子を使用していますか?
更新 1:
@alpinegizmo で提案されているように ProcessFunction を使用しようとしましたが、入力を解析/検証するまで持っていないキー付きストリームに依存しているため、機能しませんでした。「InvalidProgramException: Field expression must be equal to '*' or '_' for non-composite types.」というメッセージが表示されます。
これは、最初の解析/検証入力でキー付きストリームがまだないという一般的なユースケースです。どのように解決しますか?
ご理解とご協力をお願いいたします。