1

問題:
推論生成プロセスは、1 秒あたり約 300 の推論データを MongoDB コレクションに書き込みます。MongoDB の変更ストリーム機能は、別のプロセスによって利用され、これらの推論を読み戻し、後処理を行います。現在、変更ストリーム関数 API (mongoc_change_stream_next()) が呼び出されると、単一の推論データのみが返されます。したがって、すべての推論データを 1 秒以内に保存するには、このような呼び出しを合計 300 回行う必要があります。ただし、各読み取りの後、単一/複数の推論データの後処理を実行するために約 50 ミリ秒の時間が必要です。単一データ リターン モデルのため、15 倍の実効レイテンシが導入されます。この問題に取り組むために、私たちはMongoDB の変更ストリーム機能に沿ったバッチ読み取りメカニズム。同じものを実装するためにさまざまなオプションを試しましたが、変更ストリーム API 呼び出しごとに 1 つのデータしか取得できませんでした。この問題を整理する方法はありますか?

プラットフォーム:
OS: Ubuntu 16.04
Mongo-c-driver: 1.15.1
Mongo サーバー: 4.0.12

試したオプション:
カーソルのバッチ サイズを 1 より大きく設定する。

int main(void) {
    const char *uri_string = "mongodb://localhost:27017/replicaSet=set0";
    mongoc_change_stream_t *stream;
    mongoc_collection_t *coll;
    bson_error_t error;
        mongoc_uri_t *uri;
    mongoc_client_t *client;

    /*
    * Add the Mongo DB blocking read and scall the inference parse function with the Json
                 * */
    uri = mongoc_uri_new_with_error (uri_string, &error);
    if (!uri) {
        fprintf (stderr,
        "failed to parse URI: %s\n"
        "error message:       %s\n",
        uri_string,
        error.message);
        return -1;
    }

    client = mongoc_client_new_from_uri (uri);
    if (!client) {
        return -1;
    }

    coll = mongoc_client_get_collection (client,  <DB-NAME>, <collection-name>);
    stream = mongoc_collection_watch (coll, &empty, NULL);
    mongoc_cursor_set_batch_size(stream->cursor, 20);
    while (1){
        while (mongoc_change_stream_next (stream, &doc)) {
            char *as_json = bson_as_relaxed_extended_json (doc, NULL); 
            ............
            ............
            //post processing consuming 50 ms of time
            ............
            ............
        }
        if (mongoc_change_stream_error_document (stream, &error, &err_doc)) {
            if (!bson_empty (err_doc)) {
                fprintf (stderr,
                "Server Error: %s\n",
                bson_as_relaxed_extended_json (err_doc, NULL));
            } else {
                fprintf (stderr, "Client Error: %s\n", error.message);
            }
            break;
        }
    }
    return 0;
}

4

1 に答える 1

1

現在、変更ストリーム関数 API (mongoc_change_stream_next()) が呼び出されると、単一の推論データのみが返されます。

技術的には、単一のドキュメントが返されるわけではありません。これは、mongoc_change_stream_next()が基になるカーソルを繰り返し、それぞれbsonを次のドキュメントに設定するためです。そのため、返されるバッチ サイズが複数であっても、ドキュメントごとに繰り返す必要があります。

あなたは試すことができます:

  • ドキュメントを並行して処理するために個別のスレッドを作成するため、ドキュメントごとに 50 ミリ秒または累積で 15 秒待つ必要はありません。

  • ドキュメントのバッチをループします。つまり、50 個のドキュメントをキャッシュしてからバッチ処理を実行します。

  • それらを別々のスレッドでバッチ処理します(上記の2つの組み合わせ)

于 2020-03-15T22:35:26.223 に答える