1

以下は、System.Diagnostics.Process を IConnectableObservable に変換する試みです。このソリューションには問題があります。標準出力とエラーを継続的にリッスンし、イベント Process.Exited を OnCompleted のトリガーとして使用したいと考えています。残念ながら、出力バッファーが空になる前にProcess.Exited が発生することがわかりました。これは、スレッド スリープを使用した醜い回避策がなくても、OnNext ステートメントを介して出力が提供されない状況を再現できることを意味します。

Q1: この問題の回避策はありますか?

Q2: System.Reactive に関して: 自分のソリューションで何が改善できたでしょうか?

よろしく、

マーカス

public static class RxProcessUtilities
{
    /// <summary>
    /// Creates a connectable observable for a process.
    /// </summary>
    /// <remarks>Must be a connectable observable in order to hinder multiple 
    /// subscriptions to call the process multiple times.</remarks>
    /// <param name="process">The process.</param>
    /// <returns></returns>
    public static IConnectableObservable<string> CreateConnectableObservableProcess
        (string filename, string arguments, IObservable<string> input = null)
    {
        var observable = Observable.Using(() =>
            {
                Process process = new Process();

                // process configuration
                process.StartInfo.FileName = filename;
                process.StartInfo.Arguments = arguments;
                process.StartInfo.CreateNoWindow = true;
                process.StartInfo.UseShellExecute = false;

                process.EnableRaisingEvents = true;
                process.StartInfo.RedirectStandardError = true;
                process.StartInfo.RedirectStandardOutput = true;

                if (null != input)
                {
                    process.StartInfo.RedirectStandardInput = true;

                    input.Subscribe(s =>
                        {
                            if (!process.HasExited)
                            {
                                process.StandardInput.Write(s);
                            }
                        });
                }

                return process;
            },
            process =>
            {
                return Observable.Create<string>(
                (IObserver<string> observer) =>
                {
                    // listen to stdout and stderr
                    var stdOut = RxProcessUtilities.CreateStandardOutputObservable(process);
                    var stdErr = RxProcessUtilities.CreateStandardErrorObservable(process);

                    var stdOutSubscription = stdOut.Subscribe(observer);
                    var stdErrSubscription = stdErr.Subscribe(observer);

                    var processExited = Observable.FromEventPattern
                    (h => process.Exited += h, h => process.Exited -= h);

                    var processError = processExited.Subscribe(args =>
                    {
                        // Here is my problem: process sends exited event *before* all 
                        // *DataReceived events have been raised

                        // My ugly workaround for process exit before stdout and stderr buffers are empty.
                        Thread.Sleep(2000);

                        // Also: AFAICS we cannot read synchronously what is left in the buffer, 
                        // since we started asynchronously. This will throw:
                        // string restOfStdOut = process.StandardOutput.ReadToEnd();
                        // string restOfStdErr = process.StandardError.ReadToEnd();

                        if (process.ExitCode != 0)
                        {
                            observer.OnError(new Exception
                                (String.Format("Process '{0}' terminated with error code {1}",
                                 process.StartInfo.FileName, process.ExitCode)));
                        }
                        else
                        {
                            observer.OnCompleted();
                        }
                    });

                    process.Start();

                    process.BeginOutputReadLine();
                    process.BeginErrorReadLine();

                    return new CompositeDisposable
                        (stdOutSubscription,
                         stdErrSubscription,
                         processError);
                });
            });

        return observable.Publish();
    }

    /// <summary>
    /// Creates an IObservable&lt;string&gt; for the standard error of a process.
    /// </summary>
    /// <param name="process">The process.</param>
    /// <returns></returns>
    public static IObservable<string> CreateStandardErrorObservable(Process process)
    {
        // var processExited = Observable.FromEventPattern
        //    (h => process.Exited += h, h => process.Exited -= h);

        var receivedStdErr =
            Observable.FromEventPattern<DataReceivedEventHandler, DataReceivedEventArgs>
                (h => process.ErrorDataReceived += h,
                 h => process.ErrorDataReceived -= h)
            //.TakeUntil(processExited) 
            // cannot be used here, since process exited event might be raised 
            // before all stderr and stdout events occurred.
            .Select(e => e.EventArgs.Data);

        return Observable.Create<string>(observer =>
        {
            var cancel = Disposable.Create(process.CancelErrorRead);

            return new CompositeDisposable(cancel, receivedStdErr.Subscribe(observer));
        });
    }

