一般的な URI 検索システムを構築しています。基本的にジェネリック クラスがRetriever<T>
あり、取得する URI のキューを維持します。そのキューをできるだけ速く処理する別のスレッドがあります。質問のタイトルに示されている URI のタイプの例は、HTTP タイプの URI です。
問題は、抽象メソッドを介してリソースを取得するように要求すると、T RetrieveResource(Uri location)
非同期性がないために速度が低下することです。
戻り値の型をRetrieveResource
toTask<T>
に変更することは、私の最初の考えでした。しかし、未処理のタスクが何千もあると、タスクが山積みになり、多くの問題が発生するようです。スレッド プールを利用する代わりに、多くの実際のスレッドを作成しているように見えます。一度に多くのことが進行しているため、すべてが遅くなるだけだと思います。そのため、個別に大きな進歩はありません。
取得するキューに入れられたアイテムが多数あり、キューに入れられるほど速く処理できないことが予想されます。時間の経過とともに、システムが追いつく機会があります。しかし、それは間違いなく速くはありません。
また、キューとそれを処理するスレッドを維持する代わりに、ThreadPool
.. ただし、すべての作業項目を処理する前にシステムをシャットダウンする必要がある場合や、後で優先順位付けなどを許可したい場合、これが理想的かどうかはわかりません。
また、リソースの取得は時間のかかるプロセス (0.250 ~ 5 秒) ですが、必ずしもリソースを大量に消費するプロセスではないこともわかっています。これを何百ものリクエストに並列化しています。
私たちの要件は次のとおりです。
- システムがキューで動作している場合でも、URI は任意のスレッドからエンキューできます。
- 検索は、後で優先順位を付けることができるようにする必要がある場合があります
- 取得を一時停止できる必要があります
- 何も取得されていないときは、最小限のスピンが発生するはずです (
BlockingCollection
ここで役立ちます)。
不必要な複雑さを導入することなくこれを並列化する良い方法はありますか?
以下は、例として、既存のコードの一部です。
public abstract class Retriever<T> : IRetriever<T>, IDisposable
{
private readonly Thread worker;
private readonly BlockingCollection<Uri> pending;
private volatile int isStarted;
private volatile int isDisposing;
public event EventHandler<RetrievalEventArgs<T>> Retrieved;
protected Retriever()
{
this.worker = new Thread(this.RetrieveResources);
this.pending = new BlockingCollection<Uri>(new ConcurrentQueue<Uri>());
this.isStarted = 0;
this.isDisposing = 0;
}
~Retriever()
{
this.Dispose(false);
}
private void RetrieveResources()
{
while (this.isDisposing == 0)
{
while (this.isStarted == 0)
{
Monitor.Wait(this.pending);
}
Uri location = this.pending.Take();
// This is what needs to be concurrently done.
// In this example, it's synchronous, but just on a separate thread.
T result = this.RetrieveResource(location);
// At this point, we would fire our event with the retrieved data
}
}
protected abstract T RetrieveResource(Uri location);
protected void Dispose(bool disposing)
{
if (Interlocked.CompareExchange(ref this.isDisposing, 1, 0) == 1)
{
return;
}
if (disposing)
{
this.pending.CompleteAdding();
this.worker.Join();
}
}
public void Add(Uri uri)
{
try
{
this.pending.Add(uri);
}
catch (InvalidOperationException)
{
return;
}
}
public void AddRange(IEnumerable<Uri> uris)
{
foreach (Uri uri in uris)
{
try
{
this.pending.Add(uri);
}
catch (InvalidOperationException)
{
return;
}
}
}
public void Start()
{
if (Interlocked.CompareExchange(ref this.isStarted, 1, 0) == 1)
{
throw new InvalidOperationException("The retriever is already started.");
}
if (this.worker.ThreadState == ThreadState.Unstarted)
{
this.worker.Start();
}
Monitor.Pulse(this.pending);
}
public void Stop()
{
if (Interlocked.CompareExchange(ref this.isStarted, 0, 1) == 0)
{
throw new InvalidOperationException("The retriever is already stopped.");
}
}
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
}
上記の例に基づいて構築するには...これに対する解決策は、複雑すぎるか、奇妙なコードを追加すると思います...これです。
private void RetrieveResources()
{
while (this.isDisposing == 0)
{
while (this.isStarted == 0)
{
Monitor.Wait(this.pending);
}
Uri location = this.pending.Take();
Task<T> task = new Task<T>((state) =>
{
return this.RetrieveResource(state as Uri);
}, location);
task.ContinueWith((t) =>
{
T result = t.Result;
RetrievalEventArgs<T> args = new RetrievalEventArgs<T>(location, result);
EventHandler<RetrievalEventArgs<T>> callback = this.Retrieved;
if (!Object.ReferenceEquals(callback, null))
{
callback(this, args);
}
});
task.Start();
}
}