私はカフカが初めてです。私は 2 つのトピックを作成し、2 人のプロデューサーからこれら 2 つのトピックについて公開しています。両方のトピックからのメッセージを消費する 1 つのコンシューマーがあります。優先度に応じて処理したいからです。
両方のトピックからストリームを取得していますが、ストリームの繰り返しを開始するとすぐConsumerItreator
にブロックされます。ドキュメントに書かれている通り、新しいメッセージを受け取るまでブロックされます。
単一の Kafka Consumer から 2 つのトピックと 2 つのストリームを読み取る方法を知っている人はいますか?
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(KafkaConstants.HIGH_TEST_TOPIC, new Integer(1));
topicCountMap.put(KafkaConstants.LOW_TEST_TOPIC, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> highPriorityStream = consumerMap.get(KafkaConstants.HIGH_TEST_TOPIC).get(0);
ConsumerIterator<byte[], byte[]> highPrioerityIterator = highPriorityStream.iterator();
while (highPriorityStream.nonEmpty() && highPrioerityIterator.hasNext())
{
byte[] bytes = highPrioerityIterator.next().message();
Object obj = null;
CLoudDataObject thunderDataObject = null;
try
{
obj = SerializationUtils.deserialize(bytes);
if (obj instanceof CLoudDataObject)
{
thunderDataObject = (CLoudDataObject) obj;
System.out.println(thunderDataObject);
// TODO Got the Thunder object here, now write code to send it to Thunder service.
}
}
catch (Exception e)
{
}
}