トライデント初心者です。kafka からデータを読み取るトライデント トポロジを作成しています。トピック名は「テスト」です。私はローカルのカフカをセットアップしています。飼育係、カフカをローカルで始めました。そして、kafka でトピック「test」を作成し、プロデューサーを開いて「Hello Kafka!」というメッセージを入力しました。
トライデントを使用して、「test」トピックから「Hello Kafka」というメッセージを読みたいです。
以下は私のコードです。空のタプルを取得しています。
TridentTopology topology = new TridentTopology();
BrokerHosts brokerHosts = new ZkHosts("localhost:2181");
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "test");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4;
kafkaConfig.forceFromStart = false;
OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
topology.newStream("TestSpout", opaqueTridentKafkaSpout).parallelismHint(1)
.each(new Fields(), new TestFilter()).parallelismHint(1)
.each(new Fields(), new Utils.PrintFilter());
これは私の TestFilter クラスコードです
public TestFilter()
{
//
}
@Override
public boolean isKeep(TridentTuple tuple) {
boolean isKeep=true;
System.out.println("TestFilter is called...");
if (tuple != null && tuple.getValues().size()>0) {
System.out.println("data from kafka ::: "+tuple.getValues());
}
return isKeep;
}
kafka プロデューサーで「テスト」トピックにメッセージを入力するたびに、最初の sysout が出力されますが、if ループは渡されません。「TestFilter が呼び出されました...」というメッセージが表示されるだけです。
私が作成した実際のデータを「テスト」トピックに取得したいと考えています。どのように?