2

リソースのリストを処理し、結果のアイテムを各リソースのコンテナー (std::map) に配置する場合と配置しない場合があるスレッド化されたアプリケーションを作成しています。リソースの処理は複数のスレッドで行われます。

結果コンテナがトラバースされ、各アイテムは個別のスレッドによって処理されます。このスレッドは、アイテムを受け取り、(mysqlcppconn API を使用して) MySQL データベースを更新し、コンテナからアイテムを削除して続行します。

簡単にするために、ロジックの概要を次に示します。

queueWorker() - thread
    getResourcesList() - seeds the global queue

databaseWorker() - thread
    commitProcessedResources() - commits results to a database every n seconds

processResources() - thread x <# of processor cores>
    processResource()
    queueResultItem()

そして、私がやっていることを示すための疑似実装。

/* not the actual stucts, but just for simplicities sake */
struct queue_item_t {
    int id;
    string hash;
    string text;
};

struct result_item_t {
    string hash; // hexadecimal sha1 digest
    int state;
}

std::map< string, queue_item_t > queue;
std::map< string, result_item_t > results;

bool processResource (queue_item_t *item)
{
    result_item_t result;

    if (some_stuff_that_doesnt_apply_to_all_resources)
    {
        result.hash = item->hash;
        result.state = 1;

        /* PROBLEM IS HERE */
        queueResultItem(result);
    }
}

void commitProcessedResources ()
{
    pthread_mutex_lock(&resultQueueMutex);

    // this can take a while since there

    for (std::map< string, result_item_t >::iterator it = results.begin; it != results.end();)
    {
        // do mysql stuff that takes a while

        results.erase(it++);
    }

    pthread_mutex_unlock(&resultQueueMutex);
}

void queueResultItem (result_item_t result)
{
    pthread_mutex_lock(&resultQueueMutex);

    results.insert(make_pair(result.hash, result));

    pthread_mutex_unlock(&resultQueueMutex);
}

processResource() で示されているように、そこに問題があり、commitProcessedResources() が実行されていて、resultQueueMutex がロックされている場合、queueResultItem() が同じミューテックスをロックしようとするため、待機するため、ここで待機します。完了するまで、しばらく時間がかかる場合があります。

実行中のスレッドの数には明らかに制限があるため、すべてのスレッドが queueResultItem() の終了を待機するとすぐに、mutex が解放されて queueResultItem() で使用できるようになるまで、それ以上の作業は行われません。

それで、私の質問は、これを実装するにはどうすればよいですか? 同時に挿入および削除できる特定の種類の標準コンテナはありますか、それとも私が知らない何かが存在しますか?

ここでの std::map の場合のように、各キュー項目が独自の一意のキーを持つことが厳密に必要というわけではありませんが、いくつかのリソースが同じ結果を生成する可能性があり、一意の結果のみを送信することを好むので、私はそれを好みます。INSERT IGNORE を使用して重複を無視する場合でも、データベースに追加します。

私は C++ にかなり慣れていないので、残念ながら Google で何を探すべきかわかりません。:(

4

3 に答える 3

7

での処理中に常にキューのロックを保持する必要はありませんcommitProcessedResources ()。代わりに、キューを空のキューと交換できます。

void commitProcessedResources ()
{
    std::map< string, result_item_t > queue2;
    pthread_mutex_lock(&resultQueueMutex);
    // XXX Do a quick swap.
    queue2.swap (results);
    pthread_mutex_unlock(&resultQueueMutex);

    // this can take a while since there

    for (std::map< string, result_item_t >::iterator it = queue2.begin();
        it != queue2.end();)
    {
        // do mysql stuff that takes a while

        // XXX You do not need this.
        //results.erase(it++);
    }   
}
于 2012-04-26T15:29:48.567 に答える
0

C ++ 03までは、標準ではスレッドやスレッドセーフについて何も定義されていませんでした(そして、pthreadsを使用しているので、それがほとんど使用していると思います)。

そのため、共有マップをロックして、常に1つのスレッドだけがマップにアクセスしようとするようにするのはあなた次第です。これがないと、内部データ構造が破損する可能性が高いため、マップはまったく有効ではなくなります。

あるいは(そして私は一般的にこれを好む)、複数のスレッドにデータをスレッドセーフなキューに入れ、そのキューからデータを取得してマップに入れる単一のスレッドを持たせることができます。シングルスレッドであるため、使用時にマップをロックする必要はありません。

マップをディスクにフラッシュする際の遅延に対処するには、いくつかの合理的な可能性があります。おそらく最も簡単なのは、同じスレッドをキューから読み取り、マップに挿入し、定期的にマップをディスクにフラッシュすることです。この場合、マップがディスクにフラッシュされている間、着信データはキューに留まります。これにより、マップへのアクセスが簡単になります。1つのスレッドだけが直接マップに触れるため、ロックなしでマップを使用できます。

Another would be to have two maps. At any given time, the thread that flushes to disk gets one map, and the thread the retrieves from the queue and inserts into the map gets the other. When the flushing thread needs to do its thing, it just swaps the roles of the two. Personally, I think I prefer the first though -- eliminating all the locking around the map has a great deal of appeal, at least to me.

Yet another variant that would maintain that simplicity would be for the queue->map thread to create map, fill it, and when it's full enough (i.e., after the appropriate length of time) stuff it into another queue, then repeat from the start (i.e., create new map, etc.) The flushing thread retrieves a map from its incoming queue, flushes it to disk, and destroys it. Though this adds a bit of overhead creating and destroying maps, you're not doing it often enough to care a lot. You still keep single-threaded access to any map at any time, and still keep all the database access segregated from everything else.

于 2012-04-26T15:34:21.567 に答える
0

これを正しく機能させるには、同期メソッド (つまり、ミューテックス) を使用する必要があります。ただし、並列プログラミングの目標は、クリティカル セクション (つまり、ロックを保持している間に実行されるコードの量) を最小限に抑えることです。

つまり、MySQL クエリを同期せずに並行して実行できる (つまり、複数の呼び出しが互いに競合しない) 場合は、それらをクリティカル セクションから外してください。これにより、オーバーヘッドが大幅に削減されます。たとえば、次のような単純なリファクタリングでうまくいく可能性があります

void commitProcessedResources ()
{
    // MOVING THIS LOCK

    // this can take a while since there
    pthread_mutex_lock(&resultQueueMutex);
    std::map<string, result_item_t>::iterator end = results.end();
    std::map<string, result_item_t>::iterator begin = results.begin();
    pthread_mutex_unlock(&resultQueueMutex);

    for (std::map< string, result_item_t >::iterator it = begin; it != end;)
    {
        // do mysql stuff that takes a while

        pthread_mutex_lock(&resultQueueMutex); // Is this the only place we need it?
        // This is a MUCH smaller critical section
        results.erase(it++);
        pthread_mutex_unlock(&resultQueueMutex); // Unlock or everything will block until end of loop
    }

    // MOVED UNLOCK
}

これにより、複数のスレッドにまたがるデータへの同時「リアルタイム」アクセスが可能になります。つまり、すべての書き込みが終了すると、マップが更新され、現在の情報で別の場所で読み取ることができます。

于 2012-04-26T15:29:59.573 に答える