-1

アクションA、B、Cと言う1000メッセージのバッチでアクションを実行する必要があります。これらのアクションを並行して実行できます。私は彼らのためにグループを作りました。並列性を高めるために、各グループにサブグループを作成しました。サブグループ内のタスクは、連続して実行する必要があります。ただし、2 つのサブグループを並行して実行できます。1000 のバッチが終了した後、データベースに保存するなどの処理を行う必要があります。しかし、すべてのタスクが完了するのを待つ方法を理解できません (1000 タスクの終わりに途中で待機することに興味はありません)。どんな提案でも大歓迎です。

public class OrderlyThreadPool<t> : IDisposable
{
    BlockingCollection<t> _workingqueue = null;
    Action<t> _handler = null;
    public OrderlyThreadPool(int wrkerCount, Action<t> handler)
    {
        _workingqueue = new BlockingCollection<t>();
        _handler = handler;
        Worker worker = new Worker(wrkerCount, Process); //WorkerCount is always 1
        worker.Start();
    }

    public void AddItem(t item)
    {
        _workingqueue.Add(item);
    }
    private void Process()
    {
        foreach (t item in _workingqueue.GetConsumingEnumerable())
        {
            _handler(item);
        }
    }
    public void Dispose()
    {
        _workingqueue.CompleteAdding();
        _workingqueue = null;
    }

}

public class Worker
{
    int _wrkerCount = 0;
    Action _action = null;
    public Worker(int workerCount, Action action)
    {
        _wrkerCount = workerCount;
        _action = action;

    }
    public void Start()
    {
        // Create and start a separate Task for each consumer:
        for (int i = 0; i < _wrkerCount; i++)
        {
            Task.Factory.StartNew(_action);
        }
    }
}

したがって、基本的にはサブグループごとに OrderlyThreadPool を作成します。I am recv messages from say source, 利用可能なメッセージがない場合はブロックします。だから私のコードは次のようになります

while(true)
{

    var message = GetMsg();

    foreach(OrderlyThreadPool<Msg> a in myList)
    {
        a.AddMsg(message);

    }

    if(msgCount > 1000)
    {
    Wait for all threads to finish work;

    }
    else
    {
    msgCount =msgCount+1;

    }

}
4

1 に答える 1

0

タスクを開始しますが、参照を保持しません。これらのタスクを保存し、Worker と OrderlyThreadPool を介して公開し、Task.WhenAllすべてのタスクが完了するまで待機するために使用します。

public class Worker
{
    //...
    List<Task> _tasks = new List<Task>();
    public Task Completion { get { return Task.WhenAll(_tasks); } }
    public void Start()
    {
        // Create and start a separate Task for each consumer:
        for (int i = 0; i < _wrkerCount; i++)
        {
            Tasks.Add(Task.Factory.StartNew(_action));
        }
    }
}

public class OrderlyThreadPool<t> : IDisposable
{
    //...
    public Task Completion { get { return _worker.Completion; }}
}

await Task.WhenAll(myList.Select(orderlyThreadPool => orderlyThreadPool.Completion));

ただし、代わりに TPL Dataflow の使用を検討する必要があります。これは、完了、バッチ処理、同時実行レベルなどをカプセル化するアクターベースのフレームワークです...

于 2014-09-14T19:47:00.903 に答える