3

ドキュメントを Storm から Elasticsearch にインデックス付けしたいのですが、Elasticsearch にインデックス付けされるドキュメントを取得できませんでした。

私のトポロジでは、{ "tweetId": 1, "text": "hello" } のような json を、Storm タプルを Elasticsearch (docはこちら: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html )。これらは、私の EsBolt の構成です。

Map conf = new HashMap();
conf.put("es.nodes","127.0.0.1");
conf.put("es.port","9200");
conf.put("es.resource","twitter/tweet");
conf.put("es.index.auto.create","no");
conf.put("es.input.json", "true");
conf.put("es.mapping.id", "tweetId");
EsBolt elasticsearchBolt = new EsBolt("twitter/tweet", conf);

最初の 2 つの構成にはデフォルトでこれらの値がありますが、明示的に設定することにしました。私もそれらなしで試してみましたが、同じ結果が得られました。

そして、これがトポロジを構築する方法です。

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(TWEETS_DATA_KAFKA_SPOUT_ID, kafkaSpout, kafkaSpoutParallelism)
        .setNumTasks(kafkaSpoutNumberOfTasks);


builder.setBolt(ELASTICSEARCH_BOLT_ID, elasticsearchBolt, elasticsearchBoltParallelism)
        .setNumTasks(elasticsearchBoltNumberOfTasks)
        .shuffleGrouping(TWEETS_DATA_KAFKA_SPOUT_ID);

return builder.createTopology();

トポロジをローカルで実行する前に、Elasticsearch で「twitter」インデックスを作成し、このインデックスのマッピング「tweet」を作成します。新しく作成したタイプのマッピングを取得すると、次のようになります (curl -XGET ' http://localhost:9200/twitter/_mapping/tweet '):

{
   "twitter": {
      "mappings": {
         "tweet": {
            "properties": {
               "text": {
                  "type": "string"
               },
               "tweetId": {
                  "type": "string"
               }
            }
         }
      }
   }
}

トポロジをローカルで実行すると、タプルを処理するときにコンソールに次のように表示されます。

Processing received message FOR 6 TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]

Emitting: elasticsearch-bolt __ack_ack [-8010897758788654352 -6240339405307942979]

TRANSFERING tuple TASK: 2 TUPLE: source: elasticsearch-bolt:6, stream: __ack_ack, id: {}, [-8010897758788654352 -6240339405307942979]

BOLT ack TASK: 6 TIME:  TUPLE: source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}]

Execute done TUPLE source: tweets-data-kafka-spout:9, stream: default, id: {-8010897758788654352=-6240339405307942979}, [{"tweetId":"1","text":"hello"}] TASK: 6 DELTA:

したがって、タプルは処理されているようです。ただし、Elasticsearch でインデックスが作成されたドキュメントはありません。

EsBolt の構成を設定するときに、何か間違ったことをしていると思います。おそらく、構成または何かが欠落しています。

4

2 に答える 2

1

es.storm.bolt.flush.entries.sizeで指定されたフラッシュ サイズに達した場合にのみ、ドキュメントのインデックスが作成されます。

または、キューのフラッシュをトリガーする TICK 頻度を設定することもできます。

config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);

デフォルトでは、es.storm.bolt.tick.tuple.flushパラメーターに従って、es-hadoop はティックでフラッシュします。

于 2016-09-09T18:51:20.803 に答える