5

Javaからstdinを介してCLIPHPプロセスにメッセージを渡す必要があります。プールで約20のPHPプロセスを実行し続けたいので、メッセージをプールに渡すと、各メッセージが別々のスレッドに送信され、配信されるメッセージのキューが保持されます。これらのPHPプロセスをできるだけ長く存続させ、1つが死んだ場合に新しいプロセスを起動するようにしたいと思います。静的スレッドプールを使用してこれを行うことを検討しましたが、実行されて単純に終了するタスク用に設計されているようです。プールにメッセージを渡すためのシンプルなインターフェイスを使用して、これを行うにはどうすればよいですか?独自のカスタム「スレッドプール」を実装する必要がありますか?

4

2 に答える 2

4

物事がより明確になると思うので、私はこれでいくつかのコードを提供しています。基本的に、プロセスオブジェクトのプールを維持する必要があります。これらの各プロセスには、何らかの方法で管理する必要のある入力、出力、およびエラーストリームがあることに注意してください。私の例では、エラーと出力をメインプロセスコンソールにリダイレクトするだけです。必要に応じて、コールバックとハンドラーを設定して、PHPプログラムの出力を取得できます。タスクを処理しているだけで、PHPの内容を気にしない場合は、そのままにするか、ファイルにリダイレクトします。

ObjectPoolにApacheCommonsPoolライブラリを使用しています。1つを再発明する必要はありません。

PHPプログラムを実行する20のプロセスのプールがあります。これだけでは、必要なものは得られません。これらの20のプロセスすべてに対して「同時に」タスクを処理したい場合があります。したがって、ObjectPoolからプロセスをプルするThreadPoolも必要になります。

また、Javaプロセスを強制終了するかCTRL-Cを実行すると、initプロセスがphpプロセスを引き継ぎ、そこにとどまるということも理解する必要があります。おそらく、生成したPHPプロセスのすべてのpidのログを保持し、Javaプログラムを再実行する場合はそれらをクリーンアップすることをお勧めします。

public class StackOverflow_10037379 {

    private static Logger sLogger = Logger.getLogger(StackOverflow_10037379.class.getName());

    public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> {

        private String mProcessToRun;

        public CLIPoolableObjectFactory(String processToRun) {
            mProcessToRun = processToRun;
        }

        @Override
        public Process makeObject() throws Exception {
            ProcessBuilder builder = new ProcessBuilder();
            builder.redirectError(Redirect.INHERIT);
            // I am being lazy, but really the InputStream is where
            // you can get any output of the PHP Process. This setting
            // will make it output to the current processes console.
            builder.redirectOutput(Redirect.INHERIT);
            builder.redirectInput(Redirect.PIPE);
            builder.command(mProcessToRun);
            return builder.start();
        }

        @Override
        public boolean validateObject(Process process) {
            try {
                process.exitValue();
                return false;
            } catch (IllegalThreadStateException ex) {
                return true;
            }
        }

        @Override
        public void destroyObject(Process process) throws Exception {
            // If PHP has a way to stop it, do that instead of destroy
            process.destroy();
        }

        @Override
        public void passivateObject(Process process) throws Exception {
            // Should really try to read from the InputStream of the Process
            // to prevent lock-ups if Rediret.INHERIT is not used.
        }
    }

    public static class CLIWorkItem implements Runnable {

        private ObjectPool<Process> mPool;
        private String mWork;

        public CLIWorkItem(ObjectPool<Process> pool, String work) {
            mPool = pool;
            mWork = work;
        }

