0

コンピューター ネットワーク ソフトウェア開発クラスの宿題で、教授はローカル ホストに対して実行するポート 1 ~ 1024 用のポート スキャナーを作成するように私たちに指示しました。演習のポイントは、アクターを使用してタスク レベルの並列処理を実証することです。教授は、各ポートを順番にスキャンするコードを提供しました。これを並行して実行するバージョンを作成し、システムで使用できる各プロセッサまたはハイパー スレッドのアクターを作成します。目標は、すべてのポート 1 ~ 1024 のフル スキャンを完了する時間を取得し、パラレル スキャンの結果をシリアル スキャンの結果と比較することです。並列スキャンのコードは次のとおりです。

import java.net.Socket
import scala.actors._
import Actor._
import scala.collection.mutable.ArrayBuffer

object LowPortScanner {

  var lastPort = 0
  var openPorts = ArrayBuffer[Int]()
  var longestRunTime = 00.00
  var results = List[Tuple3[Int, Range, Double]]()

  val host = "localhost"
  val numProcs = 1 to Runtime.getRuntime().availableProcessors()
  val portsPerProc = 1024 / numProcs.size
  val caller = self

  def main(args: Array[String]): Unit = {

    //spawn an actor for each processor that scans a given port range
    numProcs.foreach { proc =>
      actor {
        val portRange: Range = (lastPort + 1) to (lastPort + portsPerProc)
        lastPort = lastPort + portsPerProc
        caller ! scan(proc, portRange)
      }
    }

    //catch results from the processor actors above
    def act {
      loop {
        reactWithin(100) {
          //update the list of results returned from scan
          case scanResult: Tuple3[Int, Range, Double] =>
            results = results ::: List(scanResult)

          //check if all results have been returned for each actor
          case TIMEOUT =>
            if (results.size == numProcs.size) wrapUp

          case _ =>
            println("got back something weird from one of the port scan actors!")
            wrapUp
        }
      }
    }

    //Attempt to open a socket on each port in the given range
    //returns a Tuple3[procID: Int, ports: Range, time: Double
    def scan(proc: Int, ports: Range) {
      val startTime = System.nanoTime()
      ports.foreach { n =>
        try {
          println("Processor " + proc + "is checking port " + n)

          val socket = new Socket(host, n)

          //println("Found open port: " + n)
          openPorts += n

          socket.close
        } catch {
          case e: Exception =>
          //println("While scanning port " + n + " caught Exception: " + e)
        }
      }
      (proc, ports, startTime - System.nanoTime())
    }

    //output results and kill the main actor
    def wrapUp {
      println("These are the open ports in the range 1-1024:")
      openPorts.foreach { port => println(port) }
      results.foreach { result => if (result._3 > longestRunTime) { longestRunTime = result._3} }
      println("Time to scan ports 1 through 1024 is: %3.3f".format(longestRunTime / 1000))
      caller ! exit
    }
  }
}

クアッド コア i7 を使用しているため、numProcs = 8 です。このハードウェア プラットフォームでは、各 proc アクターは 128 ポート (1024/8 = 128) をスキャンする必要があります。私の意図は、proc1 アクターが 0 ~ 128 をスキャンし、proc2 が 129 ~ 256 をスキャンする必要があるなどです。しかし、これは起こっていることではありません。一部のアクターは、他のアクターと同じ範囲で作業することになります。以下の出力サンプルは、この問題を示しています。

プロセッサ 2 はポート 1 をチェックしています
プロセッサ 7 はポート 385 をチェックしています
プロセッサ 1 はポート 1 をチェックしています
プロセッサ 5 はポート 1 をチェックしています
プロセッサ 4 はポート 1 をチェックしています
プロセッサ 8 はポート 129 をチェックしています
プロセッサ 3 はポート 1 をチェックしています
プロセッサ 6 はポート 257 をチェックしています
プロセッサ 1
プロセッサ 5 はポート 2 をチェックしていますプロセッサ
1 はポート 3 をチェックしています
プロセッサ 3 はポート 2 をチェックしています
プロセッサ 5 はポート 3 をチェックしています
プロセッサ 1 はポート 4 をチェックしています

編集

最終的な「作業」コード:

import java.net.Socket
import scala.actors._
import Actor._
import scala.collection.mutable.ArrayBuffer

object LowPortScanner {

  var lastPort = 0
  var openPorts = ArrayBuffer[Int]()
  var longestRunTime = 00.00
  var results = List[Tuple3[Int, Range, Double]]()

  val host = "localhost"
  val numProcs = 1 to Runtime.getRuntime().availableProcessors()
  val portsPerProc = 1024 / numProcs.size
  val caller = self
  val procPortRanges = numProcs.foldLeft(List[Tuple2[Int, Range]]()) { (portRanges, proc) =>
    val tuple2 = (proc.toInt, (lastPort + 1) to (lastPort + portsPerProc))
    lastPort += portsPerProc
    tuple2 :: portRanges
  }

  def main(args: Array[String]): Unit = {

    //spawn an actor for each processor that scans a given port range
    procPortRanges.foreach { proc =>
      actor {
        caller ! scan(proc._1, proc._2)
      }
    }

//catch results from the processor actors above
def act {
  loop {
    reactWithin(100) {
      //update the list of results returned from scan
      case scanResult: Tuple3[Int, Range, Double] =>
        results = results ::: List(scanResult)

      //check if results have been returned for each actor
      case TIMEOUT =>
        if (results.size == numProcs.size) wrapUp

      case _ =>
        println("got back something weird from one of the port scan actors!")
        wrapUp
    }
  }
}

    //Attempt to open a socket on each port in the given range
    //returns a Tuple3[procID: Int, ports: Range, time: Double
    def scan(proc: Int, ports: Range) {
      val startTime = System.nanoTime()
      ports.foreach { n =>
        try {
          println("Processor " + proc + "is checking port " + n)

          val socket = new Socket(host, n)

          //println("Found open port: " + n)
          openPorts += n

          socket.close
        } catch {
          case e: Exception =>
          //println("While scanning port " + n + " caught Exception: " + e)
        }
      }
      (proc, ports, startTime - System.nanoTime())
    }

    //output results and kill the main actor
    def wrapUp {
      println("These are the open ports in the range 1-1024:")
      openPorts.foreach { port => println(port) }
      results.foreach { result => if (result._3 > longestRunTime) { longestRunTime = result._3} }
      println("Time to scan ports 1 through 1024 is: %3.3f".format(longestRunTime / 1000))
      caller ! exit
    }
  }
}
4

1 に答える 1

3

このハードウェアプラットフォームでは、各procアクターは128ポート(1024/8 = 128)をスキャンする必要があります。

あなたが持っていることを除いて

val portsPerProc = numProcs.size / 1024

また、8/1024は0です。オフバイワンエラーが発生し、すべてのアクターがより1つ多くのポートをスキャンすることに注意しportsPerProcてください。またはのいずれlastPort to (lastPort + portsPerProc) - 1かをスキャンする必要があり(lastPort + 1) to (lastPort + portsPerProc)ます。

将来的には、別の質問がある場合は、個別に質問する必要があります:)しかし、ここでは、非常に明白な競合状態があります。すべてのアクターが実行しようとしています。

    val portRange: Range = (lastPort + 1) to (lastPort + portsPerProc)
    lastPort = lastPort + portsPerProc

同時に。たとえば、アクター1と2が、アクターが2番目の行に到達する前に、最初の行を実行するとどうなるかを考えてください。

于 2012-11-09T19:13:26.833 に答える