3

別のツール(overlapFeatures)をラップするperlスクリプトを作成して、ファイル形式をその場で正しく変換できるようにしました。私が扱っているファイルはすべて、通常200万行程度のタブ区切りのテーブルです。それ自体では、overlapFeaturesはこれらを問題なく処理できます。

しかし、一度にたくさんのラインを配管することで、パイプがロックしてしまうのではないかと思います。子プロセスの読み取りと書き込みを同時に行えるように、これをなんとかしてスレッド化する必要があることはわかっています。しかし、私はperl(またはその他のプログラム)でスレッドを適切に使用する方法を本当に理解していません。私はそれを理解しているので、問題を解決するために使用することthreadsもできます。IPC::run

デッドロックが発生する元のスクリプトは、次のようになります。

use strict;
use warnings;
use IPC::Open2;

my $infile = shift;
my $featurefile = shift;

my $command = 'overlapFeatures';
my @args = (qw (-a stdin -b), $featurefile);

my ($input, $output);
my $pid = open2($output, $input, $command, @args) 
    or die "Failed with error $!\n";

open (my $infh, '<', $infile) or die "Can't open $infile\n";
while (<$infh>){
    # Do some format conversion...
    chomp
    my @cols = split /\t/;
    # print a modified line to the tool
    print $input join ("\t", @cols[0,2,3,1,5,4]),"\n";
}
close ($input);

while (<$output>){
    # format conversion for ouput
    chomp;
    my @cols = split /\t/;
    print join (",",@cols[0,1,2,5,3,8]),"\n";
}
close ($output);

IPC :: Open2で大量のデータをフィルタリングする方法に従って、スレッドを利用するようにスクリプトを書き直そうとしました。そのようです:

use strict;
use warnings;
use IPC::Open2;
use threads;

my $infile = shift;
my $featurefile = shift;

my $command = 'overlapFeatures';
my @args = (qw (-a stdin -b), $featurefile);

my ($input, $output);
my $pid = open2($output, $input, $command, @args) 
    or die "Failed with error $!\n";

my $thread = async {
    print join(",", qw(seqid start end strand read feature name)),"\n";
    for(;;) {
        my $line = <$output>; # should block here and wait for output?
        last if !defined $line; # end of stream reached?
        print STDERR "Got line $line\n";
        # Do some format conversion...
        chomp $line;
        my @cols = split /\t/, $line;
        # print a modified line to the tool
        print join(",",@cols[0,1,2,5,3,8]),"\n";
    }
    close($output)
};

{
    open (my $infh, '<', $infile) or die "Can't open $infile\n";
    while (<$infh>){
        # format conversion for ouput
        chomp;
        my @cols = split /\t/;
        print $input join ("\t", @cols[0,2,3,1,5,4]),"\n";
    }
    close ($input);
}

$thread->join();
waitpid ($pid, 0);

ただし、スクリプトは同じようにスタックし、私もスタックします。この場合の使い方もわかりませんIPC::run

私は何が間違っているのですか?糸脱毛を誤解しましたか?


編集:スクリプトのデバッグ(およびamonの支援)により多くの時間を費やして、から行を取得できることがわかりました$output。ただし、スクリプトは終了せず、すべての出力が受信された後、ハングしているように見えます。これが私の唯一の問題だと思います。

4

1 に答える 1

1

これは長いコメントのようなものです。

簡略化したバージョンでコードを試しました。変換コードを削除し、Unixyesコマンドを無限のデータソースとして使用し、出力をに出力しまし/dev/nullた。現在、出力には関心がなく、プログラムが機能しているためです。あなたの代わりとしてoverlapFeatures、私catはデータを変更せずに通過させていました。

use strict; use warnings; use IPC::Open2; use threads;

my $command = "cat";
my @args = ();

my ($input, $output);
my $pid = open2($output, $input, $command, @args) 
  or die "Failed with error $!\n";

my $thread = async {
  print $_ while defined($_ = <$output>);
  close($output)
};

{
  my $c=0;
  open (my $infh, "-|", "yes") or die;
  open my $null, ">/dev/null" or die;
  while (<$infh>){
    $c++;
    print $null $_;
    if ($c >= 1_000_000) {
      print "\n==another million==\n\n";
      $c=0
    }
  }
  close ($input);
}

$thread->join();
waitpid ($pid, 0);

100万行に1回(文字通り)、ステータスメッセージを出力して、IOがまだ機能していることを表明します。

結果

Perl12.4を搭載したUbuntuLinuxでテストしたところ、特定のスクリプトは問題なく動作します。したがって、問題はIPCコード内ではなく、データ形式の変換、ラップしているプログラム、またはデータ量(yes文字列"1\n"を出力し、それぞれが小さなデータの多くの行を作成する)のいずれかにあると考えるのが妥当です。 1行あたり2バイトでグループあたり最大2MB))

結論

別の構成を実行している可能性があります。* nixを実行している場合は、私が使用したスクリプトも機能することを主張してください。そうでない場合は、この構成を明示的に記述し、同等のスクリプトを実行してみてください。

少なくともテストのために、ラッパーを2つのスクリプトに分割することも可能であるため、次のようなものを実行します。

$ convert-to | overlapFeatures | convert-from

これにより、すべてのIPCがシェルに委任され、変換が機能しており、アーキテクチャが実装可能であることが表明されます。

ブレーンストーミングされた他のありそうもないアイデア:

(1)close操作はいつ実行されますか?奇妙な理由で、ループの一方の端が途中で終了する可能性がありますか?sのprint STDERR "Closing down xx\n"前にclose面白いかもしれません。(2)プロセス/スレッドを正常に生成しopen2、制御フローを返しますか?妄想的に私はそれらの後に別のものを置きます…(3)スクリプトからデータを取得します、それともしばらくするとストリームが枯渇しますか?asyncprint STDERR

編集

EOFすべての書き込み終了がclosedになるまで、パイプは降伏しません。したがって、すべてのスレッドは、使用していないものをすべて閉じる必要があります。

my $thread = async {
  close $input;
  print $_ while defined($_ = <$output>);
  close($output)
};

{
  close $output;
  my $c=0;
  open (my $infh, "-|", "yes") or die;
  open my $null, ">/dev/null" or die;
  while (<$infh>){
    $c++;
    print $null $_;
    ...
于 2012-09-21T15:38:45.253 に答える