akka アクターを使用してロード ジェネレーター アプリに取り組んでいます。アプリは数百万のリクエストに対しては正常に動作しましたが、負荷を 1,000 万以上のリクエストに増やすか、無限ループを使用して (リクエスト数ではなく) 一定期間負荷を実行すると、アプリケーションがハングします。以下は単純化された実装であり、テスト中のコマンドを出力するだけです。また、時間が経過しても統計が記録されないか、アプリがシャットダウンしないことにも気付きました。スケジューラを使用して 30 秒ごとに統計をダンプし、2 時間後にアプリをシャットダウンします。短い間隔でテストしましたが、「stats」および「Shutdown」メッセージの処理が表示されません。
アプリケーションがハングする原因は何ですか?
import akka.actor._
import akka.util.duration._
import akka.routing.RoundRobinRouter
import com.test.redload.util.CommandGenerator
import org.apache.log4j.Logger
import akka.util.Duration
class LoadWorker extends Actor {
val log = Logger.getLogger(this.getClass().getName())
def receive = {
case "PUT" => sender ! PUT
case "GET" => sender ! GET
case "DELETE" => sender ! DELETE
case "POST" => sender ! POST
case "HEAD" => sender ! HEAD
}
def PUT():Boolean = {println("PUT");return true}
def GET():Boolean = {println("GET");return true}
def DELETE():Boolean = {println("DELETE");return true}
def POST():Boolean = {println("POST");return true}
def HEAD():Boolean = {println("HEAD");return true}
}
class LoadGenerator(nrOfWorkers:Int, noOfMessages:Int) extends Actor {
val log = Logger.getLogger(this.getClass().getName())
val start:Long = System.currentTimeMillis
var noOfMessageRcvd:Int = 0
val r = new CommandGenerator// <- is basically are list implementation that iterates and returns the next command
r.addCommand("PUT",5) r.addCommand("GET",2) r.addCommand("DELETE",2)
r.addCommand("POST",2) r.addCommand("HEAD",1) r.addCommand("LBRPOP",1)
val loadRouter = context.actorOf(Props[LoadWorker].withRouter(RoundRobinRouter(nrOfWorkers)),name ="loadRouter")
def receive = {
case "start" => {
if(noOfMessages > 1) {
for( i <- 0 until noOfMessages) loadRouter ! r.getRandomCommand()
} else {
log.info("Time bound Load run..")
//for( i <- 0 until 10000000) { //<- For any number greater than few millions that app hangs after few messages
while(true){loadRouter ! r.getRandomCommand() //<- with while loop the app hangs as soon as it begins
}
}
}
case true => {
noOfMessageRcvd +=1
if(noOfMessages == noOfMessageRcvd){
self ! "shutdown"
}
}
case "stats" => {
logStats()
}
case "shutdown" => {
logStats()
log.info("Shutting Down!")
context.system.shutdown()
}
}
def logStats(){
var duration = (System.currentTimeMillis - start)/1000
if( duration > 0) {
log.info(noOfMessageRcvd+" messages processed in "+duration +" seconds "
+ "at "+ noOfMessageRcvd/duration +" TPS" )
} else {
log.info(noOfMessageRcvd+" messages processed in less than a second ")
}
}
}
object RedLoad extends App{
val log = Logger.getLogger(this.getClass().getName())
val system = ActorSystem("LoadGeneratorApp");
// -1 is if we want to run for a period of time and > 1 the run will end after the messages are procesed
val lg = system.actorOf(Props(new LoadGenerator(100,-1)),"LG")
//Log the stats every 30 seconds
system.scheduler.schedule(0 seconds,30 seconds,lg,"stats")
//Shutdown the load run after 2 hours, if no of message is > -1 then it will shutdown after
//all messages are processed
system.scheduler.scheduleOnce(2 hours,lg,"shutdown")
lg ! "start"
log.info("Started..")
}