Web イベントを他のデータソースにストリーミングするために Apache Kafka 0.8.2.1 を使用しています。私が作成した Kafka Producer はうまく機能しており、kafka-console-consumer.sh を実行すると、トピックを通じてデータがストリーミングされているのを確認できます。ただし、自作の Kafka Consumer でメッセージを取得しようとしても、うまくいきません。何か解決策はありますか?
私のコードが consumer.createMessageStreams(topicCountMap) を実行しようとすると、不適切なパスに関する次のエラーが出力されます。
Exception in thread "main" java.lang.IllegalArgumentException: Path must not end with / character
at org.apache.zookeeper.common.PathUtils.validatePath(PathUtils.java:58)
at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1024)
at org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1073)
at org.I0Itec.zkclient.ZkConnection.exists(ZkConnection.java:95)
at org.I0Itec.zkclient.ZkClient$11.call(ZkClient.java:827)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
at org.I0Itec.zkclient.ZkClient.watchForData(ZkClient.java:824)
at org.I0Itec.zkclient.ZkClient.subscribeDataChanges(ZkClient.java:136)
at kafka.consumer.ZookeeperConsumerConnector$$anonfun$kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer$4.apply(ZookeeperConsumerConnector.scala:901)
at kafka.consumer.ZookeeperConsumerConnector$$anonfun$kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer$4.apply(ZookeeperConsumerConnector.scala:898)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:898)
at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:240)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:85)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:97)
以下が私の Kafka Consumer のコードです。
// Connector that run() uses to open the message streams; created once from
// the settings returned by createConsumerConfig().
val consumer: ConsumerConnector = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig())
// Thread pool for the consumer tasks; stays null until run() assigns it.
var executor: ExecutorService = null
/**
 * Requests `a_numThreads` message streams for the "testEvent" topic and submits
 * one `EventConsumer` task per returned stream to a fixed-size thread pool.
 *
 * The pool is stored in `executor`, replacing whatever it held before.
 *
 * @param a_numThreads number of streams to request and size of the thread pool;
 *                     must be positive
 * @throws IllegalArgumentException if `a_numThreads` is not positive
 */
def run(a_numThreads: Integer): Unit = {
  // Fail before touching the connector: Executors.newFixedThreadPool rejects a
  // non-positive size with the same exception type, but only after the streams
  // would already have been requested.
  require(a_numThreads.intValue > 0, "a_numThreads must be positive")

  val topic = "testEvent"
  val topicCountMap: java.util.Map[String, Integer] = new java.util.HashMap[String, Integer]()
  // a_numThreads is already a boxed Integer, so it is stored as-is.
  topicCountMap.put(topic, a_numThreads)

  val streams = consumer.createMessageStreams(topicCountMap).get(topic)

  // now launch all the threads
  executor = Executors.newFixedThreadPool(a_numThreads)

  // Submit one EventConsumer per stream, numbering the tasks from 0.
  var threadNumber: Integer = 0
  val streamsItr = streams.iterator()
  while (streamsItr.hasNext()) {
    executor.submit(new EventConsumer(streamsItr.next(), threadNumber))
    threadNumber = threadNumber + 1
  }
}
/**
 * Builds the consumer configuration: ZooKeeper at 127.0.0.1:2181 and consumer
 * group "testConsumer".
 *
 * Only the 0.8-style keys ("zookeeper.connect", "group.id") are set. The
 * pre-0.8 names "zk.connect" and "groupid" were dropped: the 0.8 consumer
 * configuration does not read them, and the old "groupid" entry carried a
 * misspelled group name ("tesConsumer") that disagreed with "group.id".
 *
 * @return a `ConsumerConfig` wrapping the properties set below
 */
def createConsumerConfig(): ConsumerConfig = {
  val props = new Properties()
  props.setProperty("zookeeper.connect", "127.0.0.1:2181")
  props.setProperty("group.id", "testConsumer")
  // NOTE(review): 400 ms is far below Kafka's documented default session
  // timeout (6000 ms) -- confirm this short timeout is intentional.
  props.setProperty("zookeeper.session.timeout.ms", "400")
  props.setProperty("zookeeper.sync.time.ms", "200")
  props.setProperty("auto.commit.interval.ms", "1000")
  new ConsumerConfig(props)
}