17

最新の Kafka ドキュメントhttp://kafka.apache.org/documentation.htmlを使い始めています。しかし、新しい Consumer API を使用しようとすると、いくつかの問題が発生します。私は次の手順で仕事をしました:

1. 新しい依存関係を追加する

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>

2.構成を追加する

    Map<String, Object> config = new HashMap<String, Object>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "host:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class.getName());
    config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");

3. KafkaConsumer API を使用する

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe("topic");

ただし、ブローカーからメッセージをポーリングしようとすると、null しか得られませんでした。

Map<String, ConsumerRecords<String, String>> records = consumer.poll(0);
if (records != null)
    process(records);
else
    System.err.println("null");

そして、ソースコードを確認した後、消費者の何が問題なのかがわかります。

@Override
public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
    // TODO Auto-generated method stub
    return null;
}

さらに悪いことに、0.8.2 API に関するその他の有用な情報を見つけることができません。Kafka に関するすべての使用法が最新バージョンと互換性がないためです。誰でも私を助けることができますか?どうもありがとう。

4

2 に答える 2

1

また、新しいプロデューサーによって生成されたメッセージを読み取るために、Kafka 0.8.2.1 の上にコンシューマーを作成しようとしています。

@habsqが指摘したように、コンシューマー側では0.8.3を待つ必要がありますが、コンシューマー用にいくつかのコードが含まれていることが既にわかっていますが、まだ機能していません。

したがって、使用するクライアント (現在のクライアント API) は、現在の Kafka バージョン、つまり 0.8.2.1 の「コア」プロジェクトにあるものです (クライアントを他のバージョンにダウングレードしない方がよいでしょう)。

そのため、今のところ 2 つの jar をインポートする必要があります。1 つは「新しい」Java クライアント用、もう 1 つはコア プロジェクト用で、使用している scala バージョンにも依存します (私は 2.11 を使用しています)。

私の場合、依存関係を管理するためにgradleを使用しているので、インポートするだけです

dependencies {
  compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.8.2.1'
  compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.8.2.1'
}

依存関係を更新すると、必要なすべてのライブラリが取得されます。

別のバージョンの Scala を使用している場合は、バージョンを変更するだけです。とにかく、 maven central ですべての異なるバージョンまたは完全な pom を見つけることができます。 %220.8.2.1%22

これらの Consumer 実装を使用すると、現在のすべての例が通常どおり機能するはずです。

PS ref: Kafka-users ml スレッドhttp://grokbase.com/t/kafka/users/153bepxet5/high-level-consumer-example-in-0-8-2

于 2015-08-06T12:39:37.483 に答える
0

はい、リリース 0.8.2.1 でメッセージの消費に問題があったことを確認できます。Java / Groovy とリリース 0.10.1.0 を使用して単純なコンシューマを作成すると、すべてが完全に機能します。

設定する必要はもうありませんPARTITION_ASSIGNMENT_STRATEGY

于 2016-10-26T12:27:45.220 に答える