1

私は Kafka を初めて使用し、https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Exampleで消費者の例を実行しようとしていますが、メッセージを受け取りません。

Eclipse コンソールの出力は次のとおりです。

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
Shutting down Thread: 2
Shutting down Thread: 0
Shutting down Thread: 1

以下は消費者向けの私のコードです

public class ConsumerDemo {
private final ConsumerConnector consumer; //why private final
private final String topic;
private ExecutorService executor;

public ConsumerDemo(String a_zookeeper,String a_groupId,String a_topic)
{
    consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));
    this.topic=a_topic;
}

public void shutdown()
{
    if(consumer != null)
        consumer.shutdown();
    if(executor != null)
        executor.shutdown();
}

public void run(int numThreads)
{
    Map<String,Integer> topicCountMap= new HashMap<String,Integer>();
    topicCountMap.put(topic, new Integer(numThreads));
    Map<String,List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[],byte[]>> streams = consumerMap.get(topic);
    int m=streams.size();

    executor = Executors.newFixedThreadPool(numThreads);

    int threadNumber=0;
    for(final KafkaStream stream : streams)
    {
        executor.submit(new ConsumerMsgTask(stream,threadNumber));
        threadNumber++;
    }
}

private static ConsumerConfig createConsumerConfig(String a_zookeeper,String a_groupId)
{
    Properties props = new Properties();
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("auto.offset.reset", "smallest");
    props.put("zookeeper.session.timeout.ms", "4000");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");

    return new ConsumerConfig(props);
}

public static void main(String[] arg)
{
    String[] args = {"192.168.0.123:2181","group-a","test1","3"};
    String zooKeeper = args[0];
    String groupId = args[1];
    String topic = args[2];
    int threads = Integer.parseInt(args[3]);

    ConsumerDemo demo = new ConsumerDemo(zooKeeper,groupId,topic);
    demo.run(threads);

    try
    {
        Thread.sleep(10000);
    }catch (InterruptedException ie)
    {

    }
    demo.shutdown();

}

これは ConsumerMsgTask です

    public class ConsumerMsgTask implements Runnable {
private KafkaStream<byte[], byte[]> m_stream;
private int m_threadNumber;

public ConsumerMsgTask(KafkaStream<byte[], byte[]> stream,int threadNumber)
{
    m_threadNumber = threadNumber;
    m_stream = stream;
}

public void run()
{
    ConsumerIterator<byte[],byte[]> it = m_stream.iterator();
    int i=it.size();
    while(it.hasNext())
        System.out.println("Thread "+m_threadNumber+": "+ new String(it.next().message()));
     System.out.println("Shutting down Thread: " + m_threadNumber);
}

これが私の ProducerDemo です

    public class ProducerDemo {
public static void main(String[] args)
{
    Random rnd= new Random();
    int events=100;

    Properties props= new Properties();
    props.put("metadata.broker.list", "192.168.0.123:9092,192.168.0.123:9093,192.168.0.123:9094");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");

    ProducerConfig config = new ProducerConfig(props);

    Producer<String,String> producer=new Producer<String,String>(config);
    long start=System.currentTimeMillis();
    for(int i=0;i<events;i++)
    {
        long runtime=new Date().getTime();
        String ip="192.168.5."+rnd.nextInt(255);
        String msgs=runtime+",www.maodou.com,"+ip;
        KeyedMessage<String,String> data=new KeyedMessage<String,String>("test1",ip,msgs);
        producer.send(data);
    }
    System.out.println("time:"+(System.currentTimeMillis()-start));
    producer.close();

}

}

以下のコマンドでトピック「test1」を作成しました

$ bin/kafka-topics.sh --create --zookeeper 192.168.0.123:2181 --replication-factor 3 --partitions 3 --topic test1

これは、OpenJDK "1.7.0_45" を使用して、CentOS リリース 6.5 (Final) で実行されている Kafka 0.8.2 を使用しています。

4

1 に答える 1

0

Main() に以下のコードがありますか? もしそうなら、それを削除すれば問題は解決します。メイン スレッドが 10 秒後にコンシューマーをシャットダウンしようとするのと同じ問題が発生します。Apache Commons Cli および Apache Commons Daemon を使用して、適切なシャットダウン/起動を実装する必要がある場合があります。

try {
    Thread.sleep(10000);
} catch (InterruptedException ie) {

}
example.shutdown();
于 2015-03-11T17:48:59.043 に答える