thread A
からすべてのスレッドthreads B-1
に少量のデータ (~1k バイト) を送信する必要がありますthreads B-n
。
私の現在の実装はかなり複雑です:
GHashTable
キューを にマップするために使用しますthread id
。と ですべてthreads B-x
待ち状態にGCond
するg_cond_wait(_until)
。すべてのスレッドが受信するデータへのポインタを から各キューにプッシュしthread A
、更新をブロードキャストしますg_cond_broadcast
(それらはすべて同じGCond
インスタンスを使用します)。スレッドが終了することを決定した場合 (つまり、リモート DC)、最初に からキューを削除し、キューをGHashTable
クリアして内容を破棄します。省略した詳細がいくつかあります (競合状態、待機ブロックの中間の ref/unref など)。
これは健全なアプローチですか?どうすればこれを改善できますか。それはまったく効率的ではありません。
参考までに、いくつかのドラフト コードを添付します。
typedef struct {
//TODO verify this is not too stupid
// if we use that mutex too often, all parallel foo is pointless
GMutex mutex;
GHashTable *hashmap; //full of queues
gint refs;
GDestroyNotify fx_ref;
GDestroyNotify fx_unref;
} Foo;
Foo *
foo_new (GDestroyNotify fx_ref, GDestroyNotify fx_unref)
{
Foo *foo;
foo = g_new0 (Foo, 1);
g_assert (foo);
g_mutex_init (&(foo->mutex));
foo->hashmap = g_hash_table_new_full ();
foo->refs = 1;
foo->fx_ref = fx_ref; //just asume this increases the refcount atomically
foo->fx_unref = fx_unref; //"" decreases ""
return foo;
}
void
foo_register_thread (Foo *obj, gint threadid)
{
AQueue *aq;
foo_lock (obj);
aq = a_queue_new ((GDestroyNotify)i_do_unref);
g_hash_table_insert (obj->hashmap, id, aq);
foo_unlock (obj);
}
void
foo_unregister_thread (Foo *obj, gint threadid)
{
AQueue *aq;
foo_lock (obj);
g_hash_table_remove (obj->hashmap, id);
// broadcast _after_ removing the queue from the hashtable,
// so the thread wakes up and quits its foo_thread_wait_until_ready call
g_cond_broadcast (obj->cond);
foo_unlock (obj);
// allow somebody to sneak in
foo_lock (obj);
a_queue_unref (aq)
foo_unlock (obj);
}
void
foo_enqueue (Foo *obj, gpointer data)
{
GHashTableIter iter;
gint key;
GAsyncQueue *queue;
//wave after wave, not wave intermixing
g_mutex_lock (&obj->mutex);
g_hash_table_iter_init (iter, obj->ht);
while (g_hash_table_iter_next (&iter, &id, &queue)) {
if (foo->fx_ref)
foo->fx_ref (data);
g_queue_push_tail (queue, data);
}
g_cond_broadcast (cond);
g_mutex_unlock (&obj->mutex);
}
gpointer
foo_thread_pop (Foo *obj, gint id)
{
AQueue *aq;
gpointer data = NULL;
g_return_val_if_fail (obj, NULL);
g_return_val_if_fail (id>0, NULL);
foo_lock (obj);
aq = g_hash_table_lookup (obj->hashmap, id);
if (aq) {
data = g_queue_pop_head ((GQueue*)aq);
}
foo_unlock (obj);
return data;
}
/**
* wait until the queue gets removed or until data is ready to be read
*/
gpointer
foo_thread_wait_until_ready (Foo *obj, gint id)
{
gpointer data = NULL;
AQueue *aq;
foo_lock (obj);
aq = (AQueue*)g_hash_table_lookup (obj->hashmap, id);
if (!aq)
return NULL;
// just in case stuff gets cleaned up in the meantime
a_queue_ref (aq);
while (g_queue_peek_head ((GQueue*)aq)==NULL) {
g_cond_wait_until (&(obj->cond), &(obj->mutex))
// make sure queue still exists, if not this means this thread is dying
if (g_hash_table_lookup (obj->hashmap, id) != (gpointer)aq)
break;
}
data = g_queue_pop_head ((GQueue*)aq);
a_queue_unref (aq);
foo_unlock (obj);
return data;
}
void
foo_destroy (Foo *obj)
{
g_return_if_fail (obj);
g_mutex_clear (&obj->mutex);
g_cond_clear (&obj->cond);
}
void
foo_unref (Foo *obj)
{
g_return_if_fail (obj);
if (g_atomic_int_dec_and_test (&obj->refs))
foo_destroy (obj);
}
void
foo_ref (Foo *obj)
{
g_return_if_fail (obj);
g_atomic_int_inc (&obj->refs);
}
void
foo_lock (Foo *obj)
{
g_return_if_fail (obj);
g_atomic_int_inc (&obj->refs);
g_mutex_lock (&obj->mutex);
}
void
foo_unlock (Foo *obj)
{
g_return_if_fail (obj);
g_mutex_unlock (&obj->mutex);
foo_unref (obj);
}