4

私は、scala で文字列を使用して大規模なデータを処理するための、メモリ効率が高く機能的な方法を見つけようとしています。私は遅延コレクションについて多くのことを読み、かなりの数のコード例を見てきました。ただし、「GC オーバーヘッドの超過」または「Java ヒープ領域」の問題が何度も発生します。

多くの場合、問題は遅延コレクションを構築しようとすることですが、成長するコレクションに追加するときに新しい要素をそれぞれ評価します (これを段階的に行う方法は他にありません)。もちろん、最初に初期遅延コレクションを初期化するようなことを試して、マップなどを使用してリソースクリティカルな計算を適用することで、目的の値を保持するコレクションを生成することもできますが、最終的なコレクションの正確なサイズがわからないことがよくあります。その怠惰なコレクションを初期化するアプリオリ。

奇数シーケンスのペアが 1 つのファイルに属し、偶数シーケンスのペアが 1 つのファイルに属するという規則に従って、FASTA (以下の定義) 形式のファイルを 2 つの個別のファイルに分割する例として、次のコードを改善する方法についてのヒントや説明を教えてください。別のものに(「ストランドの分離」)。これを行うための「最も」簡単な方法は、行をループし、開いているファイルストリームを介して対応するファイルに出力するという命令的な方法です (もちろん、これはうまく機能します)。ただし、ヘッダーとシーケンスを保持する変数に再割り当てするスタイルが好きではないため、次のコード例では (末尾) 再帰を使用しています。リソースの問題に遭遇することなく同様の設計を維持する方法を見つけていただければ幸いです。 !

この例は小さなファイルに対しては完全に機能しますが、約 500 MB のファイルでは、標準の JVM セットアップではコードが失敗します。「任意の」サイズ、たとえば 10 ~ 20 GB 程度のファイルを処理したい。

val fileName = args(0)
val in = io.Source.fromFile(fileName) getLines

type itType = Iterator[String]
type sType = Stream[(String, String)]

def getFullSeqs(ite: itType) = {
    //val metaChar = ">"
    val HeadPatt = "(^>)(.+)" r
    val SeqPatt  = "([\\w\\W]+)" r
    @annotation.tailrec
    def rec(it: itType, out: sType = Stream[(String, String)]()): sType = 
        if (it hasNext) it next match  {
            case HeadPatt(_,header) =>
                // introduce new header-sequence pair
                rec(it, (header, "") #:: out)
            case SeqPatt(seq) =>
                val oldVal = out head
                // concat subsequences
                val newStream = (oldVal._1, oldVal._2 + seq) #:: out.tail    
                rec(it, newStream)
            case _ =>
                println("something went wrong my friend, oh oh oh!"); Stream[(String, String)]()                
        } else out
    rec(ite)    
}

def printStrands(seqs: sType) {
   import java.io.PrintWriter
   import java.io.File
   def printStrand(seqse: sType, strand: Int) {
        // only use sequences of one strand 
        val indices =  List.tabulate(seqs.size/2)(_*2 + strand - 1).view
        val p = new PrintWriter(new File(fileName + "." + strand))
        indices foreach { i =>
              p.print(">" + seqse(i)._1 + "\n" + seqse(i)._2 + "\n")
        }; p.close
       println("Done bro!")
   }
   List(1,2).par foreach (s => printStrand(seqs, s))
}

printStrands(getFullSeqs(in))

3 つの疑問が生じます。

getLinesA)私の方法のように取得した初期イテレータを処理することによって得られる大きなデータ構造を維持する必要があると仮定しましょう( のサイズと の出力がgetFullSeqs異なることに注意してください)。これは、(!) データ全体の変換が繰り返し必要になるためです。どの段階でもデータのどの部分が必要になるか分からないからです。私の例は最善ではないかもしれませんが、どうすればよいでしょうか? そもそも可能ですか??ingetFullSeqs

(header -> sequence)B) 必要なデータ構造が本質的に遅延していない場合、ペアをに格納したいとしMap()ます。怠惰なコレクションでラップしますか?

C) ストリームを構築する私の実装は、入力された行の順序を逆にする可能性があります。reverse を呼び出すと、すべての要素が評価されます (私のコードでは既に評価されているため、これが実際の問題です)。怠惰な方法で「後ろから」後処理する方法はありますか? 私は知っていますがreverseIterator、これはすでに解決策ですか、それとも実際には最初にすべての要素を評価するわけではありません(リストで呼び出す必要があるため)? でストリームを構築することもできますがnewVal #:: rec(...)、そうすると末尾再帰が失われてしまいますね。

したがって、基本的に必要なのは、追加のプロセスによって評価されない要素をコレクションに追加することです。だからlazy val elem = "test"; elem :: lazyCollection私が探しているものではありません。

