わかりました、実際には実装できませんでしたが、アクターは GC で停止しています。Scala 2.9.2 (REPL) + Akka 2.0.3 を使用。
`WeakReference[ActorRef]` を使った `EventBus` は役に立ちませんでした。Akka には `ChildrenContainer` (`self.children`) を持つ dungeon もあり、さらにライフサイクル イベントを購読する `Monitor` も存在する可能性があるためです。
まだ試していないのは、新しく作った `WeakEventBus` だけを知っているディスパッチャーでアクターを作成することです。もしかすると、私が要点を見落としているのでしょうか?
REPL 用のコードは次のとおりです (適切なインポートを付けて REPL を起動し、`:paste` を使って 2 回に分けて貼り付けます)。
// Start REPL with something like:
// scala -Yrepl-sync -classpath "/opt/akka-2.0.3/lib/akka/akka-actor-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/akka-remote-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/config-0.3.1.jar:
// /opt/akka-2.0.3/lib/akka/protobuf-java-2.4.1.jar:
// /opt/akka-2.0.3/lib/akka/netty-3.5.3.Final.jar"
// :paste 1/2
import akka.actor._
import akka.pattern._
import akka.event._
import akka.util._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import akka.dispatch.Await
import scala.ref.WeakReference
import java.util.Comparator
import java.util.concurrent.atomic._
import java.util.UUID
/** Base message carrying an identifier and a timestamp. */
case class Message(id: String, timestamp: Long)

/**
 * Text message whose id defaults to a random UUID string and whose timestamp
 * defaults to the current time in epoch milliseconds.
 *
 * NOTE(review): a case class extending another case class is deprecated in
 * Scala 2.9 and rejected by later compilers; revisit before upgrading.
 */
case class PostMessage(
  override val id: String = UUID.randomUUID().toString(),
  override val timestamp: Long = System.currentTimeMillis(),
  text: String
) extends Message(id, timestamp)

/** A message published on a named channel. */
case class MessageEvent(channel: String, message: Message)

/** Request to start a server for the given node name. */
case class StartServer(nodeName: String)

/** Reply to [[StartServer]] carrying the reference of the started actor. */
case class ServerStarted(nodeName: String, actor: ActorRef)

/** Liveness probe answered directly by the server actor. */
case class IsAlive(nodeName: String)

/** Liveness probe delivered through the event buses instead of a direct send. */
case class IsAliveWeak(nodeName: String)

/** Reply to [[IsAlive]]. */
case class AmAlive(nodeName: String, actor: ActorRef)

/**
 * Periodic self-check trigger.
 * Note: Server sends and matches the companion object `GcCheck`, not an
 * instance created with `GcCheck()`.
 */
case class GcCheck()

/** Scheduling flag plus GC probe reference that Server sends to itself in preRestart. */
case class GcCheckScheduled(
  isScheduled: Boolean,
  gcFlag: WeakReference[AnyRef]
)
/**
 * Lookup-style classification for [[WeakEventBus]] that stores every subscriber
 * behind a `WeakReference`, so the bus itself does not keep subscribers
 * strongly reachable.
 *
 * Implementors supply `mapSize`, `compareSubscribers`, `classify` and the
 * two-argument `publish`.
 */
trait WeakLookupClassification { this: WeakEventBus =>

  /** Classifier -> weakly referenced subscribers, ordered by their referents. */
  protected final val subscribers = new Index[Classifier, WeakReference[Subscriber]](
    mapSize(),
    new Comparator[WeakReference[Subscriber]] {
      def compare(a: WeakReference[Subscriber], b: WeakReference[Subscriber]): Int =
        // Dereference each side exactly once: a second `get` could observe a
        // reference that was cleared after the first one succeeded.
        (a.get, b.get) match {
          case (Some(x), Some(y)) => compareSubscribers(x, y)
          // Cleared references sort before live ones and equal to each other,
          // which keeps the comparison reflexive and antisymmetric.
          case (None, None)       => 0
          case (None, _)          => -1
          case (_, None)          => 1
        }
    })

  /** Size argument handed to the underlying `Index`. */
  protected def mapSize(): Int

  /** Ordering of live subscribers. */
  protected def compareSubscribers(a: Subscriber, b: Subscriber): Int

  /** Extracts the classifier an event is routed by. */
  protected def classify(event: Event): Classifier

  /** Delivers one event to one live subscriber. */
  protected def publish(event: Event, subscriber: Subscriber): Unit

  /** Registers `subscriber` for `to`; returns the result of `Index.put`. */
  def subscribe(subscriber: Subscriber, to: Classifier): Boolean =
    subscribers.put(to, new WeakReference(subscriber))

