0

1 秒あたり 8000 を超えるイベントが配信される状況で、StreamInsight エンジンが 140 を超えるイベント/秒を処​​理できない理由がわかりません。クエリのイベント数/秒がパフォーマンス モニターに表示されます。アプリケーションのコンソールでは、エンジンが多くのイベントを回避するようになっています。ID: 200 のイベントがあり、次に ID: 3330 のイベントがあります。何が問題なのか知っている人はいますか?

私のテストでは、一連のクエリがあります。最初のクエリの出力は 2 番目のクエリの入力であり、このシナリオでは、結果は 140 イベント/秒を超えません。

ここで、入力ストリームから受信したすべてのイベントを出力する単純なクエリを使用してアプリケーションをテストしました。この状況では、サーバーは約 2000 イベント/秒を処​​理します。結果の画像がいくつかありますが、残念ながらまだ掲載できません。私が気になるのは、イベント/秒の量が突然 0 に減少した理由と、エンジンがまだすべての入力イベントを考慮していない理由です。

これが私のサーバー構成と私が使用しているクエリです。

using (var server = Server.Create("SIInstance23"))
{
    log.Info("StreamInsight Server started");
    Application application = server.CreateApplication("StreamInsight Application Test");

    ServiceHost host = new ServiceHost(server.CreateManagementService());
    WSHttpBinding binding = new WSHttpBinding(SecurityMode.Message);
    binding.HostNameComparisonMode = HostNameComparisonMode.Exact;
    host.AddServiceEndpoint(typeof(IManagementService),
         binding,
         "http://localhost:8081/StreamInsight/SIInstance23");

    ScenarioWorkflow.NormalScenarioWorkflow(application, server, host);
}



    public static void NormalScenarioWorkflow(this Application application, Server server, ServiceHost host)
    {
        host.Open();

        IQStreamable<SensorDataEvent> inputStream = application.DefineSensorDataEventStream();
        var simpleQuery = from e in inputStream
                          select e;

        var simpleQueryConsumer = application.DefineConsumer(simpleQuery);
        var simpleQueryBinding = simpleQuery.Bind(simpleQueryConsumer);

        using (simpleQueryBinding.Run("process"))
        {
            Thread.Sleep(1000);
            Console.WriteLine(string.Empty);

            DiagnosticSettings settings = server.GetDiagnosticSettings(new Uri("cep:/Server/Application/StreamInsight Application Test/Entity/process/Query/StreamableBinding_1"));
            settings.Aspects |= DiagnosticAspect.PerformanceCounters;
            server.SetDiagnosticSettings(new Uri("cep:/Server/Application/StreamInsight Application Test/Entity/process/Query/StreamableBinding_1"), settings);

            Console.WriteLine("***Hit Return to exit after viewing query output***");
            Console.WriteLine();
            Console.ReadLine();
        }
        host.Close();
    }

次に、(チェーンから) 最後のクエリを実行しようとしたとき、約 1500 イベント/秒を取得しました。それでも、イベント/秒の突然の減少に問題があります。イベントに対して行われる各変換は、エンジンによって処理される必要がある新しいものを生成すると考えました。したがって、一連のクエリからのイベント/秒の数は 1500 を超えるはずです。間違っている場合は修正してください。私はこのドメインに不慣れで、アドバイスは大歓迎です。

問題は以下のクラスにあると思います。PointEvent<SendorDataEvent>の代わりにもSensorDataEvent試し、CTI も挿入しようとしましたが、結果はありませんでした。

public class SocketEventInputAdapter : IObservable<SensorDataEvent>, IDisposable
{
    public List<IObserver<SensorDataEvent>> observers { get; set; }
    public object sync { get; set; }
    public bool done { get; set; }

    public SocketEventInputAdapter()
    {
        this.done = false;
        this.observers = new List<IObserver<SensorDataEvent>>();
        this.sync = new object();
        SocketServer serverSocket = new SocketServer(4444, this);
    }

    internal void NotifyObservers(SensorDataEvent value)
    {
        lock (sync)
        {
            if (!done)
            {
                foreach (var observer in observers)
                {
                    observer.OnNext(value);
                }
            }
        }
    }

    public IDisposable Subscribe(IObserver<SensorDataEvent> observer)
    {
        lock (sync)
        {
            observers.Add(observer);
        }

        return new Subscription(this, observer);
    }

    void IDisposable.Dispose()
    {
    }

    private sealed class Subscription : IDisposable
    {
        private readonly SocketEventInputAdapter _subject;
        private IObserver<SensorDataEvent> _observer;

        public Subscription(SocketEventInputAdapter subject, IObserver<SensorDataEvent> observer)
        {
            _subject = subject;
            _observer = observer;
        }

        public void Dispose()
        {
            IObserver<SensorDataEvent> observer = _observer;
            if (null != observer)
            {
                lock (_subject.sync)
                {
                    _subject.observers.Remove(observer);
                }
                _observer = null;
            }
        }
    }
}

ありがとうございました。

4

1 に答える 1

0

ポールが言ったように、もっと多くの情報を提供する必要があります。多くのことが起こっている可能性があります... CTI の調整ミス/設定ミス、ソースでの不必要なロック、シンクでのロックまたは長時間実行タスク...またはその他の多くのこと。i7 ラップトップで平均 12 万イベント/秒を超える StreamInsight プロセスを実行しました... 1 週間にわたるストレス テストで。エンジンがそれを処理します。

于 2015-01-27T06:11:57.717 に答える