0

いくつかの処理関数によって個別に処理され、別のシーケンス オブジェクトとして収集される必要がある非常に長い文字列シーケンスがあります。この問題は、フォーク/ジョイン タイプの攻撃に最適なようです。

関数は、インスタンス化に非常にコストがかかるクラスのメンバーです。しかし、future 間で 1 つのクラス オブジェクトをインスタンス化して共有すると問題が発生するようだったので、使用可能なプロセッサ数の 4 倍をインスタンス化してから、future 間で分割しました。

// instantiate processing class objects
val processors = 1 to SomeNumber map (x=> new MyProcessor)
val processorstream = Stream.continually(processors).flatten
// the input strings
val input: Seq[String] = some sequence of strings
val splitinput = input.grouped(some large number).toStream
// create futures
val mytask = splitinput.zip(processorstream).collect {
    case (subseq of strings, processor) => future {
        map elements of subsequence of strings with processor}}

次に、出力を次のように収集します

val result = mytask.map(x => x.apply()).reduce(_++_) // or some appropriate concatenation operator

私の問題は、コアが 8 つあるにもかかわらず、CPU の使用率が完全に得られないことです。1 つのコアのみを使用します。

調査するために、私が試した代替手段は

val input: Seq[String] = some sequence of strings
// no stage where I split the input into subsequences
val mytask = input.zip(processorstream).collect {
    case (string, processor) => future {
        process string with processor}}
val result = mytask.map(x => x.apply())

この代替手段は機能し、機能しませんでした。完全な CPU 使用率を達成しましたが、(仮説として) プロセッサが各文字列を処理する速度が速すぎて、同じプロセッサ オブジェクトが異なる文字列に同時に適用されることがあったため、いくつかの例外がスローされました。

より長い入力 (たとえば、10 単語の見出しの代わりにテキスト ドキュメント全体) を提供すると、例外がスローされることなく CPU がフルに使用されるため、プロセッサの動作が速すぎるという私の仮説はさらに確信しています。

また、akka の先物と scalaz の約束を試してみましたが、入力シーケンスをサブシーケンスに分割すると、すべて 1 つの CPU しか使用しないようです。

では、文字列のサブシーケンスを入力として使用しながら、このインスタンスで先物を使用して完全な CPU 使用率を得るにはどうすればよいでしょうか?

4

2 に答える 2

2

@om-nom-nom あたり:

input.par.map { s => task(s) }
于 2012-08-18T20:08:39.827 に答える
0

ThreadLocal可変プロセッサに使用してみてください。かなり役に立たない例:

val words = io.Source.fromFile("/usr/share/dict/words").getLines.toIndexedSeq

class Processor {
  val sb = new StringBuffer() // mutable!
  println("---created processor---")
  def map(s: String): Int = {
    sb.setLength(0)
    for (i <- 1 to s.length()) {
      sb.append(s.substring(0, i))
    }
    sb.toString().sum.toInt  // don't make any sense out of this
  }
}

val tl = new ThreadLocal[Processor] {
  override protected def initialValue() = new Processor
}

val parRes = words.par.map(w => tl.get.map(w)).sum
val serRes = words.map(    w => tl.get.map(w)).sum
assert(parRes == serRes)

---created processor---メッセージが証明するように、これはデフォルトで CPU コアと同じ数のスレッドを作成します。

于 2012-08-18T20:41:23.160 に答える