0

3 つのパーティションと 3 つのレプリケーションで Kafka トピックを作成しました

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test

APIからデータを取得するためのKafka-Producerプログラムを作成し、正常に動作しました。私のデータは 3 つのパーティションと 3 つのレプリケーションに正常に保存されています。

ここで、パーティション化された Kafka ログにあるデータを処理する必要があります。ストーム トポロジがあります。トポロジがログからのデータを入力として受け入れる方法を教えてください。

最初に、1 つのパーティション、1 つのレプリケーションでトピックを作成しましたが、ストーム トポロジは正常に動作します。それを解決する方法。

私のトポロジコード

 public static void main(String[] args) throws Exception{

            Map map = Maps.newHashMap();
            map.put("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource");
            map.put("dataSource.url","jdbc:postgresql://localhost:5432/test?user=postgres");

            ConnectionProvider cp = new MyConnectionProvider(map);

            String argument = args[0];
            Config conf = new Config();
            conf.put(JDBC_CONF, map);
            conf.setDebug(true);
            conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 3);
            //set the number of workers
            conf.setNumWorkers(3);

            TopologyBuilder builder = new TopologyBuilder();

           //Setup Kafka spout
            BrokerHosts hosts = new ZkHosts("localhost:2181");
            String topic = "test";
            String zkRoot = "";
            String consumerGroupId = "group1";
            SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
            spoutConfig.scheme = new RawMultiScheme();
            spoutConfig.scheme = new SchemeAsMultiScheme(new Search_Parser());
            KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
            builder.setSpout("KafkaSpout", kafkaSpout);

この問題について助けてください

前もって感謝します

4

0 に答える 0