Chunkify メソッドを使用して、すべての「保留中」のアイテムを「キャッチ」しようとしていました。
実際、私の目標は、イベントの「スパム フィルター」を作成し、最後の 5 つの値のみを選択し、2 回以上の連続した繰り返しを無視することでした。
問題の発生例:
注意!以下のコードは愚かで無意味です。これは単に問題を示し、イベントが複数のスレッドを呼び出すことができることを示すためのものです (上記のコードを実行し、出力ウィンドウを確認してください。これが問題です)。
[TestMethod]
public void ThreadSpinning()
{
var subs = Observable.FromEventPattern(add => this.Raise += add, rem => this.Raise -= rem)
.Select((item, countRaise) => countRaise)
.Chunkify()
.ToObservable(Scheduler.Default)
.Select((countRaise, countChunkify) => new { raiseItems = countRaise, countChunkify })
.Do(obj => Trace.Write("Chunkify = " + obj.countChunkify + " | "))
.Select(a => a.raiseItems)
.Where(a => a.Any())
.Do(obj =>
{
Trace.WriteLine("[ Start do something.. Raise = " + Dump(obj) + " ] " +
Environment.NewLine + Environment.NewLine);
Thread.Sleep(700);
}).Subscribe();
Thread.Sleep(2000);
var handle = new ManualResetEventSlim(false);
ThreadPool.QueueUserWorkItem(r =>
{
Thread.Sleep(500);
Task.Factory.StartNew(() =>
{
OnRaise();
OnRaise();
}).Wait();
OnRaise();
Thread.Sleep(500);
OnRaise();
Task.Factory.StartNew(OnRaise).Wait();
Thread.Sleep(1500);
OnRaise();
OnRaise();
Thread.Sleep(500);
OnRaise();
Thread.Sleep(250);
OnRaise();
Task.Factory.StartNew(OnRaise).Wait();
Thread.Sleep(500);
Task.Factory.StartNew(OnRaise).Wait();
Task.Factory.StartNew(OnRaise).Wait();
Thread.Sleep(2000);
handle.Set();
});
handle.Wait();
Thread.Sleep(3000);
subs.Dispose();
Thread.Sleep(1000);
}
private event EventHandler Raise;
protected virtual void OnRaise()
{
EventHandler handler = Raise;
if (handler != null)
handler(this, EventArgs.Empty);
}
public static string Dump<T>(IEnumerable<T> source)
{
return source.Select(a => a.ToString()).Aggregate((a, b) => a + ", " + b);
}