1

mysql データベースから大量のデータをインポートし、ドキュメントをインデックス化する必要があります (約 1000 ドキュメント)。インデックス作成プロセス中に、拡張要求を外部の Apache Stanbol サーバーに送信して、フィールドの特別な処理を行う必要があります。以下のように、更新チェーンで StanbolContentProcessor を使用するように solrconfig.xml で dataimport-handler を構成しました。

<updateRequestProcessorChain name="stanbolInterceptor">
    <processor class="com.solr.stanbol.processor.StanbolContentProcessorFactory"/>
    <processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>

<requestHandler name="/dataimport" class="solr.DataImportHandler">   
    <lst name="defaults">  
        <str name="config">data-config.xml</str>
        <str name="update.chain">stanbolInterceptor</str>
    </lst>  
</requestHandler>

サンプルの data-config.xml は次のとおりです。

<dataConfig>
    <dataSource type="JdbcDataSource" driver="com.mysql.jdbc.Driver" 
                url="jdbc:mysql://localhost:3306/solrTest" 
                user="test" password="test123" batchSize="1" />
    <document name="stanboldata">
        <entity name="stanbolrequest" query="SELECT * FROM documents">
            <field column="id" name="id" />
            <field column="content" name="content" />
            <field column="title" name="title" />
        </entity>
    </document>
</dataConfig>

約 1000 のドキュメントを含む大規模なインポートを実行すると、上記の Solr Stanbolnterceptor による負荷が高いためと思われるスタンボル サーバーがダウンします。Stanbol が管理可能な数のリクエストを同時に処理できるように、dataimport をバッチで調整したいと考えています。

これは、data-config の dataSource 要素の batchSize パラメータを使用して達成できますか?

Solr でのデータインポートの負荷を抑えるためのアイデアを教えてください。

これは、/dataimport 中に Stanbol リクエストを処理するカスタム UpdateProcessor クラスです。

