0

Web サイトのロード ジェネレーターであるシステムをモデル化しようとしています。ウェブサイトには、今のところ文字列のページがあります。

type Page = String

val pages: Vector[Page] = Stream.eval(Task.delay {
  new Random().alphanumeric.take(10).mkString
}).repeat.take(100).runLog.unsafeRun

Web サイトには、ランダムな時間に Web サイトの閲覧を開始するようにモデル化されたユーザーがいます。

case class User(userName: String)

def newUser: Task[User] = Task.delay {
  val random = new Random
  val userName = random.alphanumeric.take(5).mkString
  val sleepTime = random.nextInt(10000)
  println(s"${new Date}: ${Thread.currentThread.getName} Generating user ${userName} after ${sleepTime}ms")
  Thread.sleep(sleepTime)
  User(userName)
}

各ユーザーはページからページへ遷移し、次のようにモデル化されます。

case class Transition(from: Page, to: Page, thinkTime: Int)

def newTransitions(user: User): Stream[Task, Transition] = {
  val init = Transition(pages(0), pages(1), 100)
  Stream.iterateEval(init) { t => nextTransition(user)(t) }
}

def nextTransition(user: User)(curr: Transition): Task[Transition] = Task.delay {
  val random = new Random
  val from: Page = curr.to
  val to: Page = pages(random.nextInt(pages.size))
  val thinkTime: Int = random.nextInt(10000)
  Thread.sleep(thinkTime)
  println(s"${new Date}: ${Thread.currentThread.getName} ${user} transitioning from ${from} to ${to} after thinking for ${thinkTime}ms")
  Transition(from, to, thinkTime)
}

このシミュレーションを実行しようとしています。4 人のユーザーがランダムな間隔でこの Web サイトの閲覧を開始し、各ユーザーがランダムに 3 つのページに移動します。

implicit val S = fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker")
val users: Stream[Task, User] = Stream.eval(newUser).repeat
val transitions: Stream[Task, Stream[Task, Transition]] = users.map(newTransitions(_).take(3))

transitions.take(4).runLog.async.unsafeRun.map { _.runLog.async.unsafeRun }

出力は次のようになります。

Sun Oct 16 12:58:04 PDT 2016: worker-1 Generating user 97qkE after 3700ms
Sun Oct 16 12:58:08 PDT 2016: worker-1 Generating user sMIJs after 2074ms
Sun Oct 16 12:58:10 PDT 2016: worker-1 Generating user CpWWf after 1334ms
Sun Oct 16 12:58:11 PDT 2016: worker-1 Generating user xlQmj after 8825ms
Sun Oct 16 12:58:28 PDT 2016: worker-7 User(97qkE) transitioning from sLBbTbXKhO to 7ROMneDkzl after thinking for 8040ms
Sun Oct 16 12:58:34 PDT 2016: worker-7 User(97qkE) transitioning from 7ROMneDkzl to AK66SuDENi after thinking for 6227ms
Sun Oct 16 12:58:41 PDT 2016: worker-6 User(sMIJs) transitioning from sLBbTbXKhO to U2Qu8Th1oH after thinking for 6599ms
Sun Oct 16 12:58:46 PDT 2016: worker-6 User(sMIJs) transitioning from U2Qu8Th1oH to 3bRBWYUeS0 after thinking for 4920ms
Sun Oct 16 12:58:52 PDT 2016: worker-4 User(CpWWf) transitioning from sLBbTbXKhO to ZVwRydhPe9 after thinking for 6395ms
Sun Oct 16 12:58:54 PDT 2016: worker-4 User(CpWWf) transitioning from ZVwRydhPe9 to yHtly5IrMC after thinking for 1963ms
Sun Oct 16 12:58:57 PDT 2016: worker-2 User(xlQmj) transitioning from sLBbTbXKhO to 6jn7TmxW0O after thinking for 2565ms
Sun Oct 16 12:59:05 PDT 2016: worker-2 User(xlQmj) transitioning from 6jn7TmxW0O to FFROVBvpHJ after thinking for 8535ms

つまり、通貨はなく、すべてのランダムスリープは厳密な順序で発生します。

これらの異なるストリーム (新しいユーザーが Web サイトを閲覧し始め、各ユーザーが行った遷移) が同時に発生するように、このプログラムを変更するにはどうすればよいですか?


ここに私の完全なプログラムがあります:

import fs2._
import scala.util.Random
import java.util.Date

object Question {
  type Page = String

  val pages: Vector[Page] = Stream.eval(Task.delay {
    new Random().alphanumeric.take(10).mkString
  }).repeat.take(100).runLog.unsafeRun

  case class User(userName: String)

  case class Transition(from: Page, to: Page, thinkTime: Int)

  def newUser: Task[User] = Task.delay {
    val random = new Random
    val userName = random.alphanumeric.take(5).mkString
    val sleepTime = random.nextInt(10000)
    println(s"${new Date}: ${Thread.currentThread.getName} Generating user ${userName} after ${sleepTime}ms")
    Thread.sleep(sleepTime)
    User(userName)
  }

  def nextTransition(user: User)(curr: Transition): Task[Transition] = Task.delay {
    val random = new Random
    val from: Page = curr.to
    val to: Page = pages(random.nextInt(pages.size))
    val thinkTime: Int = random.nextInt(10000)
    Thread.sleep(thinkTime)
    println(s"${new Date}: ${Thread.currentThread.getName} ${user} transitioning from ${from} to ${to} after thinking for ${thinkTime}ms")
    Transition(from, to, thinkTime)
  }

  def newTransitions(user: User): Stream[Task, Transition] = {
    val init = Transition(pages(0), pages(1), 100)
    Stream.iterateEval(init) { t => nextTransition(user)(t) }
  }

  def main(args: Array[String]) {
    implicit val S = fs2.Strategy.fromFixedDaemonPool(8, threadName = "worker")
    val users: Stream[Task, User] = Stream.eval(newUser).repeat
    val transitions: Stream[Task, Stream[Task, Transition]] = users.map(newTransitions(_).take(3))

    transitions.take(4).runLog.async.unsafeRun.map { _.runLog.async.unsafeRun }
  }
}
4

1 に答える 1

3

まず、あなたobject Questionの にタイプミスがあるようです。切り替えることを忘れないでください:

Thread.sleep(sleepTime)
println(s"${new Date}: ${Thread.currentThread.getName} Generating user ${userName} after ${sleepTime}ms")

次に、ワーカー スレッドで実行したいので、Task { ... }代わりに使用します。Task.delay

最後に、メイン関数の最後の行は次のようになります。

fs2.concurrent.join(maxOpen = 3)(transitions).run.unsafeRun

また、これらは単なるプレースホルダーであると推測していThread.sleepます。実際のコードではそれらを使用したくないでしょう。また、printlnはデバッグには問題ありませんが、ファイナライズされたら、ロギングを別のストリーム構造として移動することをお勧めします。

于 2016-10-17T08:59:26.763 に答える