Storm Trident で同じボルトから複数のストリームに放出するにはどうすればよいですか?
いくつかの計算を行うボルトがあり、その結果に基づいて、ある値をあるストリームに渡し、他の値を別のストリームに渡したいと考えています。
Storm (Trident ではない) では、次の方法でそれを実現できます。
ストリームを複数のストリームに分割します。
@Override
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declareStream("type1-stream", new Fields("type1"));
outputFieldsDeclarer.declareStream("type2-stream", new Fields("type2"));
outputFieldsDeclarer.declareStream("error-stream", new Fields("error"));
}
次に、結果に基づいて次のように発行します。
collector.emit("type1-stream", new Values("type 1 data"));
collector.emit("type2-stream", new Values("type 2 data"));
collector.emit("error-stream", new Values("error data"));
次に、予想されるストリームをリッスンして残りの作業を行います。
builder.setBolt("errorBolt", errorBolt).shuffleGrouping("errorBoltStream", "error-stream");
builder.setBolt("type1Bolt", type1Bolt).shuffleGrouping("type1BoltStream", "type1-stream");
では、Storm Trident を使用して同じ動作を実現するにはどうすればよいでしょうか?
1つのオプションは、同じストリームに対して「each」を呼び出し、同じボルトを実行し、そのストリームに発行したいものに基づいてのみ発行するか、別のオプションは、キーと値のペアを発行し、キーに基づいてストリームをフィルタリングします(type1など) type2、エラーなど)、再び複数のストリームを作成します。しかし、どれも私には良いデザインとは思えません。それを達成するための最良の方法は何ですか?