ドキュメントを Storm から Elasticsearch にインデックス付けしたいのですが、Elasticsearch にインデックス付けされるドキュメントを取得できませんでした。
私のトポロジでは、{ "tweetId": 1, "text": "hello" } のような json を、Storm タプルを Elasticsearch (docはこちら: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html )。これらは、私の EsBolt の構成です。
Map conf = new HashMap();
conf.put("es.nodes","127.0.0.1");
conf.put("es.port","9200");
conf.put("es.resource","twitter/tweet");
conf.put("es.index.auto.create","no");
conf.put("es.input.json", "true");
conf.put("es.mapping.id", "tweetId");
EsBolt elasticsearchBolt = new EsBolt("twitter/tweet", conf);
最初の 2 つの構成にはデフォルトでこれらの値がありますが、明示的に設定することにしました。私もそれらなしで試してみましたが、同じ結果が得られました。
そして、これがトポロジを構築する方法です。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(TWEETS_DATA_KAFKA_SPOUT_ID, kafkaSpout, kafkaSpoutParallelism)
.setNumTasks(kafkaSpoutNumberOfTasks);
builder.setBolt(ELASTICSEARCH_BOLT_ID, elasticsearchBolt, elasticsearchBoltParallelism)
.setNumTasks(elasticsearchBoltNumberOfTasks)
.shuffleGrouping(TWEETS_DATA_KAFKA_SPOUT_ID);
return builder.createTopology();
トポロジをローカルで実行する前に、Elasticsearch で「twitter」インデックスを作成し、このインデックスのマッピング「tweet」を作成します。新しく作成したタイプのマッピングを取得すると、次のようになります (curl -XGET ' http://localhost:9200/twitter/_mapping/tweet '):
{
"twitter": {
"mappings": {
"tweet": {
"properties": {
"text": {
"type": "string"
},
"tweetId": {
"type": "string"
}
}
}
}
}
}
トポロジをローカルで実行すると、タプルを処理するときにコンソールに次のように表示されます。
Processing received message FOR 6 TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]
Emitting: elasticsearch-bolt __ack_ack [-8010897758788654352 -6240339405307942979]
TRANSFERING tuple TASK: 2 TUPLE: source: elasticsearch-bolt:6, stream: __ack_ack, id: {}, [-8010897758788654352 -6240339405307942979]
BOLT ack TASK: 6 TIME: TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]
Execute done TUPLE source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}] TASK: 6 DELTA:
したがって、タプルは処理されているようです。ただし、Elasticsearch でインデックスが作成されたドキュメントはありません。
EsBolt の構成を設定するときに、何か間違ったことをしていると思います。おそらく、構成または何かが欠落しています。