    /// <summary>
    /// Creates an IObservable&lt;string&gt; for the standard output of a process.
    /// </summary>
    /// <param name="process">The process.</param>
    /// <returns></returns>
    public static IObservable<string> CreateStandardOutputObservable(Process process)
    {
        var receivedStdOut =
            Observable.FromEventPattern<DataReceivedEventHandler, DataReceivedEventArgs>
            (h => process.OutputDataReceived += h,
             h => process.OutputDataReceived -= h)
            .Select(e => e.EventArgs.Data);

        return Observable.Create<string>(observer =>
        {
            var cancel = Disposable.Create(process.CancelOutputRead);

            return new CompositeDisposable(cancel, receivedStdOut.Subscribe(observer));
        });
    }
}
4

1 に答える 1

1

トリックは

process.WaitForExit();

http://msdn.microsoft.com/en-us/library/fb4aw7b8.aspxを参照してください:「このオーバーロード[WaitForExit()]は、リダイレクトされた標準出力の非同期イベントの処理を含む、すべての処理が完了していることを確認します。標準出力が非同期イベントハンドラーにリダイレクトされたときにWaitForExit(Int32)オーバーロードを呼び出した後、このオーバーロードを使用してください。」

完全なソリューションは次のとおりです。

    /// <summary>
    /// Creates a connectable observable for a process.
    /// </summary>
    /// <remarks>Must be a connectable observable in order to hinder multiple subscriptions to call the process multiple times.</remarks>
    /// <param name="process">The process.</param>
    /// <returns></returns>
    public static IConnectableObservable<string> CreateConnectableObservableProcess(string filename, string arguments, IObservable<string> input = null)
    {
        var observable = Observable.Using(() =>
            {
                Process process = new Process();

                // process configuration
                process.StartInfo.FileName = filename;
                process.StartInfo.Arguments = arguments;
                process.StartInfo.CreateNoWindow = true;
                process.StartInfo.UseShellExecute = false;

                process.EnableRaisingEvents = true;
                process.StartInfo.RedirectStandardError = true;
                process.StartInfo.RedirectStandardOutput = true;

                if (null != input)
                {
                    process.StartInfo.RedirectStandardInput = true;

                    input.Subscribe(s =>
                        {
                            if (!process.HasExited)
                            {
                                process.StandardInput.Write(s);
                            }
                        });
                }

                return process;
            },
            process =>
            {
                return Observable.Create<string>(
                    (IObserver<string> observer) =>
                    {
                        // listen to stdout and stderr
                        var stdOut = RxProcessUtilities.CreateStandardOutputObservable(process);
                        var stdErr = RxProcessUtilities.CreateStandardErrorObservable(process);

                        var stdOutSubscription = stdOut.Subscribe(observer);
                        var stdErrSubscription = stdErr.Subscribe(observer);

                        var processExited = Observable.FromEventPattern(h => process.Exited += h, h => process.Exited -= h);

                        var processError = processExited.Subscribe(args =>
                        {
                            process.WaitForExit();

                            try
                            {
                                if (process.ExitCode != 0)
                                {
                                    observer.OnError(new Exception(String.Format("Process '{0}' terminated with error code {1}",
                                        process.StartInfo.FileName, process.ExitCode)));
                                }
                                else
                                {
                                    observer.OnCompleted();
                                }
                            }
                            finally
                            {
                                process.Close();
                            }
                        });

                        process.Start();

                        process.BeginOutputReadLine();
                        process.BeginErrorReadLine();

                        return new CompositeDisposable(stdOutSubscription,
                                                       stdErrSubscription,
                                                       processError);
                    });
            });

        return observable.Publish();
    }
于 2012-12-14T12:03:24.607 に答える