バッチ処理したいので、キューは使用しません。これは、(ログ) ファイルを開いたり閉じたり、データベースを開いたり閉じたりする必要がある場合に便利です。これは私がそのようなものを作成する方法の例です:
// J. van Langen
public abstract class QueueHandler<T> : IDisposable
{
// some events to trigger.
ManualResetEvent _terminating = new ManualResetEvent(false);
ManualResetEvent _terminated = new ManualResetEvent(false);
AutoResetEvent _needProcessing = new AutoResetEvent(false);
// my 'queue'
private List<T> _queue = new List<T>();
public QueueHandler()
{
new Thread(new ThreadStart(() =>
{
// what handles it should wait on.
WaitHandle[] handles = new WaitHandle[] { _terminating, _needProcessing };
// while not terminating, loop (0 timeout)
while (!_terminating.WaitOne(0))
{
// wait on the _terminating and the _needprocessing handle.
WaitHandle.WaitAny(handles);
// my temporay array to store the current items.
T[] itemsCopy;
// lock the queue
lock (_queue)
{
// create a 'copy'
itemsCopy = _queue.ToArray();
// clear the queue.
_queue.Clear();
}
if (itemsCopy.Length > 0)
HandleItems(itemsCopy);
}
// the thread is done.
_terminated.Set();
})).Start();
}
public abstract void HandleItems(T[] items);
public void Enqueue(T item)
{
// lock the queue to add the item.
lock (_queue)
_queue.Add(item);
_needProcessing.Set();
}
// batch
public void Enqueue(IEnumerable<T> items)
{
// lock the queue to add multiple items.
lock (_queue)
_queue.AddRange(items);
_needProcessing.Set();
}
public void Dispose()
{
// let the thread know it should stop.
_terminating.Set();
// wait until the thread is stopped.
_terminated.WaitOne();
}
}
_terminating
/_terminated
には a を使用しますManualResetEvent
。それらは設定されたものだけだからです。
_needProcessing
私が使用するのはAutoResetEvent
、ManualResetEvent では実行できません。これは、トリガーされたときに別のスレッドがSet
再び実行できるためです。そのためReset
、WaitHandle.WaitAny の後に追加すると、新しく追加された項目を元に戻すことができます。(うーん、誰かがこれをもっと簡単に説明できるなら、大歓迎です。:)
例:
public class QueueItem
{
}
public class MyQueue : QueueHandler<QueueItem>
{
public override void HandleItems(QueueItem[] items)
{
// do your thing.
}
}
public void Test()
{
MyQueue queue = new MyQueue();
QueueItem item = new QueueItem();
queue.Enqueue(item);
QueueItem[] batch = new QueueItem[]
{
new QueueItem(),
new QueueItem()
};
queue.Enqueue(batch);
// even on dispose, all queued items will be processed in order to stop the QueueHandler.
queue.Dispose();
}