0

メール送信窓口サービスに問題があります。サービスは 3 分ごとに開始され、データベースから送信されるメッセージを取得し、送信を開始します。コードは次のようになります。

        MessageFilesHandler MFHObj = new MessageFilesHandler();
        List<Broadcostmsg> imidiateMsgs = Manager.GetImidiateBroadCastMsgs(conString);
        if (imidiateMsgs.Count > 0)
        {

           // WriteToFileImi(strLog);

            Thread imMsgThread = new Thread(new             ParameterizedThreadStart(MFHObj.SendImidiatBroadcast));              
            imMsgThread.IsBackground = true;
            imMsgThread.Start(imidiateMsgs);
        }

これにより、メッセージが大きなリストに送信され、大きなリストへの送信が完了するまでに時間がかかります。メッセージがまだ送信されていて、サービスが送信する新しいメッセージを取得し、以前の送信が中止され、新しいメッセージの送信が開始されたときに問題が発生します。スレッドを使用していますが、サービスがメッセージを送信して新しいスレッドを開始するたびに。コードで間違いを犯している場所を教えてください。

4

2 に答える 2

0

要件は、コンシューマープロデューサーキューを構築することのようです。どのプロデューサーがリストにメッセージを追加し続け、コンシューマーがそのリストからアイテムを選択してそれを使って作業を行うか 私が心配しているのは、スレッドプールからスレッドを選択するのではなく、毎回新しいスレッドを作成してメールを送信することです。ますます多くのスレッドを作成し続けると、コンテキストの切り替えによって作成されるオーバーヘッドが原因で、アプリケーションのパフォーマンスが低下します。

.Net framwe work 4.0を使用している場合、魂は非常に簡単になります。System.Collections.Concurrent.ConcurrentQueueを使用して、アイテムのエンキューとデキューを行うことができます。そのスレッドセーフなので、ロックオブジェクトは必要ありません。タスクを使用してメッセージを処理します。

BlockingCollectionは、コンストラクターでIProducerConsumerCollectionを取得します。または、空のコンストラクターを呼び出すと、デフォルトでConcurrentQueueを使用します。

したがって、メッセージをキューに入れます。

//define a blocking collectiom
var blockingCollection = new BlockingCollection<string>();

//Producer
Task.Factory.StartNew(() =>
{
    while (true)
    {
        blockingCollection.Add("value" + count);
        count++;                    
    }
});

//consumer
//GetConsumingEnumerable would wait until it find some item for work
// its similar to while(true) loop that we put inside consumer queue
Task.Factory.StartNew(() =>
{
    foreach (string value in blockingCollection.GetConsumingEnumerable())
    {
        Console.WriteLine("Worker 1: " + value);
    }                
});

アップデート

FrameWork3.5を使用しているため。JosephAlbahariによるConsumer/ProducerQueueの実装をご覧になることをお勧めします。それはあなたがこれまでに見つけたであろう最高の1つです。

上記のリンクから直接コードを取得

public class PCQueue
{
  readonly object _locker = new object();
  Thread[] _workers;
  Queue<Action> _itemQ = new Queue<Action>();

  public PCQueue (int workerCount)
  {
    _workers = new Thread [workerCount];

    // Create and start a separate thread for each worker
    for (int i = 0; i < workerCount; i++)
      (_workers [i] = new Thread (Consume)).Start();
  }

  public void Shutdown (bool waitForWorkers)
  {
    // Enqueue one null item per worker to make each exit.
    foreach (Thread worker in _workers)
      EnqueueItem (null);

    // Wait for workers to finish
    if (waitForWorkers)
      foreach (Thread worker in _workers)
        worker.Join();
  }

  public void EnqueueItem (Action item)
  {
    lock (_locker)
    {
      _itemQ.Enqueue (item);           // We must pulse because we're
      Monitor.Pulse (_locker);         // changing a blocking condition.
    }
  }

  void Consume()
  {
    while (true)                        // Keep consuming until
    {                                   // told otherwise.
      Action item;
      lock (_locker)
      {
        while (_itemQ.Count == 0) Monitor.Wait (_locker);
        item = _itemQ.Dequeue();
      }
      if (item == null) return;         // This signals our exit.
      item();                           // Execute item.
    }
  }
}

このアプローチの利点は、パフォーマンスを最適化するために作成する必要のあるスレッドの数を制御できることです。スレッドプールアプローチでは、安全ではありますが、同時に作成できるスレッドの数を制御することはできません。

于 2012-07-26T05:39:17.603 に答える
0

新しいメッセージを待機するループ内でコードを使用していると思いますが、それらの待機を管理しましたか?? どれどれ:

while(imidiateMsgs.Count == 0)
{
    //Wait for new Message
}

//Now you have a new message Here

//Make a new thread to process message

その待機にはさまざまな方法があります。BlockingQueues を使用することをお勧めします。

公共エリア:

BlockingCollection<Broadcostmsg> imidiateMsgs = new BlockingCollection<Broadcostmsg>();

あなたの消費者(メッセージを生成するスレッド)で:

SendImidiatBroadcast = imidiateMsgs.Take();//this will wait for new message
//Now you have a new message Here

//Make a new thread to process message

プロデューサー(メッセージに応答するスレッド):

imidiateMsgs.Add(SendImidiatBroadcast);

また、メッセージに応答するたびに新しいスレッドを作成するためにスレッドプールを使用する必要があり、毎回新しいスレッドを初期化しないでください。

于 2012-07-26T05:43:41.670 に答える