3

私はperlの初心者なので、私の無知を許してください。(私はwindows7を使用しています)

私は echiken のスレッドのサンプル スクリプトを借りて、それをスクリプトの基礎として使用して多数のシステム コールを作成したいと考えていましたが、理解できない問題に遭遇しました。私が見ている問題を説明するために、以下のサンプル コードで単純な ping コマンドを実行しています。

  • $nb_process許容される同時実行スレッドの数です。
  • $nb_computeサブルーチンを実行したい回数として (つまり、ping コマンドを発行する合計回数)。

$nb_compute$nb_processを同じ値に設定すると、完全に機能します。

ただし、$nb_process(一度に実行するスレッドの数を制限するために)減らすと、で定義されたスレッドの数$nb_processが開始されるとロックされるようです。

システム コール (ping コマンド) を削除すると、正常に動作します。

他のシステム コールでも同じ動作が見られます (ping だけではありません)。

誰か助けてください。以下にスクリプトを用意しました。

#!/opt/local/bin/perl -w  
 use threads;  
 use strict;  
 use warnings;  

 my @a = ();  
 my @b = ();  


 sub sleeping_sub ( $ $ $ ); 

 print "Starting main program\n";  

 my $nb_process = 3;  
 my $nb_compute = 6;  
 my $i=0;  
 my @running = ();  
 my @Threads;  
 while (scalar @Threads < $nb_compute) {  

     @running = threads->list(threads::running);  
     print "LOOP $i\n";  
     print "  - BEGIN LOOP >> NB running threads = ".(scalar @running)."\n";  

     if (scalar @running < $nb_process) {  
         my $thread = threads->new( sub { sleeping_sub($i, \@a, \@b) });  
         push (@Threads, $thread);  
         my $tid = $thread->tid;  
         print "  - starting thread $tid\n";  
     }  
     @running = threads->list(threads::running);  
     print "  - AFTER STARTING >> NB running Threads = ".(scalar @running)."\n";  
     foreach my $thr (@Threads) {  
         if ($thr->is_running()) {  
             my $tid = $thr->tid;  
             print "  - Thread $tid running\n";  
         }  
         elsif ($thr->is_joinable()) {  
             my $tid = $thr->tid;  
             $thr->join;  
             print "  - Results for thread $tid:\n";  
             print "  - Thread $tid has been joined\n";  
         }  
     }  

     @running = threads->list(threads::running);  
     print "  - END LOOP >> NB Threads = ".(scalar @running)."\n";  
     $i++;  
 }  

 print "\nJOINING pending threads\n";  
 while (scalar @running != 0) {  
    foreach my $thr (@Threads) {  
         $thr->join if ($thr->is_joinable());  
     }  
     @running = threads->list(threads::running);  
}  
 print "NB started threads = ".(scalar @Threads)."\n";  
 print "End of main program\n";  


 sub sleeping_sub ( $ $ $ ) { 
    my @res2 = `ping 136.13.221.34`; 
    print "\n@res2";
    sleep(3);  
 } 
4

1 に答える 1

3

プログラムの主な問題は、スレッドを結合できるかどうかをテストするビジー ループがあることです。これは無駄です。さらに、コードをよりよく理解するために、グローバル変数の量を減らすことができます。

その他の眉上げ:

  • プロトタイプの意味を正確に理解していない限り、プロトタイプを使用しないでください。
  • はそのsleeping_sub引数を使用しません。
  • threads::runningこれが実際に正しいかどうかを考えずに、リストをよく使用します。

一度にN 個のワーカーのみを実行したいが、合計でM個のワーカーを起動したいようです。これを実装するかなりエレガントな方法を次に示します。主なアイデアは、終了したばかりのスレッドがスレッド ID をキューに入れることができるスレッド間にキューがあるということです。その後、このスレッドは結合されます。スレッドの数を制限するには、セマフォを使用します。

use threads; use strict; use warnings;
use feature 'say';  # "say" works like "print", but appends newline.
use Thread::Queue;
use Thread::Semaphore;

my @pieces_of_work = 1..6;
my $num_threads = 3;
my $finished_threads = Thread::Queue->new;
my $semaphore = Thread::Semaphore->new($num_threads);

