おなじみの前付から始めます。
#! /usr/bin/env perl
use strict;
use warnings;
use 5.10.0; # for // (defined-or)
use IO::Handle;
use IO::Select;
use LWP::Simple;
use POSIX qw/ :sys_wait_h /;
use Socket;
グローバル定数は、プログラムの実行を制御します。
my $DEBUG = 0;
my $EXIT_COMMAND = "<EXIT>";
my $NJOBS = 10;
チェックする URL は、ソケットのワーカー側に 1 行に 1 つずつ到着します。URL ごとにLWP::Simple::head
、リソースが取得可能かどうかを判断するためにワーカーが呼び出します。次に、ワーカーはurlの形式の行をソケットに書き戻し ます。*status* ここで、*status* は"YES"
または"NO"
およびスペース文字を表します。
URL が$EXIT_COMMAND
の場合、ワーカーはすぐに終了します。
sub check_sites {
my($s) = @_;
warn "$0: [$$]: waiting for URL" if $DEBUG;
while (<$s>) {
chomp;
warn "$0: [$$]: got '$_'" if $DEBUG;
exit 0 if $_ eq $EXIT_COMMAND;
print $s "$_: ", (head($_) ? "YES" : "NO"), "\n";
}
die "NOTREACHED";
}
ワーカーを作成するには、まずsocketpair
. 親プロセスは一方の端を使用し、各ワーカー (子) はもう一方の端を使用します。両端でバッファリングを無効にし、親エンドを IO::Select インスタンスに追加します。また、すべてのワーカーが終了するのを待つことができるように、各子のプロセス ID も記録します。
sub create_worker {
my($sel,$kidpid) = @_;
socketpair my $parent, my $kid, AF_UNIX, SOCK_STREAM, PF_UNSPEC
or die "$0: socketpair: $!";
$_->autoflush(1) for $parent, $kid;
my $pid = fork // die "$0: fork: $!";
if ($pid) {
++$kidpid->{$pid};
close $kid or die "$0: close: $!";
$sel->add($parent);
}
else {
close $parent or die "$0: close: $!";
check_sites $kid;
die "NOTREACHED";
}
}
URL をディスパッチするために、親は利用可能な数のリーダーを取得し、ジョブ キューから同じ数の URL を渡します。ジョブ キューが空になった後に残っているすべてのワーカーは、終了コマンドを受け取ります。
print
基になるワーカーが既に終了している場合は失敗することに注意してください。SIGPIPE
親は即時終了を防ぐために無視する必要があります。
sub dispatch_jobs {
my($sel,$jobs) = @_;
foreach my $s ($sel->can_write) {
my $url = @$jobs ? shift @$jobs : $EXIT_COMMAND;
warn "$0 [$$]: sending '$url' to fd ", fileno $s if $DEBUG;
print $s $url, "\n" or $sel->remove($s);
}
}
制御が に到達するまでread_results
に、ワーカーが作成され、作業を受け取ります。現在、親はcan_read
1 つ以上のワーカーから結果が到着するのを待っています。定義済みの結果は現在のワーカーからの応答であり、未定義の結果は、子が終了してソケットの反対側を閉じたことを意味します。
sub read_results {
my($sel,$results) = @_;
warn "$0 [$$]: waiting for readers" if $DEBUG;
foreach my $s ($sel->can_read) {
warn "$0: [$$]: reading from fd ", fileno $s if $DEBUG;
if (defined(my $result = <$s>)) {
chomp $result;
push @$results, $result;
warn "$0 [$$]: got '$result' from fd ", fileno $s if $DEBUG;
}
else {
warn "$0 [$$]: eof from fd ", fileno $s if $DEBUG;
$sel->remove($s);
}
}
}
親は、すべての結果を収集するために、ライブ ワーカーを追跡する必要があります。
sub reap_workers {
my($kidpid) = @_;
while ((my $pid = waitpid -1, WNOHANG) > 0) {
warn "$0: [$$]: reaped $pid" if $DEBUG;
delete $kidpid->{$pid};
}
}
プールを実行すると、上記のサブルーチンが実行され、すべての URL がディスパッチされ、すべての結果が返されます。
sub run_pool {
my($n,@jobs) = @_;
my $sel = IO::Select->new;
my %kidpid;
my @results;
create_worker $sel, \%kidpid for 1 .. $n;
local $SIG{PIPE} = "IGNORE"; # writes to dead workers will fail
while (@jobs || keys %kidpid || $sel->handles) {
dispatch_jobs $sel, \@jobs;
read_results $sel, \@results;
reap_workers \%kidpid;
}
warn "$0 [$$]: returning @results" if $DEBUG;
@results;
}
サンプルメインプログラムの使用
my @jobs = qw(
bogus
http://stackoverflow.com/
http://www.google.com/
http://www.yahoo.com/
);
my @results = run_pool $NJOBS, @jobs;
print $_, "\n" for @results;
出力は
偽物:いいえ
http://www.google.com/: はい
http://stackoverflow.com/: はい
http://www.yahoo.com/: はい