1

多数のステート マシンがあります。場合によっては、ステート マシンをある状態から別の状態に移動する必要があります。これには、安価な場合もあれば高価な場合もあり、DB の読み取りと書き込みなどが必要になる場合があります。

これらの状態の変化は、クライアントからの受信コマンドが原因で発生し、いつでも発生する可能性があります。

ワークロードを並列化したい。「このマシンをこの状態からこの状態に移動する」というキューが必要です。明らかに、任意の 1 台のマシンに対するコマンドを順番に実行する必要がありますが、多くのスレッドがある場合は、多くのマシンを並行して進めることができます。

ステート マシンごとに 1 つのスレッドを持つこともできますが、ステート マシンの数はデータに依存し、数百または数千になる可能性があります。ステート マシンごとに専用のスレッドは必要ありません。何らかのプールが必要です。

ワーカーのプールを持ちながら、各ステート マシンのコマンドが厳密に順番に処理されるようにするにはどうすればよいですか?

UPDATEMachine :インスタンスに未処理のコマンドのリストがあると想像してくださいスレッド プール内のエグゼキュータがコマンドの消費を終了すると、Machineさらに未処理のコマンドがある場合は、コマンドをスレッド プールのタスク キューに戻します。問題は、Machine最初のコマンドを追加するときに、アトミックにスレッド プールに入れる方法です。そして、これがすべてスレッドセーフであることを確認しますか?

4

3 に答える 3

2

このシナリオをお勧めします:

  1. おそらく修正サイズのスレッドプールを作成しますExecutors.newFixedThreadPool
  2. ステート マシンごとにHashMap1 つ保持する構造体 (おそらく ) を作成します。Semaphoreそのセマフォの値は 1 であり、順序を維持するのに公正なセマフォになります。
  3. semaphore.aquire()Runnable では、状態マシンのセマフォとsemaphore.release()実行メソッドの最後に追加するだけで、物乞いで仕事をします。

スレッド プールのサイズを使用して、並列処理のレベルを制御します。

于 2013-01-21T08:50:36.807 に答える
1

別のアプローチを提案します。スレッドプールを使用してステート マシンで状態を移動する代わりに、作業の実行を含むすべてにスレッドプールを使用します。何らかの作業を行って状態変更を行った後、状態変更イベントをキューに追加する必要があります。状態変更が処理された後、別の do-work イベントをキューに追加する必要があります。

状態遷移が作業主導であり、その逆であると仮定すると、逐次処理は不可能です。

セマフォを特別なマップに格納するという考えは非常に危険です。マップは同期する必要があり (オブジェクトの追加/削除はスレッドセーフではありません)、検索を実行して (マップ上で同期する可能性があります)、セマフォを使用するためのオーバーヘッドが比較的大きくなります。

その上、アプリケーションでマルチスレッド アーキテクチャを使用したい場合は、すべての方法を使用する必要があると思います。異なるアーキテクチャを混在させると、後で面倒になる可能性があります。

于 2013-01-21T09:16:48.740 に答える
1

マシンごとにスレッド ID を持っています。必要な数のスレッドを生成します。すべてのスレッドで、グローバル キューからのメッセージを貪欲に処理します。各スレッドは、現在のメッセージのサーバーを (現在のメッセージとそのキューのすべてのメッセージの処理が完了するまで) 排他的に使用できるようにロックし、他のスレッドはそのサーバーのメッセージを内部キューに入れます。

編集: メッセージの擬似コードの処理:

void handle(message)
  targetMachine = message.targetMachine
  if (targetMachine.thread != null)
    targetMachine.thread.addToQueue(message);
  else
    targetMachine.thread = this;
    process(message);
    processAllQueueMessages();
    targetMachine.thread = null;

メッセージの Java コードの処理: (少し複雑すぎるかもしれませんが、これはスレッドセーフである必要があります)

/* class ThreadClass */
void handle(Message message)
{
  // get targetMachine from message
  targetMachine.mutexInc.aquire(); // blocking
  targetMachine.messages++;
  boolean acquired = targetMachine.mutex.aquire(); // non-blocking
  if (acquired)
    targetMachine.threadID = this.ID;
  targetMachine.mutexInc.release();
  if (!acquired)
    // can put this before release, it may speed things up
    threads[targetMachine.threadID].addToQueue(message);
  else
  {
    process(message);
    targetMachine.messages--;
    while (true)
    {
      while (!queue.empty())
      {
        process(queue.pop());
        targetMachine.messages--;
      }
      targetMachine.mutexInc.acquire(); // blocking
      if (targetMachine.messages > 0)
      {
        targetMachine.mutexInc.release();
        Thread.sleep(1);
      }
      else
        break;
    }
    targetMachine.mutex.release();
    targetMachine.mutexInc.release();
  }
}
于 2013-01-21T09:38:07.277 に答える