        @Override
        public void run() {
            Process workProcess = null;
            try {
                workProcess = mPool.borrowObject();
                OutputStream os = workProcess.getOutputStream();
                os.write(mWork.getBytes(Charset.forName("UTF-8")));
                os.flush();
                // Because of the INHERIT rule with the output stream
                // the console stream overwrites itself. REMOVE THIS in production.
                Thread.sleep(100);
            } catch (Exception ex) {
                sLogger.log(Level.SEVERE, null, ex);
            } finally {
                if (workProcess != null) {
                    try {
                        // Seriously.. so many exceptions.
                        mPool.returnObject(workProcess);
                    } catch (Exception ex) {
                        sLogger.log(Level.SEVERE, null, ex);
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {

        // Change the 5 to 20 in your case. 
        // Also change mock_php.exe to /usr/bin/php or wherever.
        ObjectPool<Process> pool =
                new GenericObjectPool<>(
                new CLIPoolableObjectFactory("mock_php.exe"), 5);         

        // This will only allow you to queue 100 work items at a time. I would suspect
        // that if you only want 20 PHP processes running at a time and this queue
        // filled up you'll need to implement some other strategy as you are doing
        // more work than PHP can keep up with. You'll need to block at some point
        // or throw work away.
        BlockingQueue<Runnable> queue = 
            new ArrayBlockingQueue<>(100, true);

        ThreadPoolExecutor executor = 
            new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue);

        // print some stuff out.
        executor.execute(new CLIWorkItem(pool, "Message 1\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 2\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 3\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 4\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 5\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 6\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 7\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 8\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 9\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 10\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 11\r\n"));

        executor.shutdown();
        executor.awaitTermination(4000, TimeUnit.HOURS);

        pool.close();        
    }
}

プログラム実行の出力:

12172 - Message 2
10568 - Message 1
4804 - Message 3
11916 - Message 4
11116 - Message 5
12172 - Message 6
4804 - Message 7
10568 - Message 8
11916 - Message 9
11116 - Message 10
12172 - Message 11

入力されたものだけを出力するC++プログラムのコード:

#include <windows.h>
#include <iostream>
#include <string>

int main(int argc, char* argv[])
{
    DWORD pid = GetCurrentProcessId();
    std::string line;
    while (true) {      
        std::getline (std::cin, line);
        std::cout << pid << " - " << line << std::endl;
    }

    return 0;
}

アップデート

遅れて申し訳ありません。これは、興味のある人のためのJDK6バージョンです。プロセスのInputStreamからすべての入力を読み取るには、別のスレッドを実行する必要があります。このコードは、新しいプロセスごとに新しいスレッドを生成するように設定しました。そのスレッドは、それが生きている限り、常にプロセスから読み取られます。ファイルに直接出力する代わりに、Loggingフレームワークを使用するように設定しました。このようにして、ファイルに移動するようにハードコーディングすることなく、ファイルに移動、ロールオーバー、コンソールに移動するなどのログ構成をセットアップできます。

プロセスにstdoutとstderrがある場合でも、プロセスごとに1つのGobblerのみを開始することに気付くでしょう。物事を簡単にするために、stderrをstdoutにリダイレクトします。どうやらjdk6はこのタイプのリダイレクトのみをサポートしています。

public class StackOverflow_10037379_jdk6 {

    private static Logger sLogger = Logger.getLogger(StackOverflow_10037379_jdk6.class.getName());

    // Shamelessy taken from Google and modified. 
    // I don't know who the original Author is.
    public static class StreamGobbler extends Thread {

        InputStream is;
        Logger logger;
        Level level;

        StreamGobbler(String logName, Level level, InputStream is) {
            this.is = is;
            this.logger = Logger.getLogger(logName);
            this.level = level;
        }

        public void run() {
            try {
                InputStreamReader isr = new InputStreamReader(is);
                BufferedReader br = new BufferedReader(isr);
                String line = null;
                while ((line = br.readLine()) != null) {
                    logger.log(level, line);
                }
            } catch (IOException ex) {
                logger.log(Level.SEVERE, "Failed to read from Process.", ex);
            }
            logger.log(
                    Level.INFO, 
                    String.format("Exiting Gobbler for %s.", logger.getName()));
        }
    }

    public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> {

        private String mProcessToRun;

        public CLIPoolableObjectFactory(String processToRun) {
            mProcessToRun = processToRun;
        }

        @Override
        public Process makeObject() throws Exception {
            ProcessBuilder builder = new ProcessBuilder();
            builder.redirectErrorStream(true);
            builder.command(mProcessToRun);
            Process process = builder.start();
            StreamGobbler loggingGobbler =
                    new StreamGobbler(
                    String.format("process.%s", process.hashCode()),
                    Level.INFO,
                    process.getInputStream());
            loggingGobbler.start();
            return process;
        }

        @Override
        public boolean validateObject(Process process) {
            try {
                process.exitValue();
                return false;
            } catch (IllegalThreadStateException ex) {
                return true;
            }
        }

        @Override
        public void destroyObject(Process process) throws Exception {
            // If PHP has a way to stop it, do that instead of destroy
            process.destroy();
        }

        @Override
        public void passivateObject(Process process) throws Exception {
            // Should really try to read from the InputStream of the Process
            // to prevent lock-ups if Rediret.INHERIT is not used.
        }
    }

    public static class CLIWorkItem implements Runnable {

        private ObjectPool<Process> mPool;
        private String mWork;

        public CLIWorkItem(ObjectPool<Process> pool, String work) {
            mPool = pool;
            mWork = work;
        }

        @Override
        public void run() {
            Process workProcess = null;
            try {
                workProcess = mPool.borrowObject();
                OutputStream os = workProcess.getOutputStream();
                os.write(mWork.getBytes(Charset.forName("UTF-8")));
                os.flush();
                // Because of the INHERIT rule with the output stream
                // the console stream overwrites itself. REMOVE THIS in production.
                Thread.sleep(100);
            } catch (Exception ex) {
                sLogger.log(Level.SEVERE, null, ex);
            } finally {
                if (workProcess != null) {
                    try {
                        // Seriously.. so many exceptions.
                        mPool.returnObject(workProcess);
                    } catch (Exception ex) {
                        sLogger.log(Level.SEVERE, null, ex);
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {

        // Change the 5 to 20 in your case. 
        ObjectPool<Process> pool =
                new GenericObjectPool<Process>(
                new CLIPoolableObjectFactory("mock_php.exe"), 5);

        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100, true);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue);

        // print some stuff out.
        executor.execute(new CLIWorkItem(pool, "Message 1\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 2\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 3\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 4\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 5\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 6\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 7\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 8\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 9\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 10\r\n"));
        executor.execute(new CLIWorkItem(pool, "Message 11\r\n"));

        executor.shutdown();
        executor.awaitTermination(4000, TimeUnit.HOURS);

        pool.close();
    }
}

出力

Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 9440 - Message 3
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8776 - Message 2
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 1
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 10096 - Message 4
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8868 - Message 5
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8868 - Message 8
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 10
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8776 - Message 9
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 10096 - Message 6
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 9440 - Message 7
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 11
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.295131993.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.756434719.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.332711452.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.1981440623.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.1043636732.
于 2012-04-10T01:49:26.923 に答える
1

ここでの最善の策は、pcntl関数を使用してプロセスをフォークすることですが、プロセス間の通信は困難です。コマンドラインにメッセージを渡そうとするのではなく、プロセスが読み取ることができるキューを作成することをお勧めします。

Beanstalkには、プロセス間のメッセージングを処理するために使用できるいくつかのPHPクライアントがあります。

于 2012-04-09T20:02:16.657 に答える