1

少し変更されたボス/ワーカー モデルに基づいて、マルチスレッド アプリケーションを実装しようとしています。基本的に、メイン スレッドはいくつかのボス スレッドを作成し、それらはそれぞれ 2 つのワーカー スレッドを生成します (場合によってはそれ以上)。これは、ボス スレッドがそれぞれ 1 つのホストまたはネットワーク デバイスを処理し、ワーカー スレッドが作業を完了するのに時間がかかるためです。

私はThread::Poolこの概念を実現するために使用していますが、これまでのところ非常にうまく機能しています。また、私の問題が関連しているとは思いませんThread::Pool(以下を参照)。非常に単純化された擬似コード:

use strict;
use warnings;

my $bosspool = create_bosspool();   # spawns all boss threads
my $taskpool = undef;               # created in each boss thread at
                                    # creation of each boss thread 

# give device jobs to boss threads
while (1) {
  foreach my $device ( @devices ) {
    $bosspool->job($device);
  }

  sleep(1);
}

# This sub is called for jobs passed to the $bosspool
sub process_boss
{
  my $device = shift;

  foreach my $task ( $device->{tasks} ) {
    # process results as they become available
    process_result() while ( $taskpool->results );
    # give task jobs to task threads
    scalar $taskpool->job($device, $task);
    sleep(1); ### HACK ###
  }

  # process remaining results / wait for all tasks to finish
  process_result() while ( $taskpool->results || $taskpool->todo );

  # happy result processing
}

sub process_result
{
  my $result = $taskpool->result_any();

  # mangle $result
}

# This sub is called for jobs passed to the $taskpool of each boss thread
sub process_task
{
  # not so important stuff

  return $result;
}

ところで、monitor()-routine を使用しない理由は、すべてのジョブが終了するのを待たなければならないから$taskpoolです。さて、この### HACK ###行を削除しない限り、このコードは素晴らしく機能します。スリープし$taskpool->todo()ないと、ジョブを追加したり結果を受け取ったりするのが「速すぎる」場合、適切な数のまだ開いているジョブが提供されません。同様に、合計で 4 つのジョブを追加しますが、$taskpool->todo()後で返されるのは 2 つだけです (保留中の結果はありません)。これにより、あらゆる種類の興味深い効果が得られます。

OK、それThread::Pool->todo()はがらくたです。回避策を試してみましょう:

sub process_boss
{
  my $device = shift;

  my $todo = 0;

  foreach my $task ( $device->{tasks} ) {
    # process results as they become available
    while ( $taskpool->results ) {
      process_result();
      $todo--;
    }
    # give task jobs to task threads
    scalar $taskpool->job($device, $task);
    $todo++;
  }

  # process remaining results / wait for all tasks to finish
  while ( $todo ) {
    process_result();
    sleep(1); ### HACK ###
    $todo--;
  }
}

ラインを維持している限り、これも問題なく機能し### HACK ###ます。Thread::Pool->todo()この行がないと、このコードは の問題を再現します$todo。 は 1 だけでなく 2 またはそれ以上減少します。

私はこのコードを 1 つのボス スレッドだけでテストしたので、(このサブルーチンに関しては) 基本的にマルチスレッドは関係ありませんでした。$bosspool$taskpoolそして特にそうで$todoはありません:shared、副作用はありえませんよね?このサブルーチンは、共有変数やセマフォなどを使用せずに 1 つのボス スレッドによってのみ実行されるので、何が起こっているのでしょうか?

4

1 に答える 1

0

「ワーカー」スレッド モデルを実装する最良の方法は、Thread::Queue. このようなことを行う際の問題は、キューがいつ完了するか、またはアイテムがデキューされて処理が保留されているかどうかを把握することです。

while ループが undef を返し、スレッドが終了するように、Thread::Queuewhile ループを使用してキューとキューから要素を取得できます。end

したがって、常に複数の「ボス」スレッドが必要なわけではなく、複数の異なるフレーバーのworker入力キューを使用できます。その場合、なぜ「ボス」スレッドモデルが必要なのか疑問に思います。不要なようです。

参照先: 子デーモンを使用した Perl のデーモン化

#!/usr/bin/perl

use strict;
use warnings;
use threads;
use Thread::Queue;

my $nthreads = 4;

my @targets = qw ( device1 device2 device3 device4 );

my $task_one_q = Thread::Queue->new();
my $task_two_q = Thread::Queue->new();

my $results_q = Thread::Queue->new();

sub task_one_worker {
    while ( my $item = task_one_q->dequeue ) {

        #do something with $item

        $results_q->enqueue("$item task_one complete");
    }
}

sub task_two_worker {
    while ( my $item = task_two_q->dequeue ) {

        #do something with $item

        $results_q->enqueue("$item task_two complete");
    }
}

#start threads;

for ( 1 .. $nthreads ) {
    threads->create( \&task_one_worker );
    threads->create( \&task_two_worker );
}

foreach my $target (@targets) {
    $task_one_q->enqueue($target);
    $task_two_q->enqueue($target);
}

$task_one_q->end;
$task_two_q->end;

#Wait for threads to exit.

foreach my $thr ( threads->list() ) {
    threads->join();
}

$results_q->end();

while ( my $item = $results_q->dequeue() ) {
    print $item, "\n";
}

必要に応じて、スレッドで同様のことを行うことがbossできます。キューごとに作成し、boss参照によってワーカーに渡すことができます。それが必要かどうかはわかりませんが。

于 2015-02-10T22:09:55.193 に答える