public class StanbolContentProcessorFactory extends
        UpdateRequestProcessorFactory {

    public static final String NLP_ORGANIZATION = "nlp_organization";
    public static final String NLP_PERSON = "nlp_person";
    public static final String[] STANBOL_REQUEST_FIELDS = { "title", "content" };
    public static final String STANBOL_ENDPOINT = "http://localhost:8080/enhancer";

    @Override
    public UpdateRequestProcessor getInstance(SolrQueryRequest req,
            SolrQueryResponse res, UpdateRequestProcessor next) {

        return new StanbolContentProcessor(next);
    }

    class StanbolContentProcessor extends UpdateRequestProcessor {

        public StanbolContentProcessor(UpdateRequestProcessor next) {
            super(next);
        }

        @Override
        public void processAdd(AddUpdateCommand cmd) throws IOException {
            SolrInputDocument doc = cmd.getSolrInputDocument();
            String request = "";
            for (String field : STANBOL_REQUEST_FIELDS) {
                if (null != doc.getFieldValue(field)) {
                    request += (String) doc.getFieldValue(field) + ". ";
                }

            }
            try {
                EnhancementResult result = stanbolPost(request, getBaseURI());
                Collection<TextAnnotation> textAnnotations = result
                        .getTextAnnotations();
                // extracting text annotations
                Set<String> personSet = new HashSet<String>();
                Set<String> orgSet = new HashSet<String>();
                for (TextAnnotation text : textAnnotations) {
                    String type = text.getType();
                    String selectedText = text.getSelectedText();

                    if (null != type && null != selectedText) {
                        if (type.equalsIgnoreCase(StanbolConstants.DBPEDIA_PERSON)
                                || type.equalsIgnoreCase(StanbolConstants.FOAF_PERSON)) {
                            personSet.add(selectedText);

                        } else if (type
                                .equalsIgnoreCase(StanbolConstants.DBPEDIA_ORGANIZATION)
                                || type.equalsIgnoreCase(StanbolConstants.FOAF_ORGANIZATION)) {
                            orgSet.add(selectedText);

                        }
                    }
                }
                for (String person : personSet) {
                    doc.addField(NLP_PERSON, person);
                }
                for (String org : orgSet) {
                    doc.addField(NLP_ORGANIZATION, org);
                }
                cmd.solrDoc = doc;
                super.processAdd(cmd);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }

    }

    private EnhancementResult stanbolPost(String request, URI uri) {
        Client client = Client.create();
        WebResource webResource = client.resource(uri);
        ClientResponse response = webResource.type(MediaType.TEXT_PLAIN)
                .accept(new MediaType("application", "rdf+xml"))
                .entity(request, MediaType.TEXT_PLAIN)
                .post(ClientResponse.class);

        int status = response.getStatus();
        if (status != 200 && status != 201 && status != 202) {
            throw new RuntimeException("Failed : HTTP error code : "
                    + response.getStatus());
        }
        String output = response.getEntity(String.class);
        // Parse the RDF model

        Model model = ModelFactory.createDefaultModel();
        StringReader reader = new StringReader(output);
        model.read(reader, null);
        return new EnhancementResult(model);

    }


    private static URI getBaseURI() {
        return UriBuilder.fromUri(STANBOL_ENDPOINT).build();
    }

}
4

2 に答える 2

4

このbatchSizeオプションは、メモリ使用量を削減するためにデータベース テーブルの行をバッチで取得するために使用されます (データ インポート ハンドラの実行時にメモリ不足を防ぐためによく使用されます)。バッチ サイズが小さいほど遅くなる可能性がありますが、このオプションはインポート プロセスの速度に影響を与えることを意図していません。

私の提案は、ファイアウォール ルールを使用するなど、別の方法でリクエストを制限することです。Linux を使用していて Netfilter にアクセスできる場合は、次のようなコマンドを実行できます。

iptables -A INPUT -p tcp --dport 12345 -m limit --limit 10/s -j ACCEPT

ここで、「12345」はスタンボル ポートで、「10/s」は 1 秒間に受け入れるパケット数です。

于 2013-11-27T08:50:08.387 に答える
3

Mowgli の言うとおりですbatchsize。これについては役に立ちません。ほとんどの人は逆に問題を抱えているため (My dataimport is too slow, please helpたとえば)、Solr にはこのようなものはありません。少なくとも私が知っていることは何もありません。


個人的には、スロットリングを処理するように Linux システムを構成することは選択しません。ステージからステージへ移動する場合、または別のサーバーに移行する場合は、これを覚えておく必要があります。そして、システムの存続期間中に人々が変わったとしても、彼らはこれを知りません.

したがって、あなたのコードはわかりませんが、他の質問StanbolContentProcessorFactoryですでに言及されているように、カスタムコードのようです。これはカスタム コードであるため、そこにスロットル メカニズムを追加することができます。これについてさらに詳しく説明するには、確認するコードが必要です。


アップデート

Solr には Google の guava があるので、ここで提案されているようにRateLimiter を使用します。Maven でビルドしている場合、これは scope を使用できることを意味します。Maven を使用していない場合は、fatjar を作成したり、Solr の lib フォルダーに guava を配置したりする必要はありません。provided

import com.google.common.util.concurrent.RateLimiter;

public class StanbolContentProcessorFactory extends
    UpdateRequestProcessorFactory {

    // ...

    // add a rate limiter to throttle your requests
    // this setting would allow 10 requests per second
    private RateLimiter throttle = RateLimiter.create(0.1);

    // ...

    private EnhancementResult stanbolPost(String request, URI uri) {
        Client client = Client.create();

        // this will throttle your requests
        throttle.acquire();

        WebResource webResource = client.resource(uri);
        ClientResponse response = webResource.type(MediaType.TEXT_PLAIN)
            .accept(new MediaType("application", "rdf+xml"))
            .entity(request, MediaType.TEXT_PLAIN)
            .post(ClientResponse.class);

        int status = response.getStatus();
        if (status != 200 && status != 201 && status != 202) {
            throw new RuntimeException("Failed : HTTP error code : "
                + response.getStatus());
        }
        String output = response.getEntity(String.class);
        // Parse the RDF model
        Model model = ModelFactory.createDefaultModel();
        StringReader reader = new StringReader(output);
        model.read(reader, null);
        return new EnhancementResult(model);
}
于 2013-11-27T11:54:26.803 に答える