0

私はカフカが初めてです。私は 2 つのトピックを作成し、2 人のプロデューサーからこれら 2 つのトピックについて公開しています。両方のトピックからのメッセージを消費する 1 つのコンシューマーがあります。優先度に応じて処理したいからです。

両方のトピックからストリームを取得していますが、ストリームの繰り返しを開始するとすぐConsumerItreatorにブロックされます。ドキュメントに書かれている通り、新しいメッセージを受け取るまでブロックされます。

単一の Kafka Consumer から 2 つのトピックと 2 つのストリームを読み取る方法を知っている人はいますか?

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
                topicCountMap.put(KafkaConstants.HIGH_TEST_TOPIC, new Integer(1));
                topicCountMap.put(KafkaConstants.LOW_TEST_TOPIC, new Integer(1));
                Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
                KafkaStream<byte[], byte[]> highPriorityStream = consumerMap.get(KafkaConstants.HIGH_TEST_TOPIC).get(0);
                ConsumerIterator<byte[], byte[]> highPrioerityIterator = highPriorityStream.iterator();

                while (highPriorityStream.nonEmpty() && highPrioerityIterator.hasNext())
                {
                    byte[] bytes = highPrioerityIterator.next().message();
                    Object obj = null;
                    CLoudDataObject thunderDataObject = null;
                    try
                    {

                        obj = SerializationUtils.deserialize(bytes);
                        if (obj instanceof CLoudDataObject)
                        {
                            thunderDataObject = (CLoudDataObject) obj;
                            System.out.println(thunderDataObject);
                            // TODO Got the Thunder object here, now write code to send it to Thunder service.
                        }

                    }
                    catch (Exception e)
                    {
                    }
                }
4

1 に答える 1

0

優先度の高いメッセージの前に優先度の低いメッセージを処理したくない場合は、consumer.timeout.ms プロパティを設定し、ConsumerTimeoutException をキャッチして、優先度の高いフローが利用可能な最後のメッセージに到達したことを検出するのはどうですか? デフォルトでは、新しいメッセージが到着するまでブロックするように -1 に設定されています。( http://kafka.apache.org/07/configuration.html )

以下に、優先度の異なる複数のフローを同時に処理する方法を説明します。

Kafka にはマルチスレッド プログラミングが必要です。あなたの場合、2 つのトピックのストリームは、フローのスレッドによって処理される必要があります。各スレッドは独立して実行されてメッセージを処理するため、1 つのブロック フロー (スレッド) が他のフローに影響を与えることはありません。

Java の ThreadPool 実装は、マルチスレッド アプリケーションの作成に役立ちます。ここで実装例を見つけることができます:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

実行の優先度に関しては、 Thread.currentThread.setPriority メソッドを呼び出して、提供する Kafka トピックに基づいてスレッドの適切な優先度を設定できます。

于 2015-06-09T23:18:22.967 に答える