アクターを作成してメッセージを送信し、アクターが送信者経由でサービスに応答を返す単純なスカラ クラス CartService があります。結果アクターコード
package actors
import java.util.UUID
import akka.persistence.PersistentActor
import entites._
/**
* Created by spineor on 12/12/16.
*/
//Command to be processed by the Actors
case class AddSkuToCartCmd(cart_id :UUID, sku_id :String, price :Double)
case class RemoveSkufromCmd(cart_id :UUID, sku_id :String)
case class ChangeSkuQuantityCmd(cart_id: UUID, sku_id: String, new_quantity: Int)
case class GetCartUniqueSkuCountCmd(cart_id: UUID)
case class CheckoutCartCmd(cart_id: UUID)
//Events to be persisted in Journal
case class AddedSkutoCartEvent(cart_id :UUID, sku_id:String, price :Double)
case class RemovedSkuFromCartEvent(cart_id :UUID, sku_id :String)
case class ChangeSkuQuantityEvent(cart_id: UUID, sku_id: String, new_quantity: Int)
class CartActor extends PersistentActor{
import kafka.producerConsumer._
var cart :Cart = _
override def preStart() {
val actorName = self.path.name
println("hi2")
var cart_entries = Map[String, CartEntry]()
var cart_new = new Cart(UUID.fromString(actorName), cart_entries)
cart = cart_new
}
override def receiveRecover: Receive = {
case AddedSkutoCartEvent(cart_id , sku_id, price) =>
addSkutoCart(cart_id,sku_id, price)
case RemovedSkuFromCartEvent(cart_id , sku_id) =>
removeSkufromCart(cart_id,sku_id)
case ChangeSkuQuantityEvent(cart_id , sku_id, new_quantity) =>
changeSkuQuantity(cart_id,sku_id, new_quantity)
case CheckoutCartCmd(cart_id) =>
case GetCartUniqueSkuCountCmd(cart_id) =>
println("in recover")
}
override def receiveCommand: Receive = {
case AddSkuToCartCmd(cart_id , sku_id, price) =>
println("hi")
persist(AddedSkutoCartEvent(cart_id , sku_id, price)) (evt =>{
addSkutoCart(cart_id,sku_id, price)
})
case RemoveSkufromCmd(cart_id , sku_id) =>
persist(RemovedSkuFromCartEvent(cart_id , sku_id)) (evt =>{
removeSkufromCart(cart_id,sku_id)
})
case ChangeSkuQuantityCmd(cart_id, sku_id, new_quantity) =>
persist(ChangeSkuQuantityEvent(cart_id, sku_id, new_quantity)) (evt =>{
changeSkuQuantity(cart_id, sku_id, new_quantity)
})
case GetCartUniqueSkuCountCmd(cart_id) =>
sender ! getCartUniqueSkuCount(cart_id)
case CheckoutCartCmd(cart_id) =>
checkoutCart(cart_id)
}
override def persistenceId: String = self.path.name
def addSkutoCart(cart_id: UUID, sku_id: String, price: Double) : UUID= {
var entries = cart.cart_entries
val new_entry = CartEntry(sku_id, 1, price)
val new_entries = entries + (sku_id -> new_entry)
cart.cart_entries = new_entries
cart.cart_id
}
def changeSkuQuantity(cart_id: UUID ,sku_id: String ,new_quant: Int)= {
var entries = cart.cart_entries
if(entries.contains(sku_id)){
//If sku is already present increase quantity by 1 set price as latest received and update the entries map
var cart_entry = entries.get(sku_id).get
var new_quantity = new_quant
var new_price = cart_entry.price
val new_entry = CartEntry(sku_id, new_quantity, new_price)
entries + (sku_id -> new_entry)
}
else{
//TODO : Throw Exception , item to be updated is not present in cart
}
/*for ((k,v) <- entries) printf("key: %s, value: %s\n", k, v)*/
}
def removeSkufromCart(cart_id: UUID, sku_id: String) = {
var entries = cart.cart_entries
var new_entries = entries - sku_id
cart.cart_entries = new_entries
}
def getCartUniqueSkuCount(cart_id: UUID): Int = {
val count = cart.cart_entries.size
count
}
def checkoutCart(cart_id: UUID) = {
val cartProducer = Producer[Cart]("Cart-Update-Topic")
cartProducer.send(cart)
//Consumer Test Code
/*val consumer = SingleTopicConsumer("Cart-Update-Topic")
var entries = cart.cart_entries
var cart3 = new Cart(cart.cart_id,entries)
cartProducer.send(cart3)
entries += "ABC" -> new CartEntry("ABC",12,213)
val kafkaCart = consumer.read()*/
}
}
およびサービスコード
package services
import java.util.UUID
import actors._
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import scala.concurrent.duration.Duration
/**
* Created by spineor on 9/12/16.
*/
class CartService {
var system = ActorSystem("System")
def addSkutoCart(cart_id :UUID, sku_id :String, price :Double) :String = {
//get Actor for this cart_id
//implicit val timeout: Timeout = 20 seconds
val cart_actor = routeMessagesToCart(cart_id)
cart_actor ! AddSkuToCartCmd(cart_id, sku_id ,price)
cart_actor.path.name
}
def removeSkuFromCart(cart_id :UUID, sku_id :String) = {
//get Actor for this cart_id
val cart_actor = routeMessagesToCart(cart_id)
cart_actor ! RemoveSkufromCmd(cart_id, sku_id )
}
def changeSkuQuantity(cart_id: UUID,sku_id: String ,new_quantity: Int) = {
val cart_actor = routeMessagesToCart(cart_id)
cart_actor ! ChangeSkuQuantityCmd(cart_id, sku_id, new_quantity)
}
def checkoutCart(cart_id: UUID) ={
val cart_actor = routeMessagesToCart(cart_id)
cart_actor ! CheckoutCartCmd(cart_id)
}
/*This is cental method to locate cart Actors in the node
in case a actor is not present or has terminated ,it will be created
In case of a terminated cart actor ,its previous events will be replayed
from the journal first and then the new message is applied
The Cart actor name and its persistence id both are same as the Cart's unique UUID*/
def routeMessagesToCart(cart_id: UUID): ActorRef = {
var cartActor :ActorRef = null
if(null == cart_id){
//No cart id in request create a new Cart Actor for the request
val new_cart_id = createNewCartId()
cartActor = system.actorOf(Props[CartActor],new_cart_id.toString)
cartActor
}
//Find if actor for the cart already exists,or is Terminated
else{
implicit val timeout = Timeout(5 seconds)
val name = "/user/" + cart_id.toString
val future = system.actorSelection("/user/" + cart_id.toString).resolveOne()
future onComplete {
case Success(v) => { cartActor = v
}
case Failure(e) => {
//The cart actor no longer exists and must be recreated
cartActor = system.actorOf(Props[CartActor],cart_id.toString)
}
}
//Sleep for Actor to be created
Thread.sleep(500)
cartActor
}
}
/*
This method creates a new random UUID which is used to uniquely
identify the cart and thus cart actors
*/
def createNewCartId() :UUID = {
val cart_id = UUID.randomUUID()
cart_id
}
def getCartUniqueSkuCount(cart_id: UUID) ={
val cart_actor = routeMessagesToCart(cart_id)
implicit val timeout = Timeout(15 seconds)
val count = cart_actor ? GetCartUniqueSkuCountCmd(cart_id)
val result = Await.result(count, 15 second)
result
/*count onComplete {
case Success(x) => println("Unique Items in cart : " + x)
case Failure(t) => println("An error has occured: " + t.getMessage)
}*/
}
}
現在、サービス オブジェクトを作成し、アクターにメッセージを送信するサービス メソッドを呼び出すサービスのテスト ケースを作成しています。
val service = new CartService()
val new_cart_id = UUID.fromString(service.addSkutoCart(null, "12", 12))
val count = service.getCartUniqueSkuCount(new_cart_id)
しかし、今回は待機がタイムアウトし、メッセージが表示されます
[INFO] [12/28/2016 16:07:10.441] [System-akka.actor.default-dispatcher-7] [akka://System/user/fd38f1b1-6f03-4997-8625-0ce7e0ef2626] メッセージ [actors Actor[akka://System/temp/$a] から Actor[akka://System/user/fd38f1b1-6f03-4997-8625-0ce7e0ef2626#279696660] への .GetCartUniqueSkuCountCmd] が配信されませんでした。[2] デッドレターが発生しました。このロギングは、構成設定 'akka.log-dead-letters' および 'akka.log-dead-letters-during-shutdown' でオフにするか、調整することができます。
いくつかのグーグルから、送信者が閉鎖されていることに関係があることは理解していますが、これを修正することはできません。どんな入力でも大歓迎です。
スタックトレースは次のとおりです。
java.util.concurrent.TimeoutException: scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) で scala.concurrent.impl.Promise$DefaultPromise.result(Promise. scala:223) で scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190) で
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) で scala.concurrent.Await$.result(package.scala:190) で services.CartService.getCartUniqueSkuCount(CartService.scala:103) で CartActorSpec$$ anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(CartActorSpec.scala:48) at CartActorSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(CartActorSpec.scala:44) CartActorSpec$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(CartActorSpec.scala:44) で org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) でorg.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) で org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) で org.scalatest.Transformer.apply(Transformer.scala:22) で org org.scalatest の .scalatest.Transformer.apply(Transformer.scala:20)。WordSpecLike$$anon$1.apply(WordSpecLike.scala:953) at org.scalatest.Suite$class.withFixture(Suite.scala:1122) at CartActorSpec.withFixture(CartActorSpec.scala:19) at org.scalatest.WordSpecLike$class .invokeWithFixture$1(WordSpecLike.scala:950) at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:962) at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala: 962) org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) で org.scalatest.WordSpecLike$class.runTest(WordSpecLike.scala:962) で CartActorSpec.runTest(CartActorSpec.scala:19) で org.scalatest. WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1021) at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1021) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1 .apply(Engine.scala:413) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) scala.collection.immutable.List.foreach(List.scala:381) at org. scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390) at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply (Engine.scala:427) org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401) で scala.collection.immutable.List.foreach(List.scala:381) で org.scalatest .SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483) org.scalatest.WordSpecLike$class.runTests(WordSpecLike.scala:1021) at CartActorSpec.runTests(CartActorSpec.scala:19) at org.scalatest.Suite$class.run(Suite.scala:1424) at CartActorSpec.org$scalatest$WordSpecLike$$super$run( CartActorSpec.scala:19) org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067) org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067) で.scalatest.SuperEngine.runImpl(Engine.scala:545) at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1067) at CartActorSpec.org$scalatest$BeforeAndAfterAll$$super$run(CartActorSpec.scala:19) org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257) で org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) で CartActorSpec.run(CartActorSpec.scala:19)1021) CartActorSpec.runTests(CartActorSpec.scala:19) で org.scalatest.Suite$class.run(Suite.scala:1424) で CartActorSpec.org$scalatest$WordSpecLike$$super$run(CartActorSpec.scala:19) でorg.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067) で org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067) で org.scalatest.SuperEngine.runImpl (Engine.scala:545) at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1067) at CartActorSpec.org$scalatest$BeforeAndAfterAll$$super$run(CartActorSpec.scala:19) at org.scalatest.BeforeAndAfterAll $class.liftedTree1$1(BeforeAndAfterAll.scala:257) at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) at CartActorSpec.run(CartActorSpec.scala:19)1021) CartActorSpec.runTests(CartActorSpec.scala:19) で org.scalatest.Suite$class.run(Suite.scala:1424) で CartActorSpec.org$scalatest$WordSpecLike$$super$run(CartActorSpec.scala:19) でorg.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067) で org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067) で org.scalatest.SuperEngine.runImpl (Engine.scala:545) at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1067) at CartActorSpec.org$scalatest$BeforeAndAfterAll$$super$run(CartActorSpec.scala:19) at org.scalatest.BeforeAndAfterAll $class.liftedTree1$1(BeforeAndAfterAll.scala:257) at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) at CartActorSpec.run(CartActorSpec.scala:19)scalatest.Suite$class.run(Suite.scala:1424) at CartActorSpec.org$scalatest$WordSpecLike$$super$run(CartActorSpec.scala:19) at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike) .scala:1067) org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067) at org.scalatest.SuperEngine.runImpl(Engine.scala:545) at org.scalatest.WordSpecLike$class. run(WordSpecLike.scala:1067) at CartActorSpec.org$scalatest$BeforeAndAfterAll$$super$run(CartActorSpec.scala:19) at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257) at org.scalatest .BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) at CartActorSpec.run(CartActorSpec.scala:19)scalatest.Suite$class.run(Suite.scala:1424) at CartActorSpec.org$scalatest$WordSpecLike$$super$run(CartActorSpec.scala:19) at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike) .scala:1067) org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067) at org.scalatest.SuperEngine.runImpl(Engine.scala:545) at org.scalatest.WordSpecLike$class. run(WordSpecLike.scala:1067) at CartActorSpec.org$scalatest$BeforeAndAfterAll$$super$run(CartActorSpec.scala:19) at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257) at org.scalatest .BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) at CartActorSpec.run(CartActorSpec.scala:19)19) org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067) で org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067) で org.scalatest.SuperEngine .runImpl(Engine.scala:545) at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1067) at CartActorSpec.org$scalatest$BeforeAndAfterAll$$super$run(CartActorSpec.scala:19) at org.scalatest .BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257) at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) at CartActorSpec.run(CartActorSpec.scala:19)19) org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067) で org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1067) で org.scalatest.SuperEngine .runImpl(Engine.scala:545) at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1067) at CartActorSpec.org$scalatest$BeforeAndAfterAll$$super$run(CartActorSpec.scala:19) at org.scalatest .BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257) at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) at CartActorSpec.run(CartActorSpec.scala:19)run(WordSpecLike.scala:1067) at CartActorSpec.org$scalatest$BeforeAndAfterAll$$super$run(CartActorSpec.scala:19) at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257) at org.scalatest .BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) at CartActorSpec.run(CartActorSpec.scala:19)run(WordSpecLike.scala:1067) at CartActorSpec.org$scalatest$BeforeAndAfterAll$$super$run(CartActorSpec.scala:19) at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257) at org.scalatest .BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256) at CartActorSpec.run(CartActorSpec.scala:19)
PSテストクラスにImplictSender特性があります
class CartActorSpec extends TestKit(ActorSystem("CartActorSpec"))
with WordSpecLike
with Matchers
with BeforeAndAfterAll
with ImplicitSender {...}