0

アプリケーションは 1 つの Kafka トピックからメッセージを読み取り、MongoDB に保存していくつかの検証を行った後、別のトピックに書き込みます。ここで、アプリケーションが無限ループに陥るなどの問題に直面しています。私が持っているコードは以下です:

Hosts zkHosts = new ZkHosts("localhost:2181");
String zkRoot = "/brokers/topics" ;
String clientRequestID = "reqtest";
String clientPendingID = "pendtest";
SpoutConfig kafkaRequestConfig = new SpoutConfig(zkHosts,"reqtest",zkRoot,clientRequestID);
SpoutConfig kafkaPendingConfig = new SpoutConfig(zkHosts,"pendtest",zkRoot,clientPendingID);

kafkaRequestConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaPendingConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaRequestSpout = new KafkaSpout(kafkaRequestConfig);
KafkaSpout kafkaPendingSpout = new KafkaSpout(kafkaPendingConfig);

MongoBolt mongoBolt = new MongoBolt() ;
DeviceFilterBolt deviceFilterBolt = new DeviceFilterBolt() ;
KafkaRequestBolt kafkaReqBolt = new KafkaRequestBolt() ;
abc1DeviceBolt abc1DevBolt = new abc1DeviceBolt() ;
DefaultTopicSelector defTopicSelector = new DefaultTopicSelector(xyzKafkaTopic.RESPONSE.name()) ;
KafkaBolt kafkaRespBolt = new KafkaBolt()
    .withTopicSelector(defTopicSelector)
    .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()) ;

TopologyBuilder topoBuilder = new TopologyBuilder();
topoBuilder.setSpout(xyzComponent.KAFKA_REQUEST_SPOUT.name(), kafkaRequestSpout);
topoBuilder.setSpout(xyzComponent.KAFKA_PENDING_SPOUT.name(), kafkaPendingSpout);
topoBuilder.setBolt(xyzComponent.KAFKA_PENDING_BOLT.name(),
    deviceFilterBolt, 1)
    .shuffleGrouping(xyzComponent.KAFKA_PENDING_SPOUT.name()) ;
topoBuilder.setBolt(xyzComponent.abc1_DEVICE_BOLT.name(),
    abc1DevBolt, 1)
    .shuffleGrouping(xyzComponent.KAFKA_PENDING_BOLT.name(),
        xyzDevice.abc1.name()) ;
topoBuilder.setBolt(xyzComponent.MONGODB_BOLT.name(), 
    mongoBolt, 1)
    .shuffleGrouping(xyzComponent.abc1_DEVICE_BOLT.name(),
        xyzStreamID.KAFKARESP.name());
topoBuilder.setBolt(xyzComponent.KAFKA_RESPONSE_BOLT.name(),
    kafkaRespBolt, 1)
    .shuffleGrouping(xyzComponent.abc1_DEVICE_BOLT.name(),
        xyzStreamID.KAFKARESP.name());

Config config = new Config() ;
config.setDebug(true);
config.setNumWorkers(1);

Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);

LocalCluster cluster = new LocalCluster();
try{
    cluster.submitTopology("demo", config, topoBuilder.createTopology());
}

上記のコードでKAFKA_RESPONSE_BOLTは、トピックにデータを書き込んでいます。 次のようなデータを発行することでabc1_DEVICE_BOLTこれを供給しています:KAFKA_RESPONSE_BOLT

@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
    Fields respFields = IoTFields.getKafkaResponseFieldsRTEXY();
    ofd.declareStream(IoTStreamID.KAFKARESP.name(), respFields);
}

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    List<Object> newTuple = new ArrayList<Object>() ;
    String params  = tuple.getStringByField("params") ;
    newTuple.add(3, params);
    ----
    collector.emit(IoTStreamID.KAFKARESP.name(), newTuple);
}
4

1 に答える 1

1

私は長い間同じ質問に悩まされてきました.答えは非常に簡単です...あなたはそれを信じないでしょう.

私が理解している限りでKafkaBoltは、タプルを受信する必要があるの実装には、ボルトまたはスパウトに関係なく、「メッセージ」のフィールド名があります。これが役立つと信じて!)

具体的な理由はhttps://mail-archives.apache.org/mod_mbox/incubator-storm-user/201409.mbox/%3C6AF1CAC6-60EA-49D9-8333-0343777B48A7@andrashatvani.com%3Eで述べられています

于 2016-01-21T08:39:07.283 に答える