confluent-3.0.1 プラットフォームを使用し、Kafka-Elasticsearch コネクタを構築しています。このために、SinkConnector と SinkTask (Kafka 接続 API) を拡張して、Kafka からデータを取得します。
このコードの一部として、SinkConnector の taskConfigs メソッドを拡張して「max.poll.records」を返し、一度に 100 レコードのみを取得しています。しかし、それは機能せず、すべてのレコードを同時に取得していますが、規定の時間内にオフセットをコミットできません。「max.poll.records」を設定するのを手伝ってください。
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> config = new HashMap<String, String>();
config.put(ConfigurationConstants.CLUSTER_NAME, clusterName);
config.put(ConfigurationConstants.HOSTS, hosts);
config.put(ConfigurationConstants.BULK_SIZE, bulkSize);
config.put(ConfigurationConstants.IDS, elasticSearchIds);
config.put(ConfigurationConstants.TOPICS_SATELLITE_DATA, topics);
config.put(ConfigurationConstants.PUBLISH_TOPIC, topicTopublish);
config.put(ConfigurationConstants.TYPES, elasticSearchTypes);
config.put("max.poll.records", "100");
configs.add(config);
}
return configs;
}