編集: のストリーム引数に by-name パラメータを使用してみましたrec

ご清聴ありがとうございました。

/////////////////////////////////////////////// /////////////////////////////////////////////// /////////////////////////////////////////////// ///////////////

FASTA は、1 つのヘッダー行で区切られたシーケンスの連続セットとして定義されます。ヘッダーは、">" で始まる行として定義されます。ヘッダーの下のすべての行は、ヘッダーに関連付けられたシーケンスの一部と呼ばれます。新しいヘッダーが存在する場合、シーケンスは終了します。すべてのヘッダーは一意です。例:

>HEADER1
abcdefg
>HEADER2
hijklmn
opqrstu
>HEADER3
vwxyz
>HEADER4
zyxwv

したがって、シーケンス 2 はシーケンス 1 の 2 倍の大きさです。私のプログラムは、そのファイルをファイル A に分割します。

>HEADER1
abcdefg
>HEADER3
vwxyz

を含む 2 番目のファイル B

>HEADER2
hijklmn
opqrstu
>HEADER4
zyxwv

入力ファイルは、偶数のヘッダーとシーケンスのペアで構成されていると想定されます。

4

2 に答える 2

4

非常に大きなデータ構造を処理するための鍵は、必要な操作を実行するために重要なものだけをメモリに保持することです。だから、あなたの場合、それは

  • 入力ファイル
  • 2つの出力ファイル
  • 現在のテキスト行

それだけです。場合によっては、シーケンスの長さなどの情報を保存する必要があります。このようなイベントでは、最初のパスでデータ構造を構築し、2番目のパスでそれらを使用します。たとえば、3つのファイルを書き込むことにしたとします。1つは偶数レコード用、1つは奇数レコード用、もう1つは全長が300ヌクレオチド未満のエントリ用です。あなたはこのようなことをするでしょう(警告-それはコンパイルされますが、私はそれを実行したことがないので、実際には機能しないかもしれません):

final def findSizes(
  data: Iterator[String], sz: Map[String,Long] = Map(),
  currentName: String = "", currentSize: Long = 0
): Map[String,Long] = {
  def currentMap = if (currentName != "") sz + (currentName->currentSize) else sz
  if (!data.hasNext) currentMap
  else {
    val s = data.next
    if (s(0) == '>') findSizes(data, currentMap, s, 0)
    else findSizes(data, sz, currentName, currentSize + s.length)
  }
}

次に、処理のために、そのマップを使用して再度パススルーします。

import java.io._
final def writeFiles(
  source: Iterator[String], targets: Array[PrintWriter],
  sizes: Map[String,Long], count: Int = -1, which: Int = 0
) {
  if (!source.hasNext) targets.foreach(_.close)
  else {
    val s = source.next
    if (s(0) == '>') {
      val w = if (sizes.get(s).exists(_ < 300)) 2 else (count+1)%2
      targets(w).println(s)
      writeFiles(source, targets, sizes, count+1, w)
    }
    else {
      targets(which).println(s)
      writeFiles(source, targets, sizes, count, which)
    }
  }
}

次に、Source.fromFile(f).getLines()2回使用してイテレータを作成すると、準備が整います。 編集:これはあなたの「怠惰な」コレクションであるため、ある意味でこれが重要なステップです。ただし、すべてのメモリをすぐに読み取らない(「レイジー」)という理由だけでなく、以前の文字列も格納しないという理由で重要ではありません。

より一般的には、Scalaは、メモリに必要な情報と、必要に応じてディスクから取得できる情報について慎重に考えることから、それほど多くのことを支援することはできません。遅延評価が役立つ場合もありますが、すべてのデータを遅延的にメモリに保持するという要件を簡単に表現できるため、魔法の公式はありません。Scalaは、メモリにアクセスするためのコマンドを、秘密裏に、代わりにディスクから何かをフェッチするための命令として解釈することはできません。(まあ、それを正確に行うディスクから結果をキャッシュするライブラリを作成しない限り、そうではありません。)

于 2012-06-05T12:00:09.620 に答える
4

newVal #:: rec(...) でストリームを構築することもできますが、そうすると末尾再帰が失われてしまいますよね?

実は違う。

それで、これが問題です...現在の末尾再帰では、すべてStream値で埋めます。はい、Stream怠惰ですが、すべての要素を計算して、怠惰を取り除いています。

今あなたが言うnewVal #:: rec(...)。末尾再帰を失いますか? いいえ、なぜですか?あなたは再帰的ではないからです。どうして?まあ、Stream怠け者なので、評価しませんrec(...)

そして、それはそれの美しさです。そのようにgetFullSeqsすると、最初の対話で戻り、printStrands要求されたときにのみ「再帰」を計算します。残念ながら、そのままではうまくいきません...

