Trident+DRPC を実装しようとしています。無期限に実行されないようにトポロジを設計しました。1 つはスパウトの実装用、もう 1 つは DRPC と Trident の実装用です。私のスパウト クラス (IRichSpout を拡張するスパウト) は、顧客の ID を発行します。すなわち
public class TriSpout implements IRichSpout{
//some logic here
spoutOutputCollector.emit(new Values(id))
}
ここで、DRPC を使用して Trident を実装する別のクラスの出力コレクターから値を取得しました。
public class TriDrpc{
.....
TriSpout spout=new TriSpout1();
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1",spout)
.parallelismHint(1)
.each(new Fields("id"), new Compute(), new Fields("value"))
.persistentAggregate(new MemoryMapState.Factory(),
new Count(), new Fields("count"))
drpc トポロジ定義は次のとおりです。
topology.newDRPCStream("Calc", drpc)
.each(new Fields("args"), new Split(), new Fields("word"))
.stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"));
DRPC リクエストは次のとおりです。
public static void main(String[] args) throws Exception {
Config conf = new Config();
if (args.length == 0) {
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Calculator", conf, buildTopology(drpc));
System.out.println("DRPC RESULT: "
+ drpc.execute("Calc", "id"));
Thread.sleep(1000);
} else {
conf.setNumWorkers(8);
StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
}
}
上記のコードでは、DRPC リクエストで、つまり
System.out.println("DRPC RESULT: " + drpc.execute("Calc", "id"));
スパウトによって発行された ID と同じである必要があります。"id"
つまり、この ID を使用してアクティブなアカウントを持っている顧客を知りたいので、スパウトによって発行されたすべての ID に対して DRPC 要求を送信する必要があります。現在、DRPC はメイン クラスにあります。ID を手動で指定せずに、スパウトによって発行された値を DRPC リクエストに渡すにはどうすればよいですか?
誰か助けてください
新しい情報で編集