1

Trident+DRPC を実装しようとしています。無期限に実行されないようにトポロジを設計しました。1 つはスパウトの実装用、もう 1 つは DRPC と Trident の実装用です。私のスパウト クラス (IRichSpout を拡張するスパウト) は、顧客の ID を発行します。すなわち

public class TriSpout implements IRichSpout{
  //some logic here
    spoutOutputCollector.emit(new Values(id))
  }

ここで、DRPC を使用して Trident を実装する別のクラスの出力コレクターから値を取得しました。

public class TriDrpc{

    .....
    TriSpout spout=new TriSpout1();        
    TridentTopology topology = new TridentTopology();  
    TridentState wordCounts =
          topology.newStream("spout1",spout)
            .parallelismHint(1)
            .each(new Fields("id"), new Compute(), new Fields("value"))
            .persistentAggregate(new MemoryMapState.Factory(),
                                 new Count(), new Fields("count"))   

drpc トポロジ定義は次のとおりです。

topology.newDRPCStream("Calc", drpc)
         .each(new Fields("args"), new Split(), new Fields("word"))                
         .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"));         

DRPC リクエストは次のとおりです。

public static void main(String[] args) throws Exception {
    Config conf = new Config(); 

    if (args.length == 0) {
    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("Calculator", conf,   buildTopology(drpc));          
    System.out.println("DRPC RESULT: "
                + drpc.execute("Calc", "id"));
    Thread.sleep(1000);

    } else {
        conf.setNumWorkers(8);
        StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
    }
}

上記のコードでは、DRPC リクエストで、つまり

System.out.println("DRPC RESULT: " + drpc.execute("Calc", "id"));

スパウトによって発行された ID と同じである必要があります。"id"つまり、この ID を使用してアクティブなアカウントを持っている顧客を知りたいので、スパウトによって発行されたすべての ID に対して DRPC 要求を送信する必要があります。現在、DRPC はメイン クラスにあります。ID を手動で指定せずに、スパウトによって発行された値を DRPC リクエストに渡すにはどうすればよいですか?

誰か助けてください

新しい情報で編集

4

1 に答える 1

3

アップデート

さて、あなたの問題が何であるかがより明確になりました、ありがとう。

したがって、同じ DRPC のトポロジ スパウトが発行しているのと同じ ID の DRPC 要求を処理する必要があります。

これを実現できる唯一の方法は、スパウトから発行した ID を Storm の外部永続ストレージ (RDMS や分散ハッシュマップなど) に永続化することです。

こうすることで、Storm クラスターで実行するためにトポロジを送信した後、新しい ID について永続ストレージをポーリングし、新しい ID ごとに DRPC 要求を実行できます。

元の答え

私は質問を理解していないと思います。同じ DRPC トポロジのスパウトの出力から取得したリクエストの ID 引数を使用して、Storm DRPC リクエストを実行しようとしていますか? これは、DRPC トポロジの効果的かつ意図的な使用法ではないと思います。通常のトポロジを使用することをお勧めします。

DRPC トポロジは有限計算を目的としていますが、通常のトポロジは連続計算に使用されます。DRPC 呼び出しは、DRPC トポロジの名前と、DRPC 呼び出しの結果を計算するための一連の入力引数を受け取ります。通常のストーム (またはトライデント) トポロジは無期限に実行され、何らかの結果を計算して永続化します。

これが役立つことを願っています。そうでない場合は、問題が何であるかが明確ではないため、質問をより適切に再定式化してください。

于 2013-09-20T08:43:38.100 に答える