12

現在、Elastic search のドキュメントPython API を使用した例を参照して、Elastic search セットアップのインデックスを再作成しようとしています。

ただし、これがどのように機能するかについては少し混乱しています。Python API からスクロール ID を取得できました。

es = Elasticsearch("myhost")

index = "myindex"
query = {"query":{"match_all":{}}}
response = es.search(index= index, doc_type= "my-doc-type", body= query, search_type= "scan", scroll= "10m")

scroll_id = response["_scroll_id"]

さて、私の質問は、これが私にとって何の役に立つのかということです. スクロールIDを知ることで何が得られますか? ドキュメントには「Bulk API」を使用するように記載されていますが、scoll_id がこれにどのように影響するのかわかりません。少し混乱しました。

scroll_id を正しく取得したことを考慮して、この時点からインデックスを再作成する方法を示す簡単な例を誰か教えてください。

4

4 に答える 4

9

これは、elasticsearch-py を使用して別の Elasticsearch ノードに再インデックスする例です。

from elasticsearch import helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])

helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des)

クエリの結果を別のインデックスに再インデックスすることもできます。その方法は次のとおりです。

from elasticsearch import helpers
es_src = Elasticsearch(["host"])
es_des = Elasticsearch(["host"])

body = {"query": {"term": {"year": "2004"}}}
helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des, query=body)
于 2016-01-14T13:30:39.127 に答える
7

こんにちは、スクロール API を使用して、最も効率的な方法ですべてのドキュメントを確認できます。scroll_id を使用すると、特定のスクロール要求のためにサーバーに保存されているセッションを見つけることができます。したがって、より多くのアイテムを取得するには、リクエストごとに scroll_id を提供する必要があります。

一括 API は、より効率的なドキュメントのインデックス作成用です。コピーとインデックス作成には両方が必要ですが、実際には関係ありません。

それがどのように機能するかについてより良いアイデアを得るのに役立つかもしれないJavaコードがいくつかあります。

    public void reIndex() {
    logger.info("Start creating a new index based on the old index.");

    SearchResponse searchResponse = client.prepareSearch(MUSIC_INDEX)
            .setQuery(matchAllQuery())
            .setSearchType(SearchType.SCAN)
            .setScroll(createScrollTimeoutValue())
            .setSize(SCROLL_SIZE).execute().actionGet();

    BulkProcessor bulkProcessor = BulkProcessor.builder(client,
            createLoggingBulkProcessorListener()).setBulkActions(BULK_ACTIONS_THRESHOLD)
            .setConcurrentRequests(BULK_CONCURRENT_REQUESTS)
            .setFlushInterval(createFlushIntervalTime())
            .build();

    while (true) {
        searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
                .setScroll(createScrollTimeoutValue()).execute().actionGet();

        if (searchResponse.getHits().getHits().length == 0) {
            logger.info("Closing the bulk processor");
            bulkProcessor.close();
            break; //Break condition: No hits are returned
        }

        for (SearchHit hit : searchResponse.getHits()) {
            IndexRequest request = new IndexRequest(MUSIC_INDEX_NEW, hit.type(), hit.id());
            request.source(hit.sourceRef());
            bulkProcessor.add(request);
        }
    }
}
于 2014-10-14T22:17:07.780 に答える