3

Parallel::ForkManagerを使用していくつかの子プロセスを制御しようとしています。同時に実行するプロセスの数を10に制限したいと思います。合計で20を実行する必要があります。

オブジェクト宣言の最初の行でプロセス制限を10に設定できることはわかっていますが、$ pmオブジェクトを使用して、別のことを行う子プロセスを実行しています(現在の関数はメモリを大量に消費するため、制限する必要があります) )。

私が現在持っているコードは機能せず、finish呼び出しで実行されることはないため、残りの10人の子はフォークされません。なぜそうなるのかわかりません。子供が終了時に終了コードを呼び出してカウントを減らすと思っていたのですが、「if」ステートメントはこれを止めているようです。誰かがこれが事実である理由を説明できますか?

助けてくれてありがとう!

# Parallel declarations
my $pm = Parallel::ForkManager->new(30);

$pm->run_on_finish(sub {
    my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_str_ref) = @_; 
    --$active_jobs;
    })

my $total_jobs = 0;
my $active_jobs = 0;
while( $total_jobs < 20) {
    sleep 300 and next if $active_jobs > 10; 

    my $pid = $pm->start and ++$active_p1_jobs and ++$total_p1_jobs and next;

    my $return = module::function(%args);

    $pm->finish(0, { index => $total_jobs, return => $return }); 
    }

print STDERR "Submitted all jobs, now waiting for children to exit.\n";
$pm->wait_all_children();
4

1 に答える 1

3

10に制限されているジョブを「タイプ2」と呼びます。

これは私がP::FMでそれを行う方法です:

use strict;
use warnings;

use List::Util            qw( shuffle );
use Parallel::ForkManager qw( );
use POSIX                 qw( WNOHANG );
use Time::HiRes           qw( sleep );

use constant MAX_WORKERS       => 30;
use constant MAX_TYPE2_WORKERS => 10;

sub is_type2_job { $_[0]{type} == 2 }

my @jobs = shuffle(
   ( map { { type => 1, data => $_ } } 0..19 ),
   ( map { { type => 2, data => $_ } } 0..19 ),
);

my $pm = Parallel::ForkManager->new(MAX_WORKERS);

my $type2_count = 0;
$pm->run_on_finish(sub {
   my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $job) = @_;
   --$type2_count if is_type2_job($job);
   print "Finished: $pid, $job->{type}, $job->{data}, $job->{result}\n";
});

my @postponed_jobs;
while (@postponed_jobs || @jobs) {
   my $job;
   if (@postponed_jobs && $type2_count < MAX_TYPE2_WORKERS) {
      $job = shift(@postponed_jobs);
   }
   elsif (@jobs) {
      $job = shift(@jobs);
      if ($type2_count >= MAX_TYPE2_WORKERS && is_type2_job($job)) {
         push @postponed_jobs, $job;
         redo;
      }
   }
   # elsif (@postponed_jobs) {
   #     # Already max type 2 jobs being processed,
   #     # but there are idle workers.
   #     $job = shift(@postponed_jobs);
   # }
   else {
      local $SIG{CHLD} = sub { };
      select(undef, undef, undef, 0.300);
      $pm->wait_one_child(WNOHANG);
      redo;
   }

   ++$type2_count if is_type2_job($job);

   my $pid = $pm->start and next;
   $job->{result} = $job->{data} + 100;  # Or whatever.
   $pm->finish(0, $job);
}

$pm->wait_all_children();

しかし、これは壊れています。次のジョブを選択するコードはstart、前ではなく、途中(つまり、子が終了するのを待った後、フォークする前)に実行する必要がありstartます。これにより、ジョブが順不同で実行される可能性があります。P::FMにプリフォークコールバックがあることを望んでいたのはこれが初めてではありません。たぶん、あなたはメンテナにそれを頼むことができます。

于 2012-07-16T16:00:15.047 に答える