  /**
   * Removes `subscriber` from `from`. A fresh weak reference is used as the
   * lookup key; it matches because the comparator compares referents.
   */
  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean =
    subscribers.remove(from, new WeakReference(subscriber))

  /** Removes `subscriber` from every classifier. */
  def unsubscribe(subscriber: Subscriber): Unit =
    subscribers.removeValue(new WeakReference(subscriber))

  /**
   * Publishes `event` to every subscriber of its classifier that is still
   * alive. References already cleared by the garbage collector are skipped
   * instead of failing the whole publish with a NoSuchElementException.
   */
  def publish(event: Event): Unit =
    subscribers.valueIterator(classify(event)) foreach { ref =>
      ref.get foreach { live => publish(event, live) }
    }
}
/**
 * Event bus that routes each MessageEvent by its channel name to the ActorRefs
 * subscribed to that channel; subscriber storage comes from
 * WeakLookupClassification.
 */
class WeakEventBus extends EventBus with WeakLookupClassification {
  type Event = MessageEvent
  type Classifier = String
  type Subscriber = ActorRef

  /** Size argument passed on to the subscriber index. */
  protected def mapSize(): Int = 10

  /** Events are classified by the channel they were published on. */
  protected def classify(event: Event): Classifier = event.channel

  /** Subscribers are ordered by ActorRef's own comparison. */
  protected def compareSubscribers(a: ActorRef, b: ActorRef) = a.compareTo(b)

  /** Delivery is a plain fire-and-forget send. */
  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event
  }
}
// Single shared bus instance used by Server (subscribe) and Root (publish).
lazy val weakEventBus: WeakEventBus = new WeakEventBus

// Implicit timeout picked up by the `?` (ask) calls below.
// NOTE(review): 1000 is presumably milliseconds - confirm against Akka's Timeout(Long).
implicit val timeout: akka.util.Timeout = akka.util.Timeout(1000)

// Actor system "serversys", configured from the "serverconf" section of the
// inline configuration below.
lazy val actorSystem: ActorSystem = {
  val inlineConfig = ConfigFactory.parseString("""
akka {
loglevel = "DEBUG"
actor {
provider = "akka.remote.RemoteActorRefProvider"
debug {
receive = on
autoreceive = on
lifecycle = on
event-stream = on
}
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
log-sent-messages = on
log-received-messages = on
}
}
serverconf {
include "common"
akka {
actor {
deployment {
/root {
remote = "akka://serversys@127.0.0.1:2552"
}
}
}
remote {
netty {
hostname = "127.0.0.1"
port = 2552
}
}
}
}
""")
  ActorSystem("serversys", inlineConfig.getConfig("serverconf"))
}
/**
 * Demo actor that stops itself once a weakly referenced probe object has been
 * garbage collected.
 *
 * On StartServer it replies with ServerStarted, schedules the first GcCheck,
 * installs a fresh probe, subscribes itself to `weakEventBus` under the node
 * name and unsubscribes from the system event stream. Every GcCheck either
 * re-schedules itself (probe still alive) or stops the actor (probe collected).
 */
class Server extends Actor {
  // Set once a GcCheck has been scheduled, so StartServer schedules only once.
  private[this] val scheduled = new AtomicBoolean(false)
  // Weak handle on the probe object; null until the first StartServer.
  private[this] val gcFlagRef = new AtomicReference[WeakReference[AnyRef]]()

  val gcCheckPeriod = Duration(5000, "millis")

