0

confluent-3.0.1 プラットフォームを使用し、Kafka-Elasticsearch コネクタを構築しています。このために、SinkConnector と SinkTask (Kafka 接続 API) を拡張して、Kafka からデータを取得します。

このコードの一部として、SinkConnector の taskConfigs メソッドを拡張して「max.poll.records」を返し、一度に 100 レコードのみを取得しています。しかし、それは機能せず、すべてのレコードを同時に取得していますが、規定の時間内にオフセットをコミットできません。「max.poll.records」を設定するのを手伝ってください。

 public List<Map<String, String>> taskConfigs(int maxTasks) {
    ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>();
    for (int i = 0; i < maxTasks; i++) {
      Map<String, String> config = new HashMap<String, String>();
      config.put(ConfigurationConstants.CLUSTER_NAME, clusterName);
      config.put(ConfigurationConstants.HOSTS, hosts);
      config.put(ConfigurationConstants.BULK_SIZE, bulkSize);
      config.put(ConfigurationConstants.IDS, elasticSearchIds);
      config.put(ConfigurationConstants.TOPICS_SATELLITE_DATA, topics);
      config.put(ConfigurationConstants.PUBLISH_TOPIC, topicTopublish);
      config.put(ConfigurationConstants.TYPES, elasticSearchTypes);
      config.put("max.poll.records", "100");

      configs.add(config);
    }
    return configs;
  }
4

2 に答える 2

8

max.poll.recordsコネクタ構成のように、ほとんどの Kafka コンシューマー構成をオーバーライドすることはできません。ただし、consumer.プレフィックスを使用して、Connect ワーカー構成でこれを行うことができます。

于 2016-11-10T19:39:36.157 に答える
2

解決しました。connect-avro-standalone.properties に以下の構成を追加しました

 group.id=mygroup
 consumer.max.poll.records=1000

コネクタを実行するためのコマンドの下で実行しました。

sh ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-elasticsearch/connect-elasticsearch-sink.properties
于 2016-11-14T06:48:47.347 に答える