私はJavaを使用したPlayFramework2システムに取り組んでいます。PlayFrameworkは、PlayFrameworkと統合されたakkaシステムを使用してリモートのakkaシステムに接続します。リモートakkaシステムは、マスターノードとワーカーノードで構成されています。両方のシステムは、Eclipse juno IDEを備えた同じコンピューター上にあります。マスターノード用に2つのポート2552を構成し、ワーカーノード用にポート2553を構成しました。play2フレームワークのakkaノードはシステム自体によって選択されます。Play Frameworkのakkaシステムは、akka構成を使用したリモートルックアップによって、メッセージをリモートマスターノードに渡すことが期待されています。マスターノードのインターンは、リモートルックアップによる処理のためにメッセージをリモートワーカーにも渡します。マスターノードとワーカーノードには、次の形式のapplication.confファイルがあります。
src/main/resources/application.conf
ただし、起動時に、マスターノードとワーカーノードの両方が通信にポート番号2552を使用することを決定します。以下にコードスニペットを示します。
これは、playframewrokapplication.configファイルのコードです。
localNode {
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
netty {
hostname = "127.0.0.1"
port = 0
}
}
}
}
これは、playlocalNodeの構成です
package controllers;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import play.libs.F.Callback;
import play.mvc.WebSocket;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.serialization.Serialization;
import akka.serialization.SerializationExtension;
import com.typesafe.config.ConfigFactory;
import Com.RubineEngine.GesturePoints.*;
public class LocalNode {
ActorSystem csystem;
ActorRef localActor ;
public LocalNode() {
//We create the actor container and a child upon initialization
csystem = ActorSystem.create("LocalNode", ConfigFactory.load().getConfig("localNode"));
localActor = csystem.actorOf(new Props(LocalActor.class),"localActor");
}
public void connectMaster (final String classname)
{
localActor.tell(classname);
}
public void connectMaster ()
{
}
public void connectMaster (final WebSocket.In<JsonNode> in, final WebSocket.Out<JsonNode> out )
{
in.onMessage(new Callback<JsonNode>() {
public void invoke(JsonNode event) throws JsonParseException, JsonMappingException, IOException {
ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("unchecked")
Map<String,ArrayList<Object>> jsonMap = mapper.readValue(event, Map.class);
GesturePoints gp = new GesturePoints();
gp.setPoints(jsonMap);
localActor.tell(gp);
}
}); }
}
これは、PlayFrameworkのakkaアクターのコードです。
package controllers;
import Com.RubineEngine.GesturePoints.*;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class LocalActor extends UntypedActor {
/**
*
*/
ActorRef masterActor; // = getContext().actorFor("akka://MasterNode@127.0.0.1:2552/user/masterActor");
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
@Override
public void onReceive(Object arg) throws Exception {
System.out.println(" Local Actor 1");
if(arg instanceof GesturePoints)
{ System.out.println(" local Actor 2");
masterActor.tell(" Welcome home " , getSelf());
System.out.println(" Local Actor 3");}
else
{unhandled(arg);}
}
public void preStart()
{
masterActor = getContext().actorFor("akka://MasterNode@127.0.0.1:2553/user/masterActor");
}
}
これはマスターノードapplication.confのコードです
masterNode {
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
netty {
hostname = "127.0.0.1"
remote.netty.port = 2553
}
}
}
}
これはマスターノードのコードです
package Rubine_Cluster;
import java.util.Arrays;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.kernel.Bootable;
/**
* Hello world!
*
*/
public class MasterNode implements Bootable
{
final ActorSystem system;
ActorRef masterActor;
public MasterNode() {
//Create a child actor of this actor upon initialization
system = ActorSystem.create("MasterNode", ConfigFactory.load()
.getConfig("masterNode"));
masterActor = system.actorOf(new Props(MasterActor.class),"masterActor");
}
public void startup() {
}
public void shutdown() {
system.shutdown();
}
}
これはマスターアッカ俳優のコードです
package Rubine_Cluster;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Map;
import Com.RubineEngine.GesturePoints.*;
import akka.actor.*;
import akka.serialization.Serialization;
import akka.serialization.SerializationExtension;
import akka.serialization.Serializer;
public class MasterActor extends UntypedActor {
/**
*
*/
ActorRef worker1;
@Override
public void onReceive(Object message) throws Exception {
System.out.println(" Master Actor 5");
System.out.println(message);
if(message instanceof GesturePoints)
{ //GesturePoints gp = (GesturePoints) message;
System.out.println(" Master Actor 1");
try { worker1.tell(message, getSelf());
System.out.println(" Master Actor 2");
} catch (Exception e) {
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
throw e;
}
}
else{ unhandled(message);}
}
public void preStart()
{
worker1 = getContext().actorFor("akka://WorkerNode@127.0.0.1:2552/user/workerActor");
}
}
これは、ワーカーノードapplication.confのコードです。
workerNode {
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
netty {
hostname = "127.0.0.1"
remote.netty.port = 2552
}
}
}
}
これはworkernideのコードです
package com.theta.gesture;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.kernel.Bootable;
public class WorkerNode implements Bootable{
ActorSystem system;
ActorRef worker;
WorkerNode(){
system = ActorSystem.create("WorkerNode", ConfigFactory.load()
.getConfig("workerNode"));
ActorRef workerActor = system.actorOf(new Props(WorkerActor.class),"workerActor");
}
public void shutdown() {
system.shutdown();
}
public void startup() {
}
}
これは、ワーカープロジェクトのアクターのコードです
package com.theta.gesture;
import java.util.ArrayList;
import java.util.Map;
import Com.RubineEngine.GesturePoints.GesturePoints;
import akka.actor.*;
public class WorkerActor extends UntypedActor {
private static double DIST_SQ_THRESHOLD = 3 * 3; /* threshold to eliminate mouse jitter */
@Override
public void onReceive(Object msg) throws Exception {
if(msg instanceof GesturePoints)
{ GesturePoints message = (GesturePoints) msg;
initial_Theta(message);}
else {unhandled(msg);}
}
public void initial_Theta(GesturePoints p)
{ System.out.println(" Worker Actor 1");
if(p.getPoints().get("X").size() < 3) //The number of x coordinates as size
{ return;}
System.out.println(" Worker Actor 2");
double magsq,dx,dy, recip;
dx = (double) ((Integer)p.getPoints().get("x").get(2) - (Integer)p.getPoints().get("x").get(0)) ;
dy = ((Double)p.getPoints().get("y").get(2)) - ((Double)p.getPoints().get("y").get(0));
magsq = dx * dx + dy * dy;
if(magsq > DIST_SQ_THRESHOLD)
{
recip = 1/Math.sqrt(magsq);
double initial_cos = dx * recip;
System.out.println(" Worker Actor 3");
double initial_sin = dy * recip;
System.out.println("Feature cos " + initial_cos);
System.out.println("Gesture sin " + initial_sin);
}
} }
これは、ワーカーノードのコンソール情報です
[INFO] [10/08/2012 12:12:44.486] [main] [ActorSystem(WorkerNode)] REMOTE:
RemoteServerStarted@akka://WorkerNode@127.0.0.1:2552
これはマスターノードのコンソール情報です
[INFO] [10/08/2012 12:13:34.633] [main] [ActorSystem(MasterNode)] REMOTE:
RemoteServerStarted@akka://MasterNode@127.0.0.1:2552
この状況の可能なコースについてのアイデアと提案された解決策が切に求められています