1

thread Aからすべてのスレッドthreads B-1に少量のデータ (~1k バイト) を送信する必要がありますthreads B-n

私の現在の実装はかなり複雑です:

GHashTableキューを にマップするために使用しますthread id。と ですべてthreads B-x待ち状態にGCondするg_cond_wait(_until)。すべてのスレッドが受信するデータへのポインタを から各キューにプッシュしthread A、更新をブロードキャストしますg_cond_broadcast(それらはすべて同じGCondインスタンスを使用します)。スレッドが終了することを決定した場合 (つまり、リモート DC)、最初に からキューを削除し、キューをGHashTableクリアして内容を破棄します。省略した詳細がいくつかあります (競合状態、待機ブロックの中間の ref/unref など)。

これは健全なアプローチですか?どうすればこれを改善できますか。それはまったく効率的ではありません。

参考までに、いくつかのドラフト コードを添付します。

typedef struct {
    //TODO verify this is not too stupid
    // if we use that mutex too often, all parallel foo is pointless
    GMutex mutex;
    GHashTable *hashmap; //full of queues
    gint refs;
    GDestroyNotify fx_ref;
    GDestroyNotify fx_unref;
} Foo;


Foo *
foo_new (GDestroyNotify fx_ref, GDestroyNotify fx_unref)
{
    Foo *foo;

    foo = g_new0 (Foo, 1);
    g_assert (foo);
    g_mutex_init (&(foo->mutex));
    foo->hashmap = g_hash_table_new_full ();
    foo->refs = 1;
    foo->fx_ref = fx_ref; //just asume this increases the refcount atomically
    foo->fx_unref = fx_unref; //"" decreases ""
    return foo;
}

void
foo_register_thread (Foo *obj, gint threadid)
{
    AQueue *aq;

    foo_lock (obj);
    aq = a_queue_new ((GDestroyNotify)i_do_unref);

    g_hash_table_insert (obj->hashmap, id, aq);
    foo_unlock (obj);
}

void
foo_unregister_thread (Foo *obj, gint threadid)
{
    AQueue *aq;

    foo_lock (obj);
    g_hash_table_remove (obj->hashmap, id);
    // broadcast _after_ removing the queue from the hashtable,
    // so the thread wakes up and quits its foo_thread_wait_until_ready call
    g_cond_broadcast (obj->cond);
    foo_unlock (obj);
    // allow somebody to sneak in
    foo_lock (obj);
    a_queue_unref (aq)
    foo_unlock (obj);
}

void
foo_enqueue (Foo *obj, gpointer data)
{
    GHashTableIter iter;
    gint key;
    GAsyncQueue *queue;

    //wave after wave, not wave intermixing 
    g_mutex_lock (&obj->mutex);

    g_hash_table_iter_init (iter, obj->ht);
    while (g_hash_table_iter_next (&iter, &id, &queue)) {
        if (foo->fx_ref)
            foo->fx_ref (data);
        g_queue_push_tail (queue, data);
    }
    g_cond_broadcast (cond);

    g_mutex_unlock (&obj->mutex);
}


gpointer
foo_thread_pop (Foo *obj, gint id)
{
    AQueue *aq;
    gpointer data = NULL;

    g_return_val_if_fail (obj, NULL);
    g_return_val_if_fail (id>0, NULL);

    foo_lock (obj);
    aq = g_hash_table_lookup (obj->hashmap, id);
    if (aq) {
        data = g_queue_pop_head ((GQueue*)aq);
    }
    foo_unlock (obj);
    return data;
}


/**
 * wait until the queue gets removed or until data is ready to be read
 */
gpointer
foo_thread_wait_until_ready (Foo *obj, gint id)
{
    gpointer data = NULL;
    AQueue *aq;

    foo_lock (obj);
    aq = (AQueue*)g_hash_table_lookup (obj->hashmap, id);
    if (!aq)
        return NULL;

    // just in case stuff gets cleaned up in the meantime
    a_queue_ref (aq);


    while (g_queue_peek_head ((GQueue*)aq)==NULL) {
        g_cond_wait_until (&(obj->cond), &(obj->mutex))
        // make sure queue still exists, if not this means this thread is dying
        if (g_hash_table_lookup (obj->hashmap, id) != (gpointer)aq)
            break;
    }

    data = g_queue_pop_head ((GQueue*)aq);

    a_queue_unref (aq);

    foo_unlock (obj);

    return data;
}


void
foo_destroy (Foo *obj)
{
    g_return_if_fail (obj);
    g_mutex_clear (&obj->mutex);
    g_cond_clear (&obj->cond);
}

void
foo_unref (Foo *obj)
{
    g_return_if_fail (obj);
    if (g_atomic_int_dec_and_test (&obj->refs))
        foo_destroy (obj);
}

void
foo_ref (Foo *obj)
{
    g_return_if_fail (obj);
    g_atomic_int_inc (&obj->refs);
}

void
foo_lock (Foo *obj)
{
    g_return_if_fail (obj);
    g_atomic_int_inc (&obj->refs);
    g_mutex_lock (&obj->mutex);
}

void
foo_unlock (Foo *obj)
{
    g_return_if_fail (obj);
    g_mutex_unlock (&obj->mutex);
    foo_unref (obj);
}
4

1 に答える 1

0

フォークを使って似たようなことをしました。アルゴリズムの概要を以下に示します。

仮説: 各スレッドはメモリを親にブロードキャストする必要はありません。各リクエストは任意の順序で処理できます。

  • マルチスレッド (フォーク) の前に、構造体へのポインターの配列を作成します (構造体は、キューに入れられた要求を処理するために必要なすべての情報を提供します)。初期値として 0 を使用して、mutex によって制御される共有整数を割り当てます。
  • 好きなだけフォークします。(親がほとんどアイドル状態であるため、論理 CPU の数 +1 を使用しました)。

次に、すべてのフォークは次のようになります。

  • 共有整数がロックされていない場合は、ロックしてインクリメントします。ロックを解除します。インクリメント前の値に対応する配列内の要素を扱います。
  • ロックされている場合は、後で再試行してください。
  • 値がリクエストのグローバル数より大きい場合は、終了します。

親は、すべてのスレッドが完了するのを待つだけです。

于 2013-10-09T17:57:28.580 に答える