** 元の実装にはいくつかの誤った仮定が含まれているため、最初に更新を参照してください
裏話
プロセスをフォークしなければならないという問題があります。その理由は、jni とシングル スレッドの R プロセスを使用しているためです。また、メモリと CPU を監視する方法が必要です。フォークが唯一の本当の解決策のようです。プロセスごとに複数の R 呼び出しを実装することはできません。私は間違いなくこの制限を回避しようとしましたが、rinside セットアップ メソッドのために不可能であると確信しています。
現在の実装
現在、プロセスを fork して rmi 接続を接続し、これらをスタックプールに保存しようとしています。問題は、registry.bind() メソッドがブロックされていないことです。メイン プロセスでレジストリにバインドすると、プロセスはブロックされ、リモート メソッド呼び出しを待機しますが、RunTime.getRuntime().exec() から開始すると、プロセスはブロックされずに終了します。これにより、エンドポイント デーモンが閉じられ、デーモンと通信しようとするとソケット エラーが発生します。フォークされたプロセスの起動時に例外などを受け取ることができるようにするためだけに、gfork ライブラリを使用してプロセスをフォークしています。
public class JRIDaemon implements IROperationRemoteProvider, Serializable, Runnable {
/**
* Serialization Id
*/
private static final long serialVersionUID = 2279972098306474322L;
/**
* Daemon logger
*/
private static final Logger logger = Logger.getLogger(JRIDaemon.class.getName());
/**
* This is the exeuctor service used to execute our job, the option for
* newSingleThreadExecutor is important because R is single threaded and JRI
* puts check in and will kill us if the thread is manipulated.
*/
private static ExecutorService executorService = Executors.newSingleThreadExecutor();
/**
* This implemenation uses the exeuctor service to run the analytics
* operation. The executor service is used because R is single threaded and
* cannot be called from outside.
*/
private JRIExecutionTask callableOperation;
/**
* This is the unique id that can to fetch this daemon.
*/
private final String daemonId;
private JRIDaemon() {
this(UUID.randomUUID().toString());
}
private JRIDaemon(String daemonId) {
this.daemonId = daemonId;
}
private String getDaemonId() {
return daemonId;
}
@Override
public void run() {
logger.info("Starting the jri daemon");
System.out.println("Starting the jri daemon");
try {
IROperationRemoteProvider stub = (IROperationRemoteProvider) UnicastRemoteObject.exportObject(this, 0);
Registry registry = LocateRegistry.getRegistry();
registry.rebind(daemonId, stub);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("Exception occurred when initializing the rmi agent ", e);
}
System.out.println("Daemon is done");
logger.fine("Exiting JRIDaemon#run");
}
/**
* Close the connection to R services.
* @throws NotBoundException
* @throws RemoteException
* @throws AccessException
*/
public void close() throws Exception {
logger.info("Calling close !!!!!!!!!");
//if (registry != null) {
// registry.unbind(daemonId);
//}
//System.exit(0);
}
/**
* @see IROperationProvider#execute(IAnalyticsOperation, List, List)
*/
@Override
public Map<String, IMetric> execute(IAnalyticsOperation operation, List<IAnalyticsOperationInput> inputs, List<? extends IDataProvider> dataProvider) throws Exception {
callableOperation = new JRIExecutionTask(inputs, operation, dataProvider);
Future<Map<String, IMetric>> execution = executorService.submit((Callable<Map<String, IMetric>>) callableOperation);
return execution.get();
}
/**
* @see IROperationProvider#interrupt()
*
* TODO come to a solution on stopping and restarting the thread in the
* Rengine implementation.
*/
@Override
public void interrupt() {
System.out.println("Calling interrupt on executor service");
executorService.shutdown();
// Can't do this yet because it causes a segfault in the task engine
// process.
// callableOperation.interrupt();
}
@Override
public Boolean isAllGood() {
return true;
}
@Override
public void activate() {
}
@Override
public void passivate() {
}
/**
* This is here only for testing purposes.
* @param args
* @throws Exception
*/
public static void main(String args[] ) throws Exception {
IROperationRemoteProvider provider = create();
Thread.sleep(10000);
System.out.println(" ALL GOOD " + provider.isAllGood());
}
/**
* This creates a daemon and initializes returns the client that can be used
* to talk to the server. The daemon is useless for the calling process as
* it is a separate process and we use the client to communicate with the
* jri daemon process.
*
* @return
*/
public static IROperationRemoteProvider create() throws Exception {
LocateRegistry.createRegistry(1099);
String daemonId = UUID.randomUUID().toString();
JRIDaemon daemon = new JRIDaemon(daemonId);
Fork<JRIDaemon, org.gfork.types.Void> forkedDaemon = new Fork<JRIDaemon, org.gfork.types.Void>(daemon);
//forkedDaemon.setJvmOptions("-Djava.security.manager -Djava.security.policy=\"taskenginesecurity.policy\"");
logger.info("Calling run task");
forkedDaemon.addListener(new Listener<JRIDaemon, org.gfork.types.Void>() {
@Override
public void onFinish(Fork<JRIDaemon, Void> fork, boolean wasKilled) throws IllegalAccessException, InterruptedException {
logger.info("Task is finished exit value -> " + fork.getExitValue() + " killed ->" + wasKilled);
}
@Override
public void onError(Fork<JRIDaemon, Void> fork) throws IllegalAccessException, InterruptedException {
logger.info("Error was " + fork.getStdErr());
}
@Override
public void onException(Fork<JRIDaemon, Void> fork) throws IllegalAccessException, InterruptedException, IOException, ClassNotFoundException {
logger.log(Level.SEVERE, " Erorro occurred in daemon ", fork.getException());
}
});
Fork.setLoggingEnabled(true);
forkedDaemon.execute();
forkedDaemon.waitFor();
logger.info("Standard out was " + forkedDaemon.getStdOut());
if (forkedDaemon.isException()) {
throw new RuntimeException("Unble to create Remote Provider ", forkedDaemon.getException());
}
//Thread.sleep(2000);
Registry registry = LocateRegistry.getRegistry();
IROperationRemoteProvider process = (IROperationRemoteProvider) registry.lookup(daemonId);
return process;
}
}
create メソッドを使用して、アナリティクス プロバイダーの新しい実装を作成します。実行時に Fork クラス呼び出しが実行され、新しいデーモンが生成されます。これとまったく同じコードを public static void main(String[] args) に入れると、プロセスはデーモン化して rmi 呼び出しを待ちますが、 for 操作を実行するとそうはなりません。
これは Gfrork の execute メソッドで、Runtime.exec を使用していることがわかります。
/**
* Starts a new java process which runs the task.
* The subprocess inherits the environment including class path an
* system properties of the current process. The JVM is launched using
* executable derived from standard system property 'java.home'.
* <p>
* Standard output (System.out) of the task can be red by {@link #getStdOut()} or
* forwarded to a file, see {@link #setStdOutWriter(Writer)}.
* The same is possible for Standard error (System.err),
* see {@link #getStdErr()} and {@link #setStdErrWriter(Writer)}.
*
* @throws Exception
*/
public synchronized void execute() throws Exception {
if (isExecuting()) {
throw new IllegalStateException(FORK_IS_ALREADY_EXECUTING);
}
exec = Runtime.getRuntime().exec(createCmdArray(), null, workingDir);
taskStdOutReader = new BufferedReader(new InputStreamReader(exec.getInputStream()));
taskErrorReader = new BufferedReader(new InputStreamReader(exec.getErrorStream()));
readError();
readStdOut();
waitForFinishedThread = new Thread("jforkWaitForFinishedThread") {
// needed to notify listeners after execution
@Override
public void run() {
try {
waitFor();
} catch (final Exception e) {
e.printStackTrace();
stdErrText.append(String.format("ERROR jforkListenerNotifier: %s%n", e.toString()));
}
}
};
waitForFinishedThread.start();
}
プロセスを監視するためにスリープタイマーを追加しました。プロセスが開始され、その後すぐにエラーなしでステータス0で終了します。run メソッドで rmi を構成する際に問題がある場合、例外が返されることを確認しました。RMI は正しく初期化されているように見えますが、フォークされたプロセスが終了しないように単にブロックしません。Runtime.exec に RTFM がありますが、これが終了する原因がわかりません。どんな助けでも大歓迎です。
アップデート
あなたの発言は見下すようなものでしたが、EJPに感謝します. プロセスが終了していないという事実のためにバインドがブロックされているという誤った仮定をしましたが、これはむしろ rmi 通信を処理するために別のスレッドを作成するためです。これがプロセスを存続させるものです。
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.rmi.server.UnicastRemoteObject;
public class RunnableRMIDaemon implements Remote {
public static void main(String args[]) throws InterruptedException {
String daemonID = "123";
System.out.println("STARTING");
Registry registry;
try {
RunnableRMIDaemon daemon = new RunnableRMIDaemon();
registry = LocateRegistry.getRegistry();
final Remote stub = (Remote) UnicastRemoteObject.exportObject(daemon, 0);
registry.rebind(daemonID, stub);
Thread.sleep(1000);
} catch (RemoteException e) {
throw new RuntimeException("Remote Exception occurred while running " + e);
}
System.out.println("ENDING");
}
}
import java.io.IOException;
public class ForkRMIDaemon {
public static void main(String args[]) throws IOException, InterruptedException {
System.out.println("Starting fork");
Runtime.getRuntime().exec("java -cp . RunnableRMIDaemon");
Thread.sleep(10000);
System.out.println("Completed fork");
}
}
最初のプロセスが終了しても、Runtime.getRuntime().exec() プロセスはまだ生きています。
thanatos:testingrmifork chris$ java ForkRMIDaemon
Starting fork
Completed fork
tv-mini:testingrmifork chris$ ps -ef | grep java
501 25499 1 0 0:00.10 ttys007 0:00.72 /usr/bin/java -cp . RunnableRMIDaemon
501 25501 25413 0 0:00.00 ttys007 0:00.00 grep java
thanatos:testingrmifork chris$
私の調査はまだ完了していませんが、単純な gfork ライブラリが実際に戻ったときにプロセスを閉じるために何かをしているようです。gfork コードを確認しましたが、これがどこで発生しているのかわかりませんでした。
EJP に感謝します。誤った情報を認めます。メインではないメソッドを呼び出すことができるため、gfork は何らかのトリックを行っていると推測しています。
私は Java がスレッドを c pthreads のように扱うと想定しており、main() で while ループを常に作成する必要がありました。そうしないと、メインの終了時にスレッドが強制終了されてしまいます。私の間違い