9

私が抱えている問題の説明は少し複雑であり、より完全な情報を提供する側で誤りを犯します。せっかちな人のために、これが私がそれを要約することができる最も簡単な方法です:

改行文字をスローしながら、テキストファイルをサイズN(バインドされたN、たとえば36)のすべての(重複する)サブ文字列に分割する最も速い(実行時間が最も短い)方法は何ですか。

FASTAASCIIベースのゲノム形式でファイルを解析するモジュールを書いています。これらのファイルは、「hg18」ヒトリファレンスゲノムと呼ばれるもので構成されており、必要に応じて、UCSCゲノムブラウザーからダウンロードできます(スラッグになります!)。

お気づきのように、ゲノムファイルはchr[1..22].faとchr[XY].fa、およびこのモジュールで使用されていない他の小さなファイルのセットで構成されています。

BioPythonのSeqIOなど、FASTAファイルを解析するためのモジュールがすでにいくつか存在します。(申し訳ありませんが、リンクを投稿しますが、まだポイントがありません。)残念ながら、私が見つけたすべてのモジュールは、私が行おうとしている特定の操作を実行しません。

私のモジュールは、ゲノムデータ(たとえば、「CAGTACGTCAGACTATACGGAGCTA」は1行である可能性があります)を、重複するすべてのN長のサブストリングに分割する必要があります。非常に小さなファイル(実際の染色体ファイルの長さは3億5500万から2000万文字)とN=8を使用した例を挙げましょう。

