2

一般的な URI 検索システムを構築しています。基本的にジェネリック クラスがRetriever<T>あり、取得する URI のキューを維持します。そのキューをできるだけ速く処理する別のスレッドがあります。質問のタイトルに示されている URI のタイプの例は、HTTP タイプの URI です。

問題は、抽象メソッドを介してリソースを取得するように要求すると、T RetrieveResource(Uri location)非同期性がないために速度が低下することです。

戻り値の型をRetrieveResourcetoTask<T>に変更することは、私の最初の考えでした。しかし、未処理のタスクが何千もあると、タスクが山積みになり、多くの問題が発生するようです。スレッド プールを利用する代わりに、多くの実際のスレッドを作成しているように見えます。一度に多くのことが進行しているため、すべてが遅くなるだけだと思います。そのため、個別に大きな進歩はありません。

取得するキューに入れられたアイテムが多数あり、キューに入れられるほど速く処理できないことが予想されます。時間の経過とともに、システムが追いつく機会があります。しかし、それは間違いなく速くはありません。

また、キューとそれを処理するスレッドを維持する代わりに、ThreadPool.. ただし、すべての作業項目を処理する前にシステムをシャットダウンする必要がある場合や、後で優先順位付けなどを許可したい場合、これが理想的かどうかはわかりません。

また、リソースの取得は時間のかかるプロセス (0.250 ~ 5 秒) ですが、必ずしもリソースを大量に消費するプロセスではないこともわかっています。これを何百ものリクエストに並列化しています。

私たちの要件は次のとおりです。

  • システムがキューで動作している場合でも、URI は任意のスレッドからエンキューできます。
  • 検索は、後で優先順位を付けることができるようにする必要がある場合があります
  • 取得を一時停止できる必要があります
  • 何も取得されていないときは、最小限のスピンが発生するはずです (BlockingCollectionここで役立ちます)。

不必要な複雑さを導入することなくこれを並列化する良い方法はありますか?

以下は、例として、既存のコードの一部です。

public abstract class Retriever<T> : IRetriever<T>, IDisposable
{
    private readonly Thread worker;
    private readonly BlockingCollection<Uri> pending;
    private volatile int isStarted;
    private volatile int isDisposing;

    public event EventHandler<RetrievalEventArgs<T>> Retrieved;

    protected Retriever()
    {
        this.worker = new Thread(this.RetrieveResources);
        this.pending = new BlockingCollection<Uri>(new ConcurrentQueue<Uri>());
        this.isStarted = 0;
        this.isDisposing = 0;
    }

    ~Retriever()
    {
        this.Dispose(false);
    }

    private void RetrieveResources()
    {
        while (this.isDisposing == 0)
        {
            while (this.isStarted == 0)
            {
                Monitor.Wait(this.pending);
            }

            Uri location = this.pending.Take();

            // This is what needs to be concurrently done.
            // In this example, it's synchronous, but just on a separate thread.
            T result = this.RetrieveResource(location);

            // At this point, we would fire our event with the retrieved data
        }
    }

    protected abstract T RetrieveResource(Uri location);

    protected void Dispose(bool disposing)
    {
        if (Interlocked.CompareExchange(ref this.isDisposing, 1, 0) == 1)
        {
            return;
        }

        if (disposing)
        {
            this.pending.CompleteAdding();
            this.worker.Join();
        }
    }

    public void Add(Uri uri)
    {
        try
        {
            this.pending.Add(uri);
        }
        catch (InvalidOperationException)
        {
            return;
        }
    }

    public void AddRange(IEnumerable<Uri> uris)
    {
        foreach (Uri uri in uris)
        {
            try
            {
                this.pending.Add(uri);
            }
            catch (InvalidOperationException)
            {
                return;
            }
        }
    }

    public void Start()
    {
        if (Interlocked.CompareExchange(ref this.isStarted, 1, 0) == 1)
        {
            throw new InvalidOperationException("The retriever is already started.");
        }

        if (this.worker.ThreadState == ThreadState.Unstarted)
        {
            this.worker.Start();
        }

        Monitor.Pulse(this.pending);
    }

    public void Stop()
    {
        if (Interlocked.CompareExchange(ref this.isStarted, 0, 1) == 0)
        {
            throw new InvalidOperationException("The retriever is already stopped.");
        }
    }

    public void Dispose()
    {
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }
}

上記の例に基づいて構築するには...これに対する解決策は、複雑すぎるか、奇妙なコードを追加すると思います...これです。

    private void RetrieveResources()
    {
        while (this.isDisposing == 0)
        {
            while (this.isStarted == 0)
            {
                Monitor.Wait(this.pending);
            }

            Uri location = this.pending.Take();

            Task<T> task = new Task<T>((state) =>
                {
                    return this.RetrieveResource(state as Uri);
                }, location);

            task.ContinueWith((t) =>
                {
                    T result = t.Result;
                    RetrievalEventArgs<T> args = new RetrievalEventArgs<T>(location, result);

                    EventHandler<RetrievalEventArgs<T>> callback = this.Retrieved;
                    if (!Object.ReferenceEquals(callback, null))
                    {
                        callback(this, args);
                    }
                });

            task.Start();
        }
    }