  /** Forwards the current scheduling state to self so it is seen again after the restart. */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    self ! GcCheckScheduled(scheduled.get, gcFlagRef.get)
    super.preRestart(reason, message)
  }

  /** Sends GcCheck to `who` once, after `period`. */
  def schedule(period: Duration, who: ActorRef) =
    actorSystem.scheduler.scheduleOnce(period) { who ! GcCheck }

  def receive = {
    case StartServer(name) =>
      sender ! ServerStarted(name, self)
      val firstSchedule = scheduled.compareAndSet(false, true)
      if (firstSchedule) schedule(gcCheckPeriod, self)
      // The probe is only held by this local, so after the handler returns it
      // is reachable solely through the weak reference.
      val probe = new AnyRef()
      gcFlagRef.set(new WeakReference(probe))
      weakEventBus.subscribe(self, name)
      actorSystem.eventStream.unsubscribe(self)

    case GcCheck =>
      val probeRef = gcFlagRef.get
      if (probeRef == null) sys.error("gcFlag")
      if (probeRef.get.isDefined) {
        // Probe still alive: keep polling.
        scheduled.set(true)
        schedule(gcCheckPeriod, self)
      } else {
        println("Actor stopped because of GC: " + self)
        context.stop(self)
      }

    case GcCheckScheduled(wasScheduled, probeRef) =>
      // Resume polling only if the previous incarnation had scheduled a check
      // and this one has not done so yet.
      if (wasScheduled && scheduled.compareAndSet(false, true)) {
        gcFlagRef.compareAndSet(null, probeRef)
        schedule(gcCheckPeriod, self)
      }

    case IsAlive(name) =>
      println("Im alive (default EventBus): " + name)
      sender ! AmAlive(name, self)

    case event: MessageEvent =>
      println("Im alive (weak EventBus): " + event)
  }
}
// :paste 2/2
/**
 * Parent actor that creates Server children and relays liveness probes.
 *
 * - StartServer: creates and watches a child named after the node, forwards
 *   the request to it and relays the ServerStarted reply to the sender.
 * - IsAlive: asks the child of that name and prints the outcome.
 * - IsAliveWeak: publishes a probe on both the system event stream and
 *   `weakEventBus`.
 *
 * Replies are matched on their runtime type rather than cast first, so an
 * unexpected reply reaches the `[S][FAIL]` branch instead of surfacing as a
 * ClassCastException.
 *
 * NOTE(review): both ask calls block the actor with Await.result for up to
 * `timeout.duration`.
 */
class Root extends Actor {
  def receive = {
    case start @ StartServer(nodeName) =>
      val server = context.actorOf(Props[Server], nodeName)
      context.watch(server)
      Await.result(server ? start, timeout.duration) match {
        case started: ServerStarted =>
          sender ! started
        case _ =>
          throw new RuntimeException(
            "[S][FAIL] Could not start server: " + start)
      }

    case isAlive @ IsAlive(nodeName) =>
      // Relative lookup: resolves the child created under this actor.
      Await.result(context.actorFor(nodeName) ? isAlive, timeout.duration) match {
        case AmAlive(aliveNode, _) =>
          println("[S][SUCC] Server is alive : " + aliveNode)
        case _ =>
          throw new RuntimeException("[S][FAIL] Wrong answer: " + nodeName)
      }

    case IsAliveWeak(nodeName) =>
      actorSystem.eventStream.publish(MessageEvent(nodeName,
        PostMessage(text = "isAlive-default")))
      weakEventBus.publish(MessageEvent(nodeName,
        PostMessage(text = "isAlive-weak")))
  }
}
// The single Root actor, created on first use under the name "root".
lazy val rootActor: ActorRef =
  actorSystem.actorOf(Props[Root], "root")
/** Blocking/async convenience entry points for driving `rootActor` from the REPL. */
object Root {

  /**
   * Asks the root actor to start a server and blocks until it replies or
   * `timeout.duration` elapses.
   *
   * The reply is matched on its runtime type rather than cast first, so an
   * unexpected reply produces the `[S][FAIL]` error below instead of a
   * ClassCastException.
   *
   * @param nodeName name of the server to start
   * @return the started actor, wrapped in Some
   * @throws RuntimeException if the reply is not a ServerStarted
   */
  def start(nodeName: String): Option[ActorRef] = {
    val msg = StartServer(nodeName)
    Await.result(rootActor ? msg, timeout.duration) match {
      case succ @ ServerStarted(_, actor) =>
        println("[S][SUCC] Server started: " + succ)
        Some(actor)
      case _ =>
        throw new RuntimeException("[S][FAIL] Could not start server: " + msg)
    }
  }

  /** Fire-and-forget liveness probe; the root actor prints the outcome. */
  def isAlive(nodeName: String): Unit = rootActor ! IsAlive(nodeName)

  /** Fire-and-forget liveness probe delivered through the event buses. */
  def isAliveWeak(nodeName: String): Unit = rootActor ! IsAliveWeak(nodeName)
}
////////////////
// Actual test: start a server, give its periodic check time to come due,
// request a garbage collection, then ask whether the server still answers.
// Blocks until the root actor confirms the "weak" server has started.
Root.start("weak")
// 7 s is longer than Server's 5000 ms gcCheckPeriod, so the first scheduled
// GcCheck is due by the time this returns.
Thread.sleep(7000L)
// Request a collection; the JVM treats System.gc() as a hint only.
System.gc()
// Fire-and-forget probe; the outcome is printed asynchronously by the Root actor.
Root.isAlive("weak")