-1

マルチメディア メッセージを定期的に受信し、それらに返信するアプリケーションがあります。

現在、これを 1 つのスレッドで行っており、最初にメッセージを受信し、次にそれらを 1 つずつ処理しています。これは仕事をしますが、遅いです。

そのため、同じプロセスを複数のスレッドで同時に実行することを考えています。

着信レコードの並列処理を可能にする簡単な方法はありますか?ただし、2 つのスレッドで同じレコードを誤って処理することは避けてください。

4

1 に答える 1

1

着信レコードの並列処理を可能にする簡単な方法はありますか?ただし、2 つのスレッドで同じレコードを誤って処理することは避けてください。

はい、実際にはそれほど難しくありません。あなたがやりたいことは、「生産者 - 消費者モデル」と呼ばれます。

メッセージ受信者が一度に 1 つのスレッドしか処理できないが、メッセージの「プロセッサ」が一度に複数のメッセージを処理できる場合は、BlockingCollectionを使用して、処理する必要がある作業を保存するだけです。

public sealed class MessageProcessor : IDisposable
{
    public MessageProcessor() 
        : this(-1)
    {   
    }

    public MessageProcessor(int maxThreadsForProcessing)
    {
        _maxThreadsForProcessing = maxThreadsForProcessing;
        _messages = new BlockingCollection<Message>();
        _cts = new CancellationTokenSource();

        _messageProcessorThread = new Thread(ProcessMessages);
        _messageProcessorThread.IsBackground = true;
        _messageProcessorThread.Name = "Message Processor Thread";
        _messageProcessorThread.Start();
    }

    public int MaxThreadsForProcessing
    {
        get { return _maxThreadsForProcessing; }
    }

    private readonly BlockingCollection<Message> _messages;
    private readonly CancellationTokenSource _cts;
    private readonly Thread _messageProcessorThread;
    private bool _disposed = false;
    private readonly int _maxThreadsForProcessing;


    /// <summary>
    /// Add a new message to be queued up and processed in the background.
    /// </summary>
    public void ReceiveMessage(Message message)
    {
       _messages.Add(message);
    }

    /// <summary>
    /// Signals the system to stop processing messages.
    /// </summary>
    /// <param name="finishQueue">Should the queue of messages waiting to be processed be allowed to finish</param>
    public void Stop(bool finishQueue)
    {
        _messages.CompleteAdding();
        if(!finishQueue)
            _cts.Cancel();

        //Wait for the message processor thread to finish it's work.
        _messageProcessorThread.Join();
    }

    /// <summary>
    /// The background thread that processes messages in the system
    /// </summary>
    private void ProcessMessages()
    {
        try
        {
            Parallel.ForEach(_messages.GetConsumingEnumerable(),
                         new ParallelOptions()
                         {
                             CancellationToken = _cts.Token,
                             MaxDegreeOfParallelism = MaxThreadsForProcessing
                         },
                         ProcessMessage);
        }
        catch (OperationCanceledException)
        {
            //Don't care that it happened, just don't want it to bubble up as a unhandeled exception.
        }
    }

    private void ProcessMessage(Message message, ParallelLoopState loopState)
    {
        //Here be dragons! (or your code to process a message, your choice :-))

        //Use if(_cts.Token.IsCancellationRequested || loopState.ShouldExitCurrentIteration) to test if 
        // we should quit out of the function early for a graceful shutdown.
    }

    public void Dispose()
    {
        if(!_disposed)
        {
            if(_cts != null && _messages != null && _messageProcessorThread != null)
                Stop(true); //This line will block till all queued messages have been processed, if you want it to be quicker you need to call `Stop(false)` before you dispose the object.

            if(_cts != null)
                _cts.Dispose();

            if(_messages != null)
                _messages.Dispose();

            GC.SuppressFinalize(this);
           _disposed = true;
        }
    }

    ~MessageProcessor()
    {
        //Nothing to do, just making FXCop happy.
    }

}

無料の本Patterns for Parallel Programmingを読むことを強くお勧めします。これについて詳しく説明しています。Producer-Consumer モデルを詳細に説明するセクション全体があります。


更新:と にはいくつかのパフォーマンスの問題がありますGetConsumingEnumerable()Parallel.ForEach(代わりにライブラリParallelExtensionsExtrasを使用してください。これは新しい拡張メソッドです。GetConsumingPartitioner()

public static Partitioner<T> GetConsumingPartitioner<T>(
    this BlockingCollection<T> collection)
{
    return new BlockingCollectionPartitioner<T>(collection);
}

private class BlockingCollectionPartitioner<T> : Partitioner<T>
{
    private BlockingCollection<T> _collection;

    internal BlockingCollectionPartitioner(
        BlockingCollection<T> collection)
    {
        if (collection == null)
            throw new ArgumentNullException("collection");
        _collection = collection;
    }

    public override bool SupportsDynamicPartitions {
        get { return true; }
    }

    public override IList<IEnumerator<T>> GetPartitions(
        int partitionCount)
    {
        if (partitionCount < 1)
            throw new ArgumentOutOfRangeException("partitionCount");
        var dynamicPartitioner = GetDynamicPartitions();
        return Enumerable.Range(0, partitionCount).Select(_ =>
            dynamicPartitioner.GetEnumerator()).ToArray();
    }

    public override IEnumerable<T> GetDynamicPartitions()
    {
        return _collection.GetConsumingEnumerable();
    }
}
于 2013-10-26T17:49:06.980 に答える