>>>cStringIOをインポートします
>>> example_file = cStringIO.StringIO( "" "\
>ヘッダー
CAGTcag
TFgcACF
"" ")
>>> parse(example_file)で読み取る場合:
...印刷読み取り
..。
CAGTCAGTF
AGTCAGTFG
GTCAGTFGC
TCAGTFGCA
CAGTFGCAC
AGTFGCACF

私が見つけた関数は、私が考えることができる方法の中で絶対的に最高のパフォーマンスを持っていました:これは次のとおりです。


def parse(file):
  size = 8 # of course in my code this is a function argument
  file.readline() # skip past the header
  buffer = ''
  for line in file:
    buffer += line.rstrip().upper()
    while len(buffer) >= size:
      yield buffer[:size]
      buffer = buffer[1:]

これは機能しますが、残念ながら、この方法でヒトゲノムを解析するのに約1.5時間かかります(以下の注を参照)。おそらくこれは私がこの方法で見るのに最適です(完全なコードリファクタリングが必要かもしれませんが、このアプローチにはコードの他の領域でいくつかの非常に特定の利点があるため、避けたいと思います)が、私は私はこれをコミュニティに引き渡すと思いました。

ありがとう!

  • この時間には、反対側のストランドの読み取りの計算や、サイズが約5Gのハッシュでのハッシュテーブルルックアップの実行など、多くの追加の計算が含まれることに注意してください。

回答後の結論: fileobj.read()を使用してから、結果の文字列(string.replace()など)を操作すると、プログラムの残りの部分と比較して時間とメモリが比較的少なくて済むことがわかったので、それを使用しましたアプローチ。みんな、ありがとう!

4

4 に答える 4

4

ファイルをmmapして、スライディングウィンドウでファイルをつつき始めてもらえますか?私はかなり小さく実行される愚かな小さなプログラムを書きました:

USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
sarnold  20919  0.0  0.0  33036  4960 pts/2    R+   22:23   0:00 /usr/bin/python ./sliding_window.py

636229バイトのfastaファイル(http://biostar.stackexchange.com/questions/1759で見つかりました)の処理には、.383秒かかりました。

#!/usr/bin/python

import mmap
import os

  def parse(string, size):
    stride = 8
    start = string.find("\n")
    while start < size - stride:
        print string[start:start+stride]
        start += 1

fasta = open("small.fasta", 'r')
fasta_size = os.stat("small.fasta").st_size
fasta_map = mmap.mmap(fasta.fileno(), 0, mmap.MAP_PRIVATE, mmap.PROT_READ)
parse(fasta_map, fasta_size)
于 2011-01-26T06:28:49.250 に答える
3

問題は、文字列形式で保存されているデータが多すぎて、ユースケースにとって非常に無駄であり、実際のメモリが不足してスワップがスラッシングしていることだと思います。これを回避するには 128GBで十分です...:)

とにかく追加情報を格納する必要があることをコメントで示したので、親文字列を参照する別のクラスを選択します。hg18のchromFa.zipのchr21.faを使用して簡単なテストを実行しました。ファイルは約48MBで、100万行弱です。ここには1GBのメモリしかないので、後でオブジェクトを破棄するだけです。したがって、このテストでは、断片化、キャッシュ、または関連する問題は示されませんが、解析スループットを測定するための良い出発点になると思います。

import mmap
import os
import time
import sys

class Subseq(object):
  __slots__ = ("parent", "offset", "length")

  def __init__(self, parent, offset, length):
    self.parent = parent
    self.offset = offset
    self.length = length

  # these are discussed in comments:
  def __str__(self):
    return self.parent[self.offset:self.offset + self.length]

  def __hash__(self):
    return hash(str(self))

  def __getitem__(self, index):
    # doesn't currently handle slicing
    assert 0 <= index < self.length
    return self.parent[self.offset + index]

  # other methods

def parse(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield Subseq(whole, offset, size)

class Seq(object):
  __slots__ = ("value", "offset")
  def __init__(self, value, offset):
    self.value = value
    self.offset = offset

def parse_sep_str(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield Seq(whole[offset:offset + size], offset)

def parse_plain_str(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield whole[offset:offset+size]

def parse_tuple(file, size=8):
  file.readline()  # skip header
  whole = "".join(line.rstrip().upper() for line in file)
  for offset in xrange(0, len(whole) - size + 1):
    yield (whole, offset, size)

def parse_orig(file, size=8):
  file.readline() # skip header
  buffer = ''
  for line in file:
    buffer += line.rstrip().upper()
    while len(buffer) >= size:
      yield buffer[:size]
      buffer = buffer[1:]

def parse_os_read(file, size=8):
  file.readline()  # skip header
  file_size = os.fstat(file.fileno()).st_size
  whole = os.read(file.fileno(), file_size).replace("\n", "").upper()
  for offset in xrange(0, len(whole) - size + 1):
    yield whole[offset:offset+size]

def parse_mmap(file, size=8):
  file.readline()  # skip past the header
  buffer = ""
  for line in file:
    buffer += line
    if len(buffer) >= size:
      for start in xrange(0, len(buffer) - size + 1):
        yield buffer[start:start + size].upper()
      buffer = buffer[-(len(buffer) - size + 1):]
  for start in xrange(0, len(buffer) - size + 1):
    yield buffer[start:start + size]

def length(x):
  return sum(1 for _ in x)

def duration(secs):
  return "%dm %ds" % divmod(secs, 60)


def main(argv):
  tests = [parse, parse_sep_str, parse_tuple, parse_plain_str, parse_orig, parse_os_read]
  n = 0
  for fn in tests:
    n += 1
    with open(argv[1]) as f:
      start = time.time()
      length(fn(f))
      end = time.time()
      print "%d  %-20s  %s" % (n, fn.__name__, duration(end - start))

  fn = parse_mmap
  n += 1
  with open(argv[1]) as f:
    f = mmap.mmap(f.fileno(), 0, mmap.MAP_PRIVATE, mmap.PROT_READ)
    start = time.time()
    length(fn(f))
    end = time.time()
  print "%d  %-20s  %s" % (n, fn.__name__, duration(end - start))


if __name__ == "__main__":
  sys.exit(main(sys.argv))

1  parse                 1m 42s
2  parse_sep_str         1m 42s
3  parse_tuple           0m 29s
4  parse_plain_str       0m 36s
5  parse_orig            0m 45s
6  parse_os_read         0m 34s
7  parse_mmap            0m 37s

最初の4つは私のコードですが、origはあなたのもので、最後の2つは他の回答からのものです。

ユーザー定義オブジェクトは、タプルやプレーン文字列よりも作成と収集にはるかにコストがかかります。これはそれほど驚くべきことではありませんが、これほど大きな違いが生じるとは思いもしませんでした(ユーザー定義のクラスとタプルでのみ実際に異なる#1と#3を比較してください)。とにかく(parseやparse_sep_strの場合のように)文字列とともにオフセットなどの追加情報を格納したいとおっしゃっていたので、そのタイプをC拡張モジュールに実装することを検討してください。Cを直接書きたくない場合は、Cythonおよび関連するものを参照してください。

ケース#1と#2は同じであると予想されます。親文字列を指すことにより、処理時間ではなくメモリを節約しようとしましたが、このテストではそれを測定しません。

于 2011-01-26T04:06:49.410 に答える
3

いくつかの古典的なIOバウンドの変更。

  • のような低レベルの読み取り操作を使用os.readして、大きな固定バッファーに読み込みます。
  • 1つが読み取り、バッファリングし、他のプロセスが処理されるスレッド化/マルチプロセッシングを使用します。
  • 複数のプロセッサ/マシンがある場合は、multiprocessing / mqを使用して、CPU全体の処理をmap-reduceで分割します。

低レベルの読み取り操作を使用することは、それほど多くの書き直しではありません。他はかなり大規模な書き直しになります。

于 2011-01-26T07:56:41.443 に答える
1

テキストファイルを処理し、プロセスのワークセットの非同期プールを使用した読み取りと書き込みおよび並列コンピューティングでバッファーを使用する機能があります。私は2コアのAMD、8GB RAM、gnu / linuxを持っており、1秒未満で300000行、約4秒で1000000行、約20秒で約4500000行(220MB以上)を処理できます。

# -*- coding: utf-8 -*-
import sys
from multiprocessing import Pool

def process_file(f, fo="result.txt", fi=sys.argv[1]):
    fi = open(fi, "r", 4096)
    fo = open(fo, "w", 4096)
    b = []
    x = 0
    result = None
    pool = None
    for line in fi:
        b.append(line)
        x += 1
        if (x % 200000) == 0:
            if pool == None:
                pool = Pool(processes=20)
            if result == None:
                result = pool.map_async(f, b)
            else:
                presult = result.get()
                result = pool.map_async(f, b)
                for l in presult:
                    fo.write(l)
            b = []
    if not result == None:
        for l in result.get():
            fo.write(l)
    if not b == []:
        for l in b:
            fo.write(f(l))
    fo.close()
    fi.close()

最初の引数は1行を受け取り、処理して結果を返す関数で、次は出力ファイル、最後は入力ファイルです(入力スクリプトファイルの最初のパラメーターとして受け取った場合、最後の引数は使用できません) 。

于 2013-01-24T08:04:13.313 に答える