3

私はReactiveExtensionsで多くの実験を行っており、現在、サブスクライバーに通知を送信しながら、プロシージャをキューに入れて任意の方法で実行できるシステムを作成しようとしています。

現在、データベースアクセスをUserAccessクラス内にカプセル化して、ユーザーを追加するメソッドを公開しています。その方法では、データベースにユーザーを追加するアクションをキューに入れたいと思います。そこで、メソッドQueueJob(Action)を公開するTクラスのJobProcessorを作成し、ユーザーにこのクラスを実装させました。私の問題は、アクションがUserパラメーターを受け取るため、ObservableのOnNextメソッド内からアクションを呼び出す方法がわからないことです。

迎え角が間違っていて、デザインの把握に問題があるはずです。たとえば、ユーザーをQueueJobプロシージャに渡す必要があることはわかっていますが、クリーンな方法でそれを行う方法がわかりません。

    public class UserAccess : JobProcessor<User>
    {
        public void AddUser(User user)
        {
            QueueJob(usr =>
                     {
                         using (var db = new CenterPlaceModelContainer())
                         {
                             db.Users.Add(usr);
                         }

                     });
         [...]

    public abstract class JobProcessor<T>
    {
        // Either Subject<T> or Subject<Action<T>>
        private Subject<Action<T>> JobSubject = new Subject<Action<T>>();

        public JobProcessor()
        {
            JobSubject
            /* Insert Rx Operators Here */
            .Subscribe(OnJobNext, OnJobError, OnJobComplete);
        }

        private void OnJobNext(Action<T> action)
        {
            // ???
        }

        private void OnJobError(Exception exception)
        {

        }

        private void OnJobComplete()
        {

        }

        public void QueueJob(Action<T> action)
        {
            JobSubject.OnNext(action);
        }
    }

編集1:

QueueJobの署名をに変更しようとしました

QueueJob(T entity, Action<T> action)

今、私はできる

QueueJob(user, usr => { ... } );

しかし、それはあまり直感的ではないようです。エンティティとアクションの両方を渡すフレームワークはあまり見たことがありません。そうすれば、JobProcessorは必要ないかもしれません。

編集2: JobProcessorのサブジェクトタイプをSubjectに変更し、Tを完全に削除しました。外部から参照できるので、手順にユーザーを含める必要がなかったので。現在の唯一の問題は、QueueJobのアクションに渡すユーザーが、アクションの実行の実際の時間の間に変更された場合、ユーザーが変更された情報を持っていることです。望ましくありませんが、私は解決策を探し続けると思います。

私のコードは今です(サンプルにバッファを使用):

public abstract class JobProcessor
{
   public Subject<Action> JobSubject = new Subject<Action>();

   public JobProcessor()
   {
       JobSubject
           .Buffer(3)
           .Subscribe(OnJobNext, OnJobError, OnJobComplete);
   }

   private void OnJobNext(IList<Action> actionsList)
   {
       foreach (var element in actionsList)
        {
            element();
        }
   }

   private void OnJobError(Exception exception)
   {

   }

   private void OnJobComplete()
   {

   }

   public void QueueJob(Action action)
   {
       JobSubject.OnNext(action);
   }
}
4

4 に答える 4

1

ここでのあなたの「目標」は率直にわかりませんが、少し逆行したと思います...

通常、対象は
IObservable<Action<T>> NewJob {get{return _subject;}}
...などのプロパティを介して公開されます。(主題は観察可能になります-主題は本質的に二重です-そしてなぜそれが特定であるか-そして少し物議をかもします-しかし遊んでみるのに良いなど)

そして、あなたOnNextはクラスの中から電話をかけるだけです-あなたがしたように。

しかし、通常は自分でオブザーバブルをサブスクライブしません
...外部ユーザーにプロパティに「フック」してサブスクライブを定義することでサブスクライブさせます。これにより、新しいアイテムが到着したときに取得されます。

もちろんこれは単純化されており、多くの場合と多くの用途がありますが、これは私が望むのに役立つかもしれません

于 2013-03-26T01:02:11.400 に答える
1

私の最初の反応は、IObservableは通常、メソッドポインター/デリゲート/アクションではなく、不変のデータ構造のシーケンスを作成するのに最も適しているということです。

次に、アクションをキュー形式で処理するように「スケジュール」しようとしている場合は、RxのIScheduler実装が最適であるように思われることをお勧めします。

あるいは、実際にProduceConsumerキューを作成しようとしている場合、Rxが実際にこれに最適であるとは思いません。つまり、メッセージのヒープをキューに入れてから、一部のコンシューマーにこれらのメッセージを読み取って処理させる場合は、別のフレームワークを検討します。

于 2013-03-26T09:48:21.067 に答える
1

まず、LeeとNSGagaに同意する必要があります。おそらく、この方法は望まないでしょう。プロデューサー/コンシューマーキューには、達成しようとしていることとはるかに一致する他のパターンがあります。ここ。

とは言うものの、私は挑戦に抵抗することはできないので...いくつかの小さな調整を加えることで、「私は何を行動に移すのか」というあなたの差し迫った問題を取り除くことができます。渡されたユーザーパラメータをキャプチャしてまっすぐにするだけActionです-いくつかの変更を加えたコードは次のとおりです。

public class UserAccess : JobProcessor
{
    public void AddUser(User user)
    {
        QueueJob(() =>
                 {
                     using (var db = new CenterPlaceModelContainer())
                     {
                         db.Users.Add(user);
                     }

                 });
     [...]

public abstract class JobProcessor
{
    // Subject<Action>
    private Subject<Action> JobSubject = new Subject<Action>();

    public JobProcessor()
    {
        JobSubject
        /* Insert Rx Operators Here */
        .Subscribe(OnJobNext, OnJobError, OnJobComplete);
    }

    private void OnJobNext(Action action)
    {
        // Log something saying "Yo, I'm executing an action" here?
        action();
    }

    private void OnJobError(Exception exception)
    {
        // Log something saying "Yo, something broke" here?
    }

    private void OnJobComplete()
    {
        // Log something saying "Yo, we shut down" here?
    }

    public void QueueJob(Action action)
    {
        JobSubject.OnNext(action);
    }
}
于 2013-03-26T14:20:41.233 に答える
0

デザインを完成させて、気に入ったものを見つけました。他の誰かがそれを必要とする場合のコードはここにあります。

public class JobProcessor<T> : IDisposable where T : new()
{
    private ISubject<Action<T>> jobsProcessor = new Subject<Action<T>>();

    private IDisposable disposer;

    private T _jobProvider = new T();

    public JobProcessor(Func<ISubject<Action<T>>, IObservable<IEnumerable<Action<T>>>> initializer)
    {
        Console.WriteLine("Entering JobProcessor Constructor");

        disposer = initializer(jobsProcessor)
            .Subscribe(OnJobsNext, OnJobsError, OnJobsComplete);

        Console.WriteLine("Leaving JobProcessor Constructor");
    }

    private void OnJobsNext(IEnumerable<Action<T>> actions)
    {
        Debug.WriteLine("Entering OnJobsNext");

        foreach (var action in actions)
        {
            action(_jobProvider);
        }

        Debug.WriteLine("Leaving OnJobsNext");
    }

    private void OnJobsError(Exception ex)
    {
        Debug.WriteLine("Entering OnJobsError");

        Debug.WriteLine(ex.Message);

        Debug.WriteLine("Leaving OnJobsError");
    }

    private void OnJobsComplete()
    {
        Debug.WriteLine("Entering OnJobsComplete");

        Debug.WriteLine("Leaving OnJobsComplete");
    }

    public void QueueJob(Action<T> action)
    {
        Debug.WriteLine("Entering QueueJobs");

        jobsProcessor.OnNext(action);

        Debug.WriteLine("Leaving QueueJobs");
    }

    public void Dispose()
    {
        disposer.Dispose();
    }
}

実行の速度を選択できる並行性のレイヤーでJobProcessorを使用できるレイヤーのアーキテクチャをサポートするために、汎用のメーカーを選択しました。JobProcessorコンストラクターは、コード内の別の場所でObservableシーケンスを宣言するために使用されるFuncを取得し、シーケンスによって記述された順序でジョブを実行するプロセッサーを生成します。OnNextは、IEnumerable>を取り込んで、アクションのバッチを同時に返す.Buffer(3)のようなシーケンスをサポートできるようにします。その欠点は、一度に1つのアクションを返すシーケンスを作成するときに、これを行う必要があることです。

var x = new JobProcessor<DatabaseAccess<User>>(subject => subject.Select(action => action.Yield()));

TのYield()拡張methofは、単一要素の列挙可能オブジェクトを返します。ここで見つけました。単一のアイテムをIEnumerable<T>として渡します

于 2013-03-27T13:31:22.200 に答える