私の同僚と私は、DistributedPubSubMediator が直接またはプロキシ アクターを介してサブスクライブ/サブスクライブ解除するためのさまざまな動作に困惑しています。以下に異なる結果を示すテストをまとめました。
私たちの理解では、ActorRef.forward は元の送信者を渡す必要があるため、メッセージがメディエーターに直接送信されるか、プロキシ アクター経由で送信されるかは問題ではありません。すなわち。http://www.scala-lang.org/api/current/index.html#scala.actors.ActorRef。
回避するには、DIStributedPubSubMediator クラスを拡張し、DistributedPubSubMediator オブジェクトが既に提供しているロジックを含める必要があります。理想的には、オブジェクトを直接使用してコードを元に戻すことをお勧めします。
これはバグのようです。この異常な動作の根本的な理由を知っている人はいますか? 助けてください...
[2013 年 10 月 22 日] Roland の回答 (ありがとう) に基づいてテストを更新し、SubscriberAck と UnsubscribeAck に expectMsgType を追加しました。これで SubscribeAck を受け取りましたが、不思議なことに UnSubscribeAck を受け取っていません。これは大きな問題ではありませんが、その理由を知りたいです。
もう 1 つの質問として、同じ ActorSystem で実行されているプロキシ アクターを介してリモート アクターを DistributedPubSubMediator にサブスクライブすることは良い方法でしょうか?
現時点では、次のものがあります。
- サブスクライブするアプリは、公開するアプリを (非 Akka 方式で) 検出し、クラスター アドレスを取得します。
- リモート サブスクライバーは、このアドレスと既知のプロキシ アクターのパスを使用して、Identity 要求を送信します。
- リモート サブスクライバーは ActorIdentity 応答を取得し、この (リモート) プロキシを介してサブスクライブ/サブスクライブ解除します。
- パブリッシャー アプリでは、サブスクライブ/サブスクライブ解除メッセージが DistributedPubSubMediator に転送され、後続のビジネス メッセージのパブリッシュに使用されます。
パブリッシャー側でフェールオーバーを処理する必要があるため、Akka Reactor pubsub チャット クライアントの例 (つまり、DistributedPubSubMediator のみを使用してパブリッシュする) に従ってクラスターに参加していません。
[2013 年 11 月 5 日] 送信メッセージのテストを追加しました。うまくいかないようで、まだわかりません。
package star.common.pubsub
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.junit.runner.RunWith
import akka.contrib.pattern.DistributedPubSubExtension
import akka.contrib.pattern.DistributedPubSubMediator._
import akka.testkit.TestKit
import akka.actor.{Actor, ActorSystem, ActorRef, Props}
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
object MediatorTest {
val config = ConfigFactory.parseString(s"""
akka.actor.provider="akka.cluster.ClusterActorRefProvider"
akka.remote.netty.tcp.port=0
akka.extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
""")
}
@RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MediatorTest extends TestKit(ActorSystem("test", MediatorTest.config)) with FunSuite {
val mediator = DistributedPubSubExtension(system).mediator
val topic = "example"
val message = "Published Message"
// val joinAddress = Cluster(system).selfAddress
// Cluster(system).join(joinAddress)
test("Direct subscribe to mediator") {
mediator.!(Subscribe(topic, testActor))(testActor)
expectMsgType[SubscribeAck](5 seconds)
mediator.!(Publish(topic, message))(testActor)
expectMsg(2 seconds, message)
mediator.!(Unsubscribe(topic, testActor))(testActor)
expectMsgType[UnsubscribeAck](5 seconds)
mediator ! Publish(topic, message)
expectNoMsg(2 seconds)
}
test("Subscribe to mediator via proxy") {
class Proxy extends Actor {
override def receive = {
case subscribe: Subscribe =>
mediator forward subscribe
case unsubscribe: Unsubscribe =>
mediator forward unsubscribe
case publish: Publish =>
mediator.!(publish)
}
}
val proxy = system.actorOf(Props(new Proxy), "proxy")
proxy.!(Subscribe(topic,testActor))(testActor)
expectMsgType[SubscribeAck](2 seconds)
proxy ! Publish(topic, message)
expectMsg(5 seconds, message)
proxy.!(Unsubscribe(topic,testActor))(testActor)
expectMsgType[UnsubscribeAck](5 seconds)
proxy ! Publish(topic, message)
expectNoMsg(5 seconds)
}
test("Send message to address") {
val testActorAddress = testActor.path.toString
// val system2 = ActorSystem("test", MediatorTest.config)
// Cluster(system2).join(joinAddress)
mediator.!(Subscribe(topic, testActor))(testActor)
expectMsgType[SubscribeAck](5 seconds)
println(testActorAddress) // akka://test/system/testActor1
mediator.!(Publish(topic, message))(testActor)
expectMsg(2 seconds, message)
mediator ! Send(testActorAddress, message, false)
expectMsg(5 seconds, message)
}
}