それぞれが M スレッドの N 個の fork() プロセスを持っています (もちろん、他にどのような方法がありますか?)。スレッドは、ジョブが何らかのキューに送信されるのを待機し、最終的に同じキュー内にいくつかの結果を投稿することになっています。すべてのスレッドは同じ優先度を持ち、同じデータ フローを尊重します (deque() タスク、enque() 新しいタスクのリストとしての結果)。キューの実装をどのように進める必要がありますか?
@queueとしてIPC::Shareableを使用し、2 レベル (スレッド/プロセス) ロックを使用してenqueue()/dequeue()を使用してThread::Queueを簡単に書き直そうとしましたが、間違っていることが判明しました (本当に:share IPC::Shareableの結ばれた配列を共有することはできません)。各スレッドに独自のIPC::Shareableインスタンスを持たせたくないと思います (あまり考えずに) ?
IPC::Msgを介してタスクをフリーズ/解凍し、セマフォを使用してロックと CV を実装し、スレッドごとのIPC::Msgインスタンスを保持する必要がありますか? スレッド内で IPC プリミティブを直接インスタンス化するのはかなり奇妙に思えます (したがって、スレッド レベルで :share IPC::Shareables を試みています)。
あなたの修道士の何人かは、それを正しく、同時に鳴らしたことがありますか?ミドルウェア キュー マネージャーなどのないフラットな実装ですか? 考えを共有していただけますか、モジュールについて教えてください。
ありがとう!
PS私は、mpi、rabbitmqなどの重い既存の実装を選択したくありません.
#!/usr/bin/perl
use strict;
use warnings;
use Carp qw/croak/;
use threads;
use threads::shared;
use insert::your::queue::impl::here;
que_make_init();
my $numthreads = 4;
my $numprocs = 4;
sub runthreads
{
foreach (1..$numthreads) {
@retcodes = map { $_->join } threads->create( sub {
my $q = que_get_process_level_shared_instance();
while ( (my $task = @q->get())) {
@q->put(process($task));
}
});
}
#do_something(@retcodes) ...
exit(0);
}
my %proctable = map {
croak unless defined (my $pid = fork());
runthreads if $pid == 0;
$pid => -1;
} (0..$numprocs-1);
#waitpid(), etc
__END__