0

Perl スクリプトの一部に問題があり、何日も悩まされています。目的を要約すると、大きなファイルをチャンクで読み込み、入力ストリームに対して何らかの操作を行うことです (私の質問には関係ありません)。最初に実装したとき、ファイルをループしてから、次のようにいくつかのことを行いました。

while (read FILE, $buffer, $chunksize){ 
  callSomeOperation($buffer);
  # Do some other stuff
}

残念ながら、ファイルは非常に大きく、操作は多くの関数呼び出しで複雑であるため、これによりメモリが着実に増加し、perl がメモリを割り当てることができなくなり、スクリプトが失敗しました。そのため、いくつかの調査を行い、メモリ オーバーヘッドを最小限に抑えるためにいくつかのことを試みました (ループ外で変数を定義する、undef に設定するなど)。これにより、割り当てられたメモリ サイズの増加が遅くなりましたが、最終的には失敗しました。(そして、私が正しく理解していれば、perl が OS にメモリを返すのは sth です。実際には起こりません。)

そこで、関数呼び出しとそのすべての定義をサブスレッドにネストし、その終了を待って結合し、次のチャンクでスレッドを再度呼び出すことにしました。

while (read FILE, $buffer, $chunksize){
my $thr = threads->create(\&thrWorker,$buffer);
$thr->join();
}

sub thrWorker{
# Do the stuff here!
}  

スレッドが参加するなら、これは解決策だったかもしれません! しかし、実際にはそうではありません。$thr->detach(); で実行すると 同時に何百ものスレッドを取得することを除いて、すべてが正常に機能しますが、これは良い考えではありません。この場合、それらを連続して実行する必要があります。

そのため、この結合の問題について調査を行ったところ、perl 5.16.1 に問題がある可能性があるという声があったため、5.16.2 に更新しましたが、それでも結合しません。メーリングリストのどこかで、スレッドを CPAN モジュール Thread::Queue に参加させることに成功した誰かから読んだことを覚えていませんが、これも私にはうまくいきませんでした。

だから私はスレッドをあきらめて、このことをフォークしようとしました。しかし、フォークでは、「フォーク」の総数が制限されているように見えますか? とにかく、13 回目から 20 回目の繰り返しまではうまくいき、それ以上分岐できないというメッセージであきらめました。

my $pid = fork();
if( $pid == 0 ){
       thrWorker($buffer);
    exit 0;
}

CPAN モジュールの Parallel::ForkManager と Proc::Fork でも試してみましたが、役に立ちませんでした。

だから今、私はどういうわけか立ち往生していて、自分自身を助けることができません. たぶん他の誰かができます!どんな提案でも大歓迎です!

  1. これをスレッドまたは子プロセスで動作させるにはどうすればよいですか?
  2. または、少なくともどうすれば perl に強制的にメモリを解放させて、同じプロセスでこれを行うことができますか?

私のシステムに関するいくつかの追加情報: OS: Windows 7 64bit / Ubuntu Server 12.10 Perl on Windows: Strawberry Perl 5.16.2 64bit

Stackoverflow に関する私の最初の投稿の 1 つ。私はそれを正しくやったと思います:-)

4

2 に答える 2

1

読むことをお勧めします:これ

私は通常、スレッドの入力を管理するために Thread::Queue を使用します。サンプルコード:

my @threads = {};
my $Q = new Thread::Queue;

# Start the threads
for (my $i=0; $i<NUM_THREADS; $i++) {
    $threads[$i] = 
        threads->new(\&insert_1_thread, $Q);
}

# Get the list of sites and put in the work queue
foreach $row ( @{$ref} ) {
    $Q->enqueue( $row->[0] );
    #sleep 1 while $Q->pending > 100;
} # foreach $row

# Signal we are done
for (my $i=0; $i<NUM_THREADS; $i++) {
    $Q->enqueue( undef ); }

$count = 0;
# Now wait for the threads to complete before going on to the next step
for (my $i=0; $i<NUM_THREADS; $i++) {
    $count += $threads[$i]->join(); }

ワーカー スレッドの場合:

sub insert_1_thread {
my ( $Q ) = @_;
my $tid = threads->tid;
my $count = 0;
Log("Started thread #$tid");

while( my $row = $Q->dequeue ) {
    PROCESS ME...
    $count++;
} # while

Log("Thread#$tid, done");
return $count;

} # sub insert_1_thread
于 2012-11-19T14:55:41.853 に答える
0

それがあなたにとっての解決策であるかどうかはわかりませんが、チャンクオブジェクトの配列を作成し、次のように並列に処理することができます。

#!/usr/bin/perl

package Object; {
    use threads;
    use threads::shared;        

    sub new(){
        my $class=shift;
        share(my %this);
        return(bless(\%this,$class));
    }

    sub set {
       my ($this,$value)=@_;    
        lock($this);
#       $this->{"data"}=shared_clone($value);
        $this->{"data"}=$value;
    }

    sub get {
        my $this=shift; 
        return $this->{"data"};
    }
}


package main; {

use strict;
use warnings;

use threads;
use threads::shared;

    my @objs;
    foreach (0..2){
        my $o = Object->new();
        $o->set($_);
        push @objs, $o; 
    }

    threads->create(\&run,(\@objs))->join();

    sub run {
        my ($obj) = @_;     
        $$obj[$_]->get() foreach(0..2);        
    }
}
于 2012-11-19T15:32:43.487 に答える