8

rx と特定のクエリに行き詰まっています。問題:

多くの単一の更新操作は、連続ストリームによって生成されます。操作は、挿入または削除できます。これらのストリームをバッファリングして、一度にいくつかの操作を実行したいのですが、順序を維持することが非常に重要です。さらに、操作はバッファリングされ、X 秒ごとにシーケンスで実行される必要があります。

例:

の:

insert-insert-insert-delete-delete-insert-delete-delete-delete-delete

外:

insert(3)-delete(2)-insert(1)-delete(4)

私はそれをテストするための簡単なアプリケーションを書きました。それは多かれ少なかれ私と同じように動作しますが、受信挿入/削除の順序を尊重しません

namespace RxTests
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;
using System.Threading;

internal class Program
{
    private static readonly Random Random = new Random();

    private static readonly CancellationTokenSource ProducerStopped = new CancellationTokenSource();

    private static readonly ISubject<UpdateOperation> operations = new Subject<UpdateOperation>();

    private static void Main(string[] args)
    {
        Console.WriteLine("Starting production");
        var producerScheduler = new EventLoopScheduler();
        var consumerScheduler = new EventLoopScheduler();
        var producer =
            Observable.Interval(TimeSpan.FromSeconds(2))
                      .SubscribeOn(producerScheduler)
                      .Subscribe(Produce, WriteProductionCompleted);
        var consumer =
            operations.ObserveOn(producerScheduler)
                      .GroupBy(operation => operation.Delete)
                      .SelectMany(observable => observable.Buffer(TimeSpan.FromSeconds(8), 50))
                      .SubscribeOn(consumerScheduler)
                      .Subscribe(WriteUpdateOperations);
        Console.WriteLine("Type any key to stop");
        Console.ReadKey();
        consumer.Dispose();
        producer.Dispose();
    }

    private static void Produce(long time)
    {
        var delete = Random.NextDouble() < 0.5;
        Console.WriteLine("Produce {0}, {1} at {2}", time + 1, delete, time);
        var idString = (time + 1).ToString(CultureInfo.InvariantCulture);
        var id = time + 1;
        operations.OnNext(
            new UpdateOperation(id, delete, idString, time.ToString(CultureInfo.InvariantCulture)));
    }

    private static void WriteProductionCompleted()
    {
        Console.WriteLine("Production completed");
        ProducerStopped.Cancel();
    }

    private static void WriteUpdateOperation(UpdateOperation updateOperation)
    {
        Console.WriteLine("Consuming {0}", updateOperation);
    }

    private static void WriteUpdateOperations(IList<UpdateOperation> updateOperation)
    {
        foreach (var operation in updateOperation)
        {
            WriteUpdateOperation(operation);
        }
    }

    private class UpdateOperation
    {
        public UpdateOperation(long id, bool delete, params string[] changes)
        {
            this.Id = id;
            this.Delete = delete;
            this.Changes = new List<string>(changes ?? Enumerable.Empty<string>());
        }

        public bool Delete { get; set; }

        public long Id { get; private set; }

        public IList<string> Changes { get; private set; }

        public override string ToString()
        {
            var stringBuilder = new StringBuilder("{UpdateOperation ");
            stringBuilder.AppendFormat("Id: {0}, Delete: {1}, Changes: [", this.Id, this.Delete);
            if (this.Changes.Count > 0)
            {
                stringBuilder.Append(this.Changes.First());
                foreach (var change in this.Changes.Skip(1))
                {
                    stringBuilder.AppendFormat(", {0}", change);
                }
            }

            stringBuilder.Append("]}");
            return stringBuilder.ToString();
        }
    }
}

}

誰かが正しいクエリで私を助けることができますか?

ありがとう

UPDATE 08.03.13 (JerKimball による提案)

次の行は、結果を出力するための JerKimball のコードへの小さな変更/追加です。

using(query.Subscribe(Print))
{
    Console.ReadLine();
    producer.Dispose();        
}

次の印刷方法を使用します。

private static void Print(IObservable<IList<Operation>> operations)
{
    operations.Subscribe(Print);
}

private static void Print(IList<Operation> operations)
{
    var stringBuilder = new StringBuilder("[");
    if (operations.Count > 0)
    {
        stringBuilder.Append(operations.First());
        foreach (var item in operations.Skip(1))
        {
            stringBuilder.AppendFormat(", {0}", item);
        }
    }

    stringBuilder.Append("]");
    Console.WriteLine(stringBuilder);
 }

