54

API を使用して IDE から Kafka でトピックを作成するにはどうすればよいですか。

bin/kafka-create-topic.sh --topic mytopic --replica 3 --zookeeper localhost:2181

エラーが発生します:

bash: bin/kafka-create-topic.sh: No such file or directory

そして、開発者のセットアップをそのまま踏襲しました。

4

11 に答える 11

74

Kafka 0.8.1+ (今日の最新バージョンの Kafka) では、 を介してプログラムで新しいトピックを作成できますAdminCommandCreateTopicCommandこの質問に対する以前の回答の 1 つで言及されていた (古い Kafka 0.8.0 の一部)の機能は、に移動されましたAdminCommand

Kafka 0.8.1 の Scala の例:

import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

// Create a ZooKeeper client
val sessionTimeoutMs = 10000
val connectionTimeoutMs = 10000
// Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
// createTopic() will only seem to work (it will return without error).  The topic will exist in
// only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
// topic.
val zkClient = new ZkClient("zookeeper1:2181", sessionTimeoutMs, connectionTimeoutMs,
    ZKStringSerializer)

// Create a topic named "myTopic" with 8 partitions and a replication factor of 3
val topicName = "myTopic"
val numPartitions = 8
val replicationFactor = 3
val topicConfig = new Properties
AdminUtils.createTopic(zkClient, topicName, numPartitions, replicationFactor, topicConfig)

例として sbt を使用して、依存関係を構築します。

libraryDependencies ++= Seq(
  "com.101tec" % "zkclient" % "0.4",
  "org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
    exclude("javax.jms", "jms")
    exclude("com.sun.jdmk", "jmxtools")
    exclude("com.sun.jmx", "jmxri"),
  ...
)

編集: Kafka 0.9.0.0 の Java の例を追加しました (2016 年 1 月現在の最新バージョン)。

Maven の依存関係:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.7</version>
</dependency>

コード:

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

  public static void main(String[] args) {
    String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
    int sessionTimeoutMs = 10 * 1000;
    int connectionTimeoutMs = 8 * 1000;
    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
    // createTopic() will only seem to work (it will return without error).  The topic will exist in
    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
    // topic.
    ZkClient zkClient = new ZkClient(
        zookeeperConnect,
        sessionTimeoutMs,
        connectionTimeoutMs,
        ZKStringSerializer$.MODULE$);

    // Security for Kafka was added in Kafka 0.9.0.0
    boolean isSecureKafkaCluster = false;
    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);

    String topic = "my-topic";
    int partitions = 2;
    int replication = 3;
    Properties topicConfig = new Properties(); // add per-topic configurations settings here
    AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig);
    zkClient.close();
  }

}

編集 2: Kafka 0.10.2.0 の Java の例を追加しました (2017 年 4 月現在の最新バージョン)。

Maven の依存関係:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.9</version>
</dependency>

コード:

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;

public class KafkaJavaExample {

  public static void main(String[] args) {
    String zookeeperConnect = "zkserver1:2181,zkserver2:2181";
    int sessionTimeoutMs = 10 * 1000;
    int connectionTimeoutMs = 8 * 1000;

    String topic = "my-topic";
    int partitions = 2;
    int replication = 3;
    Properties topicConfig = new Properties(); // add per-topic configurations settings here

    // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
    // createTopic() will only seem to work (it will return without error).  The topic will exist in
    // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
    // topic.
    ZkClient zkClient = new ZkClient(
        zookeeperConnect,
        sessionTimeoutMs,
        connectionTimeoutMs,
        ZKStringSerializer$.MODULE$);

    // Security for Kafka was added in Kafka 0.9.0.0
    boolean isSecureKafkaCluster = false;

    ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeperConnect), isSecureKafkaCluster);
    AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicConfig, RackAwareMode.Enforced$.MODULE$);
    zkClient.close();
  }

}
于 2014-04-29T09:00:25.243 に答える
10

kafka.admin.CreateTopicCommand scala クラスを使用して、必要な引数を指定して Java コードからトピックを作成することができます。

String [] arguments = new String[8];
arguments[0] = "--zookeeper";
arguments[1] = "10.***.***.***:2181";
arguments[2] = "--replica";
arguments[3] = "1";
arguments[4] = "--partition";
arguments[5] = "1";
arguments[6] = "--topic";
arguments[7] = "test-topic-Biks";

CreateTopicCommand.main(arguments);

注意:jopt-simple-4.5 &の Maven 依存関係を追加する必要があります。zkclient-0.1

于 2013-08-28T06:20:44.607 に答える
2

Kafka 0.10.0.0 以降を使用している場合、Java からトピックを作成するには、RackAwareMode タイプのパラメーターを渡す必要があります。これは Scala ケース オブジェクトであり、そのインスタンスを Java から取得するのはトリッキーです (例: Java から Scala ケース オブジェクトを「取得」するにはどうすればよいでしょうか?しかし、このケースには当てはまりません)。

幸い、rackAwareMode はオプションのパラメータです。ただし、Java はオプションのパラメーターをサポートしていません。どうすればそれを解決できますか? ここに解決策があります:

AdminUtils.createTopic(zkUtils, topic, 1, 1, 
    AdminUtils.createTopic$default$5(),
    AdminUtils.createTopic$default$6());

miguno の回答と一緒に使用すれば、準備完了です。

于 2016-07-18T17:26:54.220 に答える
1

あなたの電話がうまくいかないいくつかの方法。

  1. Kafka クラスターに、レプリケーション値 3 をサポートするのに十分なノードがなかった場合。

  2. chroot パスのプレフィックスがある場合は、zookeeper ポートの後に追加する必要があります

  3. 実行時に Kafka インストール ディレクトリにいない (これが最も可能性が高い)

于 2013-12-19T23:16:51.320 に答える
1

Kafka 0.8 Producer Exampleから、以下のサンプルは、という名前のトピックを作成し、属性がKafka Broker 構成ファイルで (デフォルト) に設定されてpage_visitsいる場合、生成を開始します。auto.create.topics.enabletrue

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents++) { 
            long runtime = new Date().getTime();  
            String ip = “192.168.2.” + rnd.nextInt(255); 
            String msg = runtime + “,www.example.com,” + ip; 
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
            producer.send(data);
        }
        producer.close();
   }
}
于 2013-08-23T22:49:19.587 に答える
0

どのIDEから試していますか?

完全なパスを入力してください。以下は、トピックを作成するターミナルからのコマンドです。

  1. cd kafka/bin
  2. ./kafka-create-topic.sh --topic test --zookeeper localhost:2181
于 2013-06-23T07:59:56.003 に答える