0

まず、「評判」が 1500 以上ある人は、「ContinueWith」のタグを作成してください (この質問にタグを付けてください)。ありがとう!

この投稿が長くなって申し訳ありませんが、関連する詳細を省略したために、誰かが私を助けようとしている時間を無駄にしたくありません. とは言っても、それはまだ起こるかもしれません。:)

それでは詳細です。いくつかの ActiveMQ キュー トピックにサブスクライブするサービスに取り組んでいます。このうち 2 つのトピックは多少関連しています。1 つは「会社の更新」で、もう 1 つは「製品の更新」です。両方の「ID」は CompanyID です。会社のトピックには、製品のトピックのデータが含まれています。他のサブスクライバーは商品データを必要としているが、商品トピックをサブスクライブしたくない/必要としないため、必須です。私のサービスはマルチスレッドであるため (私たちの裁量を超えた要件)、メッセージが到着すると、更新パラメーターが単にContinueWithである AddOrUpdate を使用してConcurrentDictionaryでそれぞれを処理するTaskを追加します。(下記参照)。これらのトピックとサブスクライバーが「永続的」であるために発生する可能性のある同時更新を防ぐために行われたため、リスナー サービスがオフラインになった場合 (何らかの理由で)、同じ CompanyID に対して複数のメッセージ (会社および/または製品) で終了する可能性があります。

さて、私の実際の質問(ついに!)タスク(1つのタスクだけか、ContinueWithタスクのチェーンの最後のタスクか)が終了した後、それをConcurrentDictionaryから削除したいと思います(明らかに)。どのように?同僚のアイデアをいくつか考えてもらいましたが、どれもあまり好きではありません。あなたの答えは私が持っているが好きではないアイデアの1つである可能性があるため、アイデアをリストするつもりはありませんが、最終的には最高のものになる可能性があります.

私の説明とは異なり、スクロールしすぎないようにコード スニペットを圧縮しようとしました。:)

nrtq = 質問とは関係ありません

public interface IMessage
{
  long CompantId { get; set; }
  void Process();
}
public class CompanyMessage : IMessage
{ //implementation, nrtq }
public class ProductMessage : IMessage
{ //implementation, nrtq }

public class Controller
{
  private static ConcurrentDictionary<long, Task> _workers = new ConcurrentDictionary<long, Task>();
  //other needed declarations, nrtq

  public Controller(){//constructor stuff, nrtq }

  public StartSubscribers()
  {
    //other code, nrtq
    _companySubscriber.OnMessageReceived += HandleCompanyMsg;
    _productSubscriber.OnMessageReceived += HandleProductMsg;
  }

  private void HandleCompanyMsg(string msg)
  {
    try {
      //other code, nrtq
      QueueItUp(new CompanyMessage(message));
    } catch (Exception ex) { //other code, nrtq }
  }

  private void HandleProductMsg(string msg)
  {
    try {
      //other code, nrtq
      QueueItUp(new ProductMessage(message));
    } catch (Exception ex) { //other code, nrtq }
  }

  private static void QueueItUp(IMessage message)
  {
    _workers.AddOrUpdate(message.CompanyId,
      x => {
        var task = new Task(message.Process);
        task.Start();
        return task;
      },
      (x, y) => y.ContinueWith((z) => message.Process())
    );
  }

ありがとう!

4

1 に答える 1

0

他の誰かがより良い解決策を思い付くことができるかどうかを見たいので、私はしばらくこの答えを「受け入れる」ことはしません。

同僚が私が少し調整した解決策を思いついた。はい、私はlockステートメントを。で使用することの皮肉(?)を知っていConcurrentDictionaryます。使用するより良いコレクションタイプがあるかどうかを確認する時間は今のところありません。基本的に、ContinueWith()既存のタスクに対して単に実行するのではなく、タスクをそれ自体に置き換え、最後にを使用して別のタスクを追加しますContinueWith()

それはどのような違いをもたらしますか?よろしくお願いします!:)を実行したばかりの場合ContinueWith()、チェーンの最初のタスクが完了するとすぐに!worker.Value.IsCompleted戻ります。trueただし、タスクを2つ(またはそれ以上)のチェーンタスクに置き換えると、コレクションに関する限り、タスクは1つだけになり、チェーン内のすべてのタスクが完了するまで!worker.Value.IsCompleted戻りません。true

タスクをそれ自体+(新しいタスク)に置き換えることについて少し心配していたことを認めます。なぜなら、タスクが置き換えられている間にたまたま実行されていた場合はどうなるのでしょうか。さて、私はこれから生きている昼光をテストしました、そして何の問題にもぶつかりませんでした。何が起こっているのかというと、タスクは独自のスレッドで実行されており、コレクションはそのスレッドへのポインターを保持しているだけなので、実行中のタスクは影響を受けないということです。それをそれ自体+(新しいタスク)に置き換えることにより、実行中のスレッドへのポインターを維持し、それが完了すると「通知」を取得して、次のタスクが「続行」またはIsCompleted戻ることができるようにしますtrue

また、「クリーンアップ」ループの動作方法とその場所は、コレクション内で「完了した」タスクがぶら下がっていることを意味しますが、次回の「クリーンアップ」が実行されるまでは、メッセージを受信します。繰り返しになりますが、これが原因でメモリの問題が発生する可能性があるかどうかを確認するために多くのテストを行いましたが、1秒あたり数百のメッセージを処理している場合でも、サービスが20MBを超えるRAMを使用することはありませんでした。これが問題を引き起こすには、かなり大きなメッセージを受信し、長時間実行されるタスクがたくさんある必要がありますが、状況が異なる可能性があるため、注意が必要です。

上記のように、以下のコードでは、nrtq=質問には関係ありません。

public interface IMessage
{
  long CompantId { get; set; }
  void Process();
}
public class CompanyMessage : IMessage
{ //implementation, nrtq }
public class ProductMessage : IMessage
{ //implementation, nrtq }

public class Controller
{
  private static ConcurrentDictionary<long, Task> _workers = new ConcurrentDictionary<long, Task>();
  //other needed declarations, nrtq

  public Controller(){//constructor stuff, nrtq }

  public StartSubscribers()
  {
    //other code, nrtq
    _companySubscriber.OnMessageReceived += HandleCompanyMsg;
    _productSubscriber.OnMessageReceived += HandleProductMsg;
  }

  private void HandleCompanyMsg(string msg)
  {
    //other code, nrtq
    QueueItUp(new CompanyMessage(message));
  }

  private void HandleProductMsg(string msg)
  {
    //other code, nrtq
    QueueItUp(new ProductMessage(message));
  }

  private static void QueueItUp(IMessage message)
  {
    lock(_workers)
    {
      foreach (var worker in Workers)
      {
        if (!worker.Value.IsCompleted) continue;
        Task task;
        Workers.TryRemove(worker.Key, out task);
      }
      var id = message.CompanyId;
      if (_workers.ContainsKey(id))
        _workers[id] = _workers[id].ContinueWith(x => message.Process());
      else
      {
        var task = new Task(y => message.Process(), id);
        _workers.TryAdd(id, task);
        task.Start();
      }
    }
  }
于 2012-06-19T19:34:32.123 に答える