Operation の文字列には次のように指定します。

public override string ToString()
{
    return string.Format("{0}:{1}", this.Type, this.Seq);
}

順序は保持されますが、次のようになります。

  • 別のサブスクリプション内でサブスクライブすることについて確信が持てません: それは正しいですか?
  • 各リストには常に 2 つ以下の要素しかありません (ストリームが同じタイプで 2 つ以上の連続した値を生成する場合でも)
4

2 に答える 2

4

GroupByUntilDistinctUntilChanged、およびBuffer:

これには、サンプル コードに合わせて少し調整する必要がありますが、クエリ (および概念)次のようになります。

(編集: ああ - そこに少し逃した...)

void Main()
{
    var rnd = new Random();
    var fakeSource = new Subject<Operation>();
    var producer = Observable
        .Interval(TimeSpan.FromMilliseconds(1000))
        .Subscribe(i => 
            {
                var op = new Operation();
                op.Type = rnd.NextDouble() < 0.5 ? "insert" : "delete";
                fakeSource.OnNext(op);
            });    
    var singleSource = fakeSource.Publish().RefCount();

    var query = singleSource
        // We want to groupby until we see a change in the source
        .GroupByUntil(
               i => i.Type, 
               grp => singleSource.DistinctUntilChanged(op => op.Type))
        // then buffer up those observed events in the groupby window
        .Select(grp => grp.Buffer(TimeSpan.FromSeconds(8), 50));

    using(query.Subscribe(Console.WriteLine))
    {
        Console.ReadLine();
        producer.Dispose();        
    }
}

public class Operation { 
    private static int _cnt = 0;
    public Operation() { Seq = _cnt++; }
    public int Seq {get; set;}
    public string Type {get; set;}    
}
于 2013-03-07T16:53:46.937 に答える
2

新しいアプローチを試してみましょう(したがって、新しい答え):

まず、順序を保持しながらキーに基づいてアイテムのリストを「折りたたむ」拡張メソッドを定義しましょう。

public static class Ext
{
    public static IEnumerable<List<T>> ToRuns<T, TKey>(
            this IEnumerable<T> source, 
            Func<T, TKey> keySelector) 
    {
        using (var enumerator = source.GetEnumerator()) 
        {
            if (!enumerator.MoveNext())
                yield break;

            var currentSet = new List<T>();

            // inspect the first item
            var lastKey = keySelector(enumerator.Current);
            currentSet.Add(enumerator.Current);

            while (enumerator.MoveNext()) 
            {
                var newKey = keySelector(enumerator.Current);
                if (!Equals(newKey, lastKey)) 
                {
                    // A difference == new run; return what we've got thus far
                    yield return currentSet;
                    lastKey = newKey;
                    currentSet = new List<T>();
                }
                currentSet.Add(enumerator.Current);
            }

            // Return the last run.
            yield return currentSet;

            // and clean up
            currentSet = new List<T>();
            lastKey = default(TKey);
        }
    }
}

かなり簡単です - を指定すると、各サブリストが同じキーを持つ がIEnumerable<T>返されます。List<List<T>>

今、それを供給して使用するには:

var rnd = new Random();
var fakeSource = new Subject<Operation>();
var producer = Observable
    .Interval(TimeSpan.FromMilliseconds(1000))
    .Subscribe(i => 
        {
            var op = new Operation();
            op.Type = rnd.NextDouble() < 0.5 ? "insert" : "delete";
            fakeSource.OnNext(op);
        });    

var singleSource = fakeSource
    .Publish().RefCount();

var query = singleSource
    // change this value to alter your "look at" time window
    .Buffer(TimeSpan.FromSeconds(5))    
    .Select(buff => buff.ToRuns(op => op.Type).Where(run => run.Count > 0));

using(query.Subscribe(batch => 
{
    foreach(var item in batch)
    {
        Console.WriteLine("{0}({1})", item.First().Type, item.Count);
    }
}))
{
    Console.ReadLine();
    producer.Dispose();     
}

試してみてください - これが私が典型的な実行で見たものです:

insert(4)
delete(2)
insert(1)
delete(1)
insert(1)
insert(1)
delete(1)
insert(1)
delete(2)
delete(2)
insert(2)
delete(1)
insert(1)
delete(2)
insert(2)
于 2013-03-12T19:40:57.837 に答える