6

シェルから次のコマンドを実行したとしましょう

{ 
samtools view -HS header.sam;           # command1
samtools view input.bam 1:1-50000000;   # command2
} | samtools view -bS - > output.bam    # command3

samtools ビューに慣れていない方向け (これは stackoverflow であるため)。これが本質的に行っていることは、新しいヘッダーを持つ新しいbamファイルを作成することです. 通常、bam ファイルは大きな圧縮ファイルであるため、場合によってはファイルを通過するだけでも時間がかかることがあります。別の方法の 1 つは、command2 を実行してから、samtools reheader を使用してヘッダーを切り替えることです。これは、大きなファイルを 2 回通過します。上記のコマンドは、bam を 1 回通過します。これは、大きな bam ファイルに適しています (圧縮されている場合でも 20GB を超えます - WGS)。

私の質問は、サブプロセスを使用して Python でこのタイプのコマンドを実装する方法です。

私は次のものを持っています:

fh_bam = open('output.bam', 'w')
params_0 = [ "samtools", "view", "-HS", "header.sam" ]
params_1 = [ "samtools", "view", "input.bam", "1:1-50000000"]
params_2 = [ "samtools", "view", "-bS", "-" ]
sub_0 = subprocess.Popen(params_0, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
sub_1 = subprocess.Popen(params_1, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
### SOMEHOW APPEND sub_1.stdout to sub_0.stdout
sub_2 = subprocess.Popen(params_2, stdin=appended.stdout, stdout=fh_bam)

どんな助けでも大歓迎です。ありがとう。

4

4 に答える 4

4

文字列に既にシェル コマンドがある場合は、そのまま実行できます。

#!/usr/bin/env python
from subprocess import check_call

check_call(r"""
{ 
samtools view -HS header.sam;           # command1
samtools view input.bam 1:1-50000000;   # command2
} | samtools view -bS - > output.bam    # command3
""", shell=True)

Python でパイプラインをエミュレートするには:

#!/usr/bin/env python
from subprocess import Popen, PIPE

# start command3 to get stdin pipe, redirect output to the file
with open('output.bam', 'wb', 0) as output_file:
    command3 = Popen("samtools view -bS -".split(), 
                     stdin=PIPE, stdout=output_file)
# start command1 with its stdout redirected to command3 stdin
command1 = Popen('samtools view -HS header.sam'.split(),
                 stdout=command3.stdin)
rc_command1 = command1.wait() #NOTE: command3.stdin is not closed, no SIGPIPE or a write error if command3 dies
# start command2 after command1 finishes
command2 = Popen('samtools view input.bam 1:1-50000000'.split(),
                 stdout=command3.stdin)
command3.stdin.close() # inform command2 if command3 dies (SIGPIPE or a write error)
rc_command2 = command2.wait()
rc_command3 = command3.wait()
于 2015-02-28T09:05:18.503 に答える
1

(悲しいことにコメントすることはできませんが、この「回答」は cmidi の回答に対するコメントです。

Marco は、コマンドが約 20GB の大量の出力を生成すると明言しました。communicate() を使用すると、プロセスが終了するまで待機します。つまり、「fd」記述子は大量のデータを保持する必要があります。実際には、コンピューターに 20GB を超える空き RAM がない限り、OS はその間にデータをディスクにフラッシュします。したがって、元の作成者が避けたかった中間データをディスクに書き込むことになります。サーラークの答えに+1!

于 2015-02-27T22:51:59.860 に答える
0

関連するファイルのサイズが原因で、メモリ内の最初の 2 つのサブプロセスからの出力を連結することは現実的ではないと思います。最初の 2 つのサブプロセスの出力を次のようなファイルにラップすることをお勧めします。popen は、シークや書き込みではなく、標準入力ファイルのようなものからのみ読み取るため、 read メソッドのみが必要なようです。以下のコードは、読み取りから空の文字列を返すだけで、ストリームが EOF にあることを示すのに十分であると想定しています。

class concat(object):
    def __init__(self, f1, f2):
        self.f1 = f1
        self.f2 = f2

    def read(self, *args):
        ret = self.f1.read(*args)
        if ret == '':
            ret = self.f2.read(*args)
        return ret

fh_bam = open('output.bam', 'w')
params_0 = [ "samtools", "view", "-HS", "header.sam" ]
params_1 = [ "samtools", "view", "input.bam", "1:1-50000000"]
params_2 = [ "samtools", "view", "-bS", "-" ]
sub_0 = subprocess.Popen(params_0, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
sub_1 = subprocess.Popen(params_1, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
### Somehow Append sub_1.stdout to sub_0.stdout
sub_2 = subprocess.Popen(params_2, stdin=concat(sub_0.stdout, sub_1.stdout), stdout=fh_bam)

明確にするために、f1.readブロック''し、パイプが閉じられた/EOFされたときにのみ戻ります。はこれが発生した後concat.readにのみ読み取りを試行するため、とからの出力は織り交ぜられません。もちろん、末尾を繰り返し読み取るためのわずかなオーバーヘッドがあります。これは、どのファイルから読み取るかを示すフラグ変数を設定することで回避できます。ただし、パフォーマンスが大幅に向上するとは思えません。f2f1f2f1

于 2015-02-27T22:21:54.173 に答える
-1

While Popen accepts file-like objects, it actually uses the underlying file handles/descriptors, not the read and write methods of the file objects to communicate, as @J.F. Sebastian rightly points out. A better way to do this is to use a pipe (os.pipe()) which doesn't use the disk. This allows you to connect the output stream directly to the input stream of another process, which is exactly what you want. The problem is then just a matter of serialisation, to make sure the two source streams don't interleave.

import os
import subprocess

r, w = os.pipe()

fh_bam = open('output.bam', 'w')
params_0 = [ "samtools", "view", "-HS", "header.sam" ]
params_1 = [ "samtools", "view", "input.bam", "1:1-50000000"]
params_2 = [ "samtools", "view", "-bS", "-" ]
sub_sink = subprocess.Popen(params_2, stdin=r, stdout=fh_bam, bufsize=4096)
sub_src1 = subprocess.Popen(params_0, stderr=subprocess.PIPE, stdout=w, bufsize=4096)
sub_src1.communicate()
sub_src2 = subprocess.Popen(params_1, stderr=subprocess.PIPE, stdout=w, bufsize=4096)
sub_src2.communicate()

We open the sink (the reader of the pipe) first and then communicate with the source processes only to avoid potential blocking as mentioned by @Ariel. This also forces the first source process to complete and flush its output over the pipe, before the second source process gets a chance to write to the pipe, preventing interleaved/clobbered output. You can play with the bufsize value to tweak performance.

This is pretty much exactly what the shell command is doing.

于 2015-02-28T12:05:26.147 に答える