アクションA、B、Cと言う1000メッセージのバッチでアクションを実行する必要があります。これらのアクションを並行して実行できます。私は彼らのためにグループを作りました。並列性を高めるために、各グループにサブグループを作成しました。サブグループ内のタスクは、連続して実行する必要があります。ただし、2 つのサブグループを並行して実行できます。1000 のバッチが終了した後、データベースに保存するなどの処理を行う必要があります。しかし、すべてのタスクが完了するのを待つ方法を理解できません (1000 タスクの終わりに途中で待機することに興味はありません)。どんな提案でも大歓迎です。
public class OrderlyThreadPool<t> : IDisposable
{
BlockingCollection<t> _workingqueue = null;
Action<t> _handler = null;
public OrderlyThreadPool(int wrkerCount, Action<t> handler)
{
_workingqueue = new BlockingCollection<t>();
_handler = handler;
Worker worker = new Worker(wrkerCount, Process); //WorkerCount is always 1
worker.Start();
}
public void AddItem(t item)
{
_workingqueue.Add(item);
}
private void Process()
{
foreach (t item in _workingqueue.GetConsumingEnumerable())
{
_handler(item);
}
}
public void Dispose()
{
_workingqueue.CompleteAdding();
_workingqueue = null;
}
}
public class Worker
{
int _wrkerCount = 0;
Action _action = null;
public Worker(int workerCount, Action action)
{
_wrkerCount = workerCount;
_action = action;
}
public void Start()
{
// Create and start a separate Task for each consumer:
for (int i = 0; i < _wrkerCount; i++)
{
Task.Factory.StartNew(_action);
}
}
}
したがって、基本的にはサブグループごとに OrderlyThreadPool を作成します。I am recv messages from say source, 利用可能なメッセージがない場合はブロックします。だから私のコードは次のようになります
while(true)
{
var message = GetMsg();
foreach(OrderlyThreadPool<Msg> a in myList)
{
a.AddMsg(message);
}
if(msgCount > 1000)
{
Wait for all threads to finish work;
}
else
{
msgCount =msgCount+1;
}
}