1

トライデント初心者です。kafka からデータを読み取るトライデント トポロジを作成しています。トピック名は「テスト」です。私はローカルのカフカをセットアップしています。飼育係、カフカをローカルで始めました。そして、kafka でトピック「test」を作成し、プロデューサーを開いて「Hello Kafka!」というメッセージを入力しました。

トライデントを使用して、「test」トピックから「Hello Kafka」というメッセージを読みたいです。

以下は私のコードです。空のタプルを取得しています。

    TridentTopology topology = new TridentTopology();
    BrokerHosts brokerHosts = new ZkHosts("localhost:2181");

    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "test");
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4;
    kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4;
    kafkaConfig.forceFromStart = false;
    OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);

    topology.newStream("TestSpout", opaqueTridentKafkaSpout).parallelismHint(1)
      .each(new Fields(), new TestFilter()).parallelismHint(1)
      .each(new Fields(), new Utils.PrintFilter());

これは私の TestFilter クラスコードです

public TestFilter()
{
    //
}

@Override
public boolean isKeep(TridentTuple tuple) {
    boolean isKeep=true;
    System.out.println("TestFilter is called...");
    if (tuple != null && tuple.getValues().size()>0) {
        System.out.println("data from kafka ::: "+tuple.getValues());
    } 
    return isKeep;
}

kafka プロデューサーで「テスト」トピックにメッセージを入力するたびに、最初の sysout が出力されますが、if ループは渡されません。「TestFilter が呼び出されました...」というメッセージが表示されるだけです。

私が作成した実際のデータを「テスト」トピックに取得したいと考えています。どのように?

4

1 に答える 1