1

以下のコードを実行しようとするとエラーが発生します..ここに何かが欠けているかどうかわかりません..出力はどこに表示されますか?

エラー

java.lang.RuntimeException: backtype.storm.storm の backtype.storm.drpc.DRPCSpout.open(DRPCSpout.java:79) のトポロジ用に DRPC サーバーが構成されていません。 .daemon.executor$fn__5802$fn__5817.invoke(executor.clj:519) at backtype.storm.util$async_loop$fn__442.invoke(util.clj:434) at clojure.lang.AFn.run(AFn.java:24) ) java.lang.Thread.run(Thread.java:744) で

Code:
----
package com.**.trident.storm;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.kafka.*;
import storm.trident.*;

import backtype.storm.*;

public class EventTridentDrpcTopology
{
private static final String KAFKA_SPOUT_ID = "kafkaSpout";  

private static final Logger log = LoggerFactory.getLogger(EventTridentDrpcTopology.class);

public static StormTopology buildTopology(OpaqueTridentKafkaSpout spout) throws Exception
{
    TridentTopology tridentTopology = new TridentTopology();
    TridentState ts = tridentTopology.newStream("event_spout",spout)
    .name(KAFKA_SPOUT_ID)
    .each(new Fields("mac_address"), new SplitMac(), new Fields("mac"))
    .groupBy(new Fields("mac"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("maccount"))
    .parallelismHint(4)
    ;

    tridentTopology
    .newDRPCStream("mac_count")
    .each(new Fields("args"), new SplitMac(), new Fields("mac"))
    .stateQuery(ts,new Fields("mac"),new MapGet(), new Fields("maccount"))
    .each(new Fields("maccount"), new FilterNull())
    .aggregate(new Fields("maccount"), new Sum(), new Fields("sum"))
     ;

return tridentTopology.build();

}

public static void main(String[] str) throws Exception
{
    Config conf = new Config();
    BrokerHosts hosts = new ZkHosts("xxxx:2181,xxxx:2181,xxxx:2181");
    String topic = "event";
    //String zkRoot = topologyConfig.getProperty("kafka.zkRoot");
    String consumerGroupId = "StormSpout";

    DRPCClient drpc = new DRPCClient("xxxx",3772);


    TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts, topic, consumerGroupId);
    tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new XScheme()); 
    OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig);


    StormSubmitter.submitTopology("event_trident", conf, buildTopology(opaqueTridentKafkaSpout));

}

}
4

1 に答える 1

1

DRPC サーバーの場所を構成して起動する必要があります。http://storm.apache.org/releases/0.10.0/Distributed-RPC.htmlのリモート モード DRPC を参照してください。

DRPC サーバーを起動する DRPC サーバーの場所を構成する DRPC トポロジを Storm クラスターに送信する DRPC サーバーの起動は、ストーム スクリプトを使用して行うことができ、Nimbus や UI を起動するのと同じです。

ビン/ストームdrpc

次に、DRPC サーバーの場所を認識できるように、Storm クラスターを構成する必要があります。これは、DRPCSpout が関数呼び出しをどこから読み取るかを知る方法です。これは、storm.yaml ファイルまたはトポロジ構成を使用して実行できます。これを storm.yaml で構成すると、次のようになります。

drpc.servers: - "drpc1.foo.com" - "drpc2.foo.com"

于 2015-01-15T18:48:24.853 に答える