問題は、常に を変更していることです。Streamこれは、 を使用する方法ではありませんStream。ではStream、常にそれに追加します。を「書き換え」続けないでくださいStream

さて、私がすぐに特定できる問題が他に 3 つありますprintStrands。まず、sizeonseqsを呼び出します。これにより、全体Streamが処理され、遅延が失われます。を呼び出さないsizeでくださいStream。次に、applyonを呼び出してseqse、インデックスでアクセスします。(または)applyを呼び出さないでください。これは非常に非効率的です。それはあなたの内側のループを作ります-はい、入力ファイルのヘッダーの数で2次です! 最後に、の実行中ずっとへの参照を保持し、処理要素がガベージ コレクションされるのを防ぎます。StreamListO(n)O(n^2)printStrandsseqsprintStrand

したがって、最初の概算は次のとおりです。

def inputStreams(fileName: String): (Stream[String], Stream[String]) = {
  val in = (io.Source fromFile fileName).getLines.toStream
  val SeqPatt = "^[^>]".r
  def demultiplex(s: Stream[String], skip: Boolean): Stream[String] = {
    if (s.isEmpty) Stream.empty
    else if (skip) demultiplex(s.tail dropWhile (SeqPatt findFirstIn _ nonEmpty), skip = false)
         else s.head #:: (s.tail takeWhile (SeqPatt findFirstIn _ nonEmpty)) #::: demultiplex(s.tail dropWhile (SeqPatt findFirstIn _ nonEmpty), skip = true)
  }
  (demultiplex(in, skip = false), demultiplex(in, skip = true))
}

上記の問題、および怠惰の問題をさらにガイドするためにそのコードを示しているのは、これを実行した瞬間です。

val (a, b) = inputStreams(fileName)

両方のストリームのヘッドへの参照を保持し、ガベージ コレクションを防ぎます。それらへの参照を保持することはできないため、「val」または「lazy val」に保存せずに、取得したらすぐにそれらを消費する必要があります。「var」で十分かもしれませんが、扱いにくいでしょう。それでは、代わりにこれを試してみましょう:

def inputStreams(fileName: String): Vector[Stream[String]] = {
  val in = (io.Source fromFile fileName).getLines.toStream
  val SeqPatt = "^[^>]".r
  def demultiplex(s: Stream[String], skip: Boolean): Stream[String] = {
    if (s.isEmpty) Stream.empty
    else if (skip) demultiplex(s.tail dropWhile (SeqPatt findFirstIn _ nonEmpty), skip = false)
         else s.head #:: (s.tail takeWhile (SeqPatt findFirstIn _ nonEmpty)) #::: demultiplex(s.tail dropWhile (SeqPatt findFirstIn _ nonEmpty), skip = true)
  }
  Vector(demultiplex(in, skip = false), demultiplex(in, skip = true))
}

inputStreams(fileName).zipWithIndex.par.foreach { 
  case (stream, strand) => 
    val p = new PrintWriter(new File("FASTA" + "." + strand))
    stream foreach p.println
    p.close
}

stream内部inputStreamsは参照として機能し、印刷中もストリーム全体をメモリに保持するため、それでも機能しません。

それで、再び失敗したので、私は何をお勧めしますか? 複雑にしないでおく。

def in = (scala.io.Source fromFile fileName).getLines.toStream
def inputStream(in: Stream[String], strand: Int = 1): Stream[(String, Int)] = {
  if (in.isEmpty) Stream.empty
  else if (in.head startsWith ">") (in.head, 1 - strand) #:: inputStream(in.tail, 1 - strand)
       else                        (in.head, strand) #:: inputStream(in.tail, strand)
}
val printers = Array.tabulate(2)(i => new PrintWriter(new File("FASTA" + "." + i)))
inputStream(in) foreach {
  case (line, strand) => printers(strand) println line
}
printers foreach (_.close)

これで、必要以上にメモリに保持されなくなりました。しかし、それでも複雑すぎると思います。これは、次のように簡単に実行できます。

def in = (scala.io.Source fromFile fileName).getLines
val printers = Array.tabulate(2)(i => new PrintWriter(new File("FASTA" + "." + i)))
def printStrands(in: Iterator[String], strand: Int = 1) {
  if (in.hasNext) {
    val next = in.next
    if (next startsWith ">") { 
      printers(1 - strand).println(next)
      printStrands(in, 1 - strand)
    } else {
      printers(strand).println(next)
      printStrands(in, strand)
    }
  }
}
printStrands(in)
printers foreach (_.close)

またはwhile、再帰の代わりにループを使用します。

さて、他の質問に:

B) データの2 つのコピーを保持する必要がないように、読み取り中にそうするのが理にかなっている場合がありMapますSeq

C) a を逆にしないでくださいStream-- その怠惰さがすべて失われます。

于 2012-06-05T17:12:18.750 に答える