4

1 に答える 1

2

私はかなり良い解決策を思いついたと思います。リソースを取得する方法と結果の表現の両方を抽象化しました。これにより、任意の結果で任意のURIを取得できるようになります。ある種のURI駆動の「ORM」のようなものです。

可変の同時実行レベルをサポートします。先日、質問を投稿したとき、非同期性と同時実行性がまったく異なることを忘れていました。タスクで達成したのは、非同期性とタスクスケジューラの妨害だけでした。なぜなら、私が本当に望んでいたのは同時実行性だったからです。

スタート/ストップ機能があるのは良い考えのようだったので、キャンセルで追加しました。

public abstract class Retriever<T> : IRetriever<T>
{
    private readonly object locker;
    private readonly BlockingCollection<Uri> pending;
    private readonly Thread[] threads;
    private CancellationTokenSource cancellation;

    private volatile int isStarted;
    private volatile int isDisposing;

    public event EventHandler<RetrieverEventArgs<T>> Retrieved;

    protected Retriever(int concurrency)
    {
        if (concurrency <= 0)
        {
            throw new ArgumentOutOfRangeException("concurrency", "The specified concurrency level must be greater than zero.");
        }

        this.locker = new object();
        this.pending = new BlockingCollection<Uri>(new ConcurrentQueue<Uri>());
        this.threads = new Thread[concurrency];
        this.cancellation = new CancellationTokenSource();

        this.isStarted = 0;
        this.isDisposing = 0;

        this.InitializeThreads();
    }

    ~Retriever()
    {
        this.Dispose(false);
    }

    private void InitializeThreads()
    {
        for (int i = 0; i < this.threads.Length; i++)
        {
            Thread thread = new Thread(this.ProcessQueue)
            {
                IsBackground = true
            };

            this.threads[i] = thread;
        }
    }

    private void StartThreads()
    {
        foreach (Thread thread in this.threads)
        {
            if (thread.ThreadState == ThreadState.Unstarted)
            {
                thread.Start();
            }
        }
    }

    private void CancelOperations(bool reset)
    {
        this.cancellation.Cancel();
        this.cancellation.Dispose();

        if (reset)
        {
            this.cancellation = new CancellationTokenSource();
        }
    }

    private void WaitForThreadsToExit()
    {
        foreach (Thread thread in this.threads)
        {
            thread.Join();
        }
    }

    private void ProcessQueue()
    {
        while (this.isDisposing == 0)
        {
            while (this.isStarted == 0)
            {
                Monitor.Wait(this.locker);
            }

            Uri location;

            try
            {
                location = this.pending.Take(this.cancellation.Token);
            }
            catch (OperationCanceledException)
            {
                continue;
            }

            T data;

            try
            {
                data = this.Retrieve(location, this.cancellation.Token);
            }
            catch (OperationCanceledException)
            {
                continue;
            }

            RetrieverEventArgs<T> args = new RetrieverEventArgs<T>(location, data);

            EventHandler<RetrieverEventArgs<T>> callback = this.Retrieved;
            if (!Object.ReferenceEquals(callback, null))
            {
                callback(this, args);
            }
        }
    }

    private void ThowIfDisposed()
    {
        if (this.isDisposing == 1)
        {
            throw new ObjectDisposedException("Retriever");
        }
    }

    protected abstract T Retrieve(Uri location, CancellationToken token);

    protected virtual void Dispose(bool disposing)
    {
        if (Interlocked.CompareExchange(ref this.isDisposing, 1, 0) == 1)
        {
            return;
        }

        if (disposing)
        {
            this.CancelOperations(false);
            this.WaitForThreadsToExit();
            this.pending.Dispose();
        }
    }

    public void Start()
    {
        this.ThowIfDisposed();

        if (Interlocked.CompareExchange(ref this.isStarted, 1, 0) == 1)
        {
            throw new InvalidOperationException("The retriever is already started.");
        }

        Monitor.PulseAll(this.locker);
        this.StartThreads();
    }

    public void Add(Uri location)
    {
        this.pending.Add(location);
    }

    public void Stop()
    {
        this.ThowIfDisposed();

        if (Interlocked.CompareExchange(ref this.isStarted, 0, 1) == 0)
        {
            throw new InvalidOperationException("The retriever is already stopped.");
        }

        this.CancelOperations(true);
    }

    public void Dispose()
    {
        this.Dispose(true);
        GC.SuppressFinalize(this);
    }
}
于 2012-10-25T07:10:59.160 に答える