for my $task (@pieces_of_work) {
  $semaphore->down;  # wait for permission to launch a thread

  say "Starting a new thread...";

  # create a new thread in scalar context
  threads->new({ scalar => 1 }, sub {
    my $result = worker($task);                # run actual task
    $finished_threads->enqueue(threads->tid);  # report as joinable "in a second"
    $semaphore->up;                            # allow another thread to be launched
    return $result;
  });

  # maybe join some threads
  while (defined( my $thr_id = $finished_threads->dequeue_nb )) {
    join_thread($thr_id);
  }
}

# wait for all threads to be finished, by "down"ing the semaphore:
$semaphore->down for 1..$num_threads;
# end the finished thread ID queue:
$finished_threads->enqueue(undef);

# join any threads that are left:
while (defined( my $thr_id = $finished_threads->dequeue )) {
  join_thread($thr_id);
}

join_threadworker定義_

sub worker {
  my ($task) = @_;
  sleep rand 2; # sleep random amount of time
  return $task + rand; # return some number
}

sub join_thread {
  my ($tid) = @_;
  my $thr = threads->object($tid);
  my $result = $thr->join;
  say "Thread #$tid returned $result";
}

出力を得ることができます:

Starting a new thread...
Starting a new thread...
Starting a new thread...
Starting a new thread...
Thread #3 returned 3.05652608754778
Starting a new thread...
Thread #1 returned 1.64777186731541
Thread #2 returned 2.18426146087901
Starting a new thread...
Thread #4 returned 4.59414651998983
Thread #6 returned 6.99852684265667
Thread #5 returned 5.2316971836585

(順序と戻り値は決定論的ではありません)。

キューを使用すると、どのスレッドが終了したかを簡単に知ることができます。セマフォを使用すると、リソースを保護したり、並列処理の量を制限したりすることが容易になります。

このパターンの主な利点は、ビジー ループとは対照的に、使用される CPU がはるかに少ないことです。これにより、一般的な実行時間も短縮されます。

これは非常に大きな改善ですが、改善の余地があります。スポーン スレッドは高価です。これは基本的に、fork()Unix システムでのコピー オン ライトの最適化がすべて行われていないためです。すでに作成したすべての変数、すべての状態などを含むインタープリター全体がコピーされます。

したがって、スレッドは控えめに使用し、できるだけ早く生成する必要があります。スレッド間で値を渡すことができるキューについては既に紹介しました。これを拡張して、いくつかのワーカー スレッドが常に入力キューから作業を取得し、出力キューを介して戻るようにすることができます。ここでの問題は、終了する最後のスレッドが出力キューを終了することです。

use threads; use strict; use warnings;
use feature 'say';
use Thread::Queue;
use Thread::Semaphore;

# define I/O queues
my $input_q  = Thread::Queue->new;
my $output_q = Thread::Queue->new;

# spawn the workers
my $num_threads = 3;
my $all_finished_s = Thread::Semaphore->new(1 - $num_threads); # a negative start value!
my @workers;
for (1 .. $num_threads) {
  push @workers, threads->new( { scalar => 1 }, sub {
    while (defined( my $task = $input_q->dequeue )) {
      my $result = worker($task);
      $output_q->enqueue([$task, $result]);
    }
    # we get here when the input queue is exhausted.
    $all_finished_s->up;
    # end the output queue if we are the last thread (the semaphore is > 0).
    if ($all_finished_s->down_nb) {
      $output_q->enqueue(undef);
    }
  });
}

# fill the input queue with tasks
my @pieces_of_work = 1 .. 6;
$input_q->enqueue($_) for @pieces_of_work;

# finish the input queue
$input_q->enqueue(undef) for 1 .. $num_threads;

# do something with the data
while (defined( my $result = $output_q->dequeue )) {
  my ($task, $answer) = @$result;
  say "Task $task produced $answer";
}

# join the workers:
$_->join for @workers;

前と同じようにworker定義すると、次のようになります。

Task 1 produced 1.15207098293783
Task 4 produced 4.31247785766295
Task 5 produced 5.96967474718984
Task 6 produced 6.2695013168678
Task 2 produced 2.02545636412421
Task 3 produced 3.22281619053999

(すべての出力が印刷された後に 3 つのスレッドが結合されるため、出力が退屈になります)。

この 2 番目の解決策は、スレッドの場合は少し単純になりdetachます。メイン スレッドは、最後のスレッドによって終了された入力キューをリッスンしているため、すべてのスレッドが終了する前に終了しません。

于 2013-08-16T13:04:40.443 に答える