11

次のことを行うトポロジを作成しようとしています。

  1. Twitter フィードをサブスクライブするスパウト (キーワードに基づく)
  2. コレクション内の多数のツイート (N とします) を集約し、それらをプリンター ボルトに送信する集約ボルト
  3. コレクションを一度にコンソールに出力する単純なボルト。

実際には、コレクションに対してさらに処理を行いたいと考えています。

ローカルでテストしたところ、動作しているようです。ただし、ボルトにグループを正しく設定したかどうか、および実際のストーム クラスターに展開したときにこれが正しく機能するかどうかはわかりません。誰かがこのトポロジのレビューを手伝ってくれて、エラー、変更、または改善を提案してくれれば幸いです。

ありがとう。

これが私のトポロジの外観です。

builder.setSpout("spout", new TwitterFilterSpout("pittsburgh"));
   builder.setBolt("sampleaggregate", new SampleAggregatorBolt())
                .shuffleGrouping("spout");
   builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate");

凝集ボルト

public class SampleAggregatorBolt implements IRichBolt {

    protected OutputCollector collector;
    protected Tuple currentTuple;
    protected Logger log;
    /**
     * Holds the messages in the bolt till you are ready to send them out
     */
    protected List<Status> statusCache;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;

        log = Logger.getLogger(getClass().getName());
        statusCache = new ArrayList<Status>();
    }

    @Override
    public void execute(Tuple tuple) {
        currentTuple = tuple;

        Status currentStatus = null;
        try {
            currentStatus = (Status) tuple.getValue(0);
        } catch (ClassCastException e) {
        }
        if (currentStatus != null) {

            //add it to the status cache
            statusCache.add(currentStatus);
            collector.ack(tuple);


            //check the size of the status cache and pass it to the next stage if you have enough messages to emit
            if (statusCache.size() > 10) {
                collector.emit(new Values(statusCache));
            }

        }
    }

    @Override
    public void cleanup() {


    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweets"));

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;  //To change body of implemented methods use File | Settings | File Templates.
    }


    protected void setupNonSerializableAttributes() {

    }

}

プリンターボルト

public class PrinterBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        System.out.println(tuple.size() + " "  + tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }

}
4

2 に答える 2