ファイルのリストを含む配列があります@arr=(a.txt,b.txt,c.txt);
配列を繰り返し処理し、foreach ループでファイルを処理しています。ファイルの各行は sql を生成し、DB サーバーで実行されます。
ファイルの各行で 1 つのスレッドを作成し、DB にクエリを実行したいと考えています。また、同時に実行するスレッドの最大数を制御したいと考えています。
ファイルのリストを含む配列があります@arr=(a.txt,b.txt,c.txt);
配列を繰り返し処理し、foreach ループでファイルを処理しています。ファイルの各行は sql を生成し、DB サーバーで実行されます。
ファイルの各行で 1 つのスレッドを作成し、DB にクエリを実行したいと考えています。また、同時に実行するスレッドの最大数を制御したいと考えています。
これは単純なワーカー モデルであり、理想的なシナリオです。問題ない。
use threads;
use Thread::Queue qw( );
use constant NUM_WORKERS => 5;
sub work {
my ($dbh, $job) = @_;
...
}
{
my $q = Thread::Queue->new();
my @threads;
for (1..NUM_WORKERS) {
push @threads, async {
my $dbh = ...;
while (my $job = $q->dequeue())
work($dbh, $job);
}
};
}
while (<>) {
chomp;
$q->enqueue($_);
}
$q->enqueue(undef) for 1..@threads;
$_->join() for @threads;
}
ファイル名を引数としてスクリプトに渡すか、スクリプト内で割り当てます@ARGV
。
local @ARGV = qw( a.txt b.txt c.txt );
Thread::Poolベースのシステムを使用できます。または、Boss/Workerモデル ベースのシステム。
興味深いことに、実行するスレッドの数を手動で制御しています。スレッド ID [コード スニップ] my %thr; のハッシュを使用します。#スレッドのハッシュ
$count=1;
$maxthreads=5;
while (shift (@data) {
$syncthr = threads->create(sub {callfunction here}, {pass variables});
$tid = $syncthr->tid; #get the thread ID
$thr{$tid} = $syncthr;
if ($count >= $maxthreads) {
threads->yield();
while (1) { # loop until threads are completed
$num_run_threads = keys (%thr);
foreach $one_thread ( keys %thr ) {
if ($thr{$one_thread}->is_running() ) { # if thread running check for error state
if ($err = $thr{$one_thread}->error() } {
[ do stuff here]
}
# otherwise move on to next thread check
} else { # thread is either completed or has error
if ($err = $thr{$one_thread}->error()) {
[ check for error again cann't hurt to double check ]
}
if ($err = $thr{$one_thread}->join()) {
print "Thread completed id: [$one_thread]\n";
}
delete $thr{$one_thread}; # delete the hash since the thread is no more
$num_run_threads = $num_run_threads - 1; # reduce the number of running threads
}
} # close foreach loop
@threads = threads->list(threads::running); # get threads
if ($num_run_threads < $maxthreads ) {
$count = $num_run_threads; # reset the counter to number of threads running
if ( $#data != -1 ) { # check to make sure we still have data
last; # exit the infinite while loop
} else {
if (@threads) {
next; # we still have threads continue with processing
} else {
{ no more threads to process exit program or do something else }
}
} # end else
} # end threads running
} # end the while statement
#Check the threads to see if they are joinable
undef @threads;
@threads = threads->joinable()
if (@threads) {
foreach $mthread(@threads) {
if ($mthreads != 0) {
$thr->join();
}
} #end foreach
} #end @threads
} #end the if statement
$count++; Increment the counter to get to number of max threads to spawn
}
これは決して完全なプログラムではありません。さらに、非常に当たり障りのないものに変更しました。しかし、私はこれをしばらく使用して成功しています。特に OO Perl では。これは私にとってはうまくいき、かなり多くの用途があります。特にタイムアウトのエラーチェックをいくつか見逃しているかもしれませんが、スレッド自体でそれを行います。ちなみに、スレッドは実際には私が呼び出しているサブルーチンです。