3

私はJavaに関連する実用的なシナリオに取り組んでいます;ソケットプログラム。既存のシステムと予想されるシステムは次のとおりです。

既存のシステム-システムは、特定の条件が満たされていることを確認します。その場合、送信するメッセージを作成してキューに入れます。

キュープロセッサは別のスレッドです。キュー内にアイテムが存在するかどうかを定期的にチェックします。アイテム(メッセージ)が見つかった場合は、メッセージをリモートホスト(ハードコードされたもの)に送信し、アイテムをキューから削除します。

期待されるシステム-これはそのようなものです。メッセージは、特定の条件が満たされたときに作成されますが、いずれの場合も受信者は同じではありません。したがって、多くのアプローチがあります。

  1. メッセージを同じキューに入れますが、受信者IDを使用します。この場合、2番目のスレッドは受信者を識別できるため、メッセージを受信者に送信できます。

  2. 複数のスレッドを持つ。この場合、条件が満たされ、受信者が「新規」の場合、新しいキューが作成され、メッセージがそのキューに入れられます。そして、新しいスレッドが初期化されて、そのキューを処理します。次のメッセージが同じ受信者に送信される場合は同じキューに入れられ、そうでない場合は新しいキューとスレッドが作成されます。

今、私は2番目のものを実装したいと思います。どうすればいいですか?スケルトンで十分であり、キューの作成方法などを心配する必要はありません... :)

更新:私はまた、アプローチ1がそれを行うための最良の方法だと思います。私は糸脱毛に関するいくつかの記事を読み、その決定に至りました。しかし、アプローチ2の実装方法も学ぶことは本当に価値があります。

4

5 に答える 5

4

車輪の再発明ではなく、Javaメッセージサービス(JMS)の使用を検討しますか?

于 2009-04-26T12:02:38.670 に答える
2

BlockingQueueをご覧になることをお勧めしますか?ディスパッチプロセスはこのキュー(put)に書き込むことができ、クライアントはスレッドセーフな方法で取得またはピークすることができます。したがって、キューの実装を作成する必要はまったくありません。

異なるメッセージタイプを含む1つのキューがある場合は、クライアントごとにいくつかのピークタイプのメカニズムを実装する必要があります(つまり、クライアントはキューの先頭をチェックし、キューの先頭のみを取得する必要があります)。効果的に機能するには、消費者はタイムリーかつ堅牢な方法で必要なデータを抽出する必要があります。

メッセージ/コンシューマータイプごとに1つのキュー/スレッドがある場合、それはより簡単で信頼性が高くなります。

クライアントの実装は、単にループする必要があります。

while (!done) {
   Object item = queue.take();
   // process item
}

キューはジェネリックを利用でき、take()ブロックしていることに注意してください。

もちろん、複数のコンシューマーがさまざまなタイプのメッセージを受信する場合は、スペースベースのアーキテクチャを検討することをお勧めします。これにはキュー(FIFO)の特性はありませんが、非常に簡単な方法で複数のコンシューマーを使用できます。

于 2009-04-26T12:02:46.220 に答える
2

エンド マシンが多数あり、それぞれに時折メッセージが送信されるか、少数のエンド マシンがあり、それぞれに頻繁にメッセージが送信されるかは、少し検討する必要があります。

多数のエンド マシンがある場合、エンド マシンごとに文字通り 1 つのスレッドを使用するのは、実際にそれらすべてのマシンにメッセージを常にストリーミングしない限り、少し過剰に聞こえます。特定の境界間でのみ成長するスレッドのプールを持つことをお勧めします。これを行うには、ThreadPoolExecutor を使用できます。メッセージを投稿する必要がある場合は、実際にメッセージを送信するエグゼキューターに実行可能ファイルを送信します。

Executor msgExec = new ThreadPoolExecutor(...);

public void sendMessage(final String machineId, byte[] message) {
  msgExec.execute(new Runnable() {
    public void run() {
      sendMessageNow(machineId, message);
    }
  });
}

private void sendMessageNow(String machineId, byte[] message) {
  // open connection to machine and send message, thinking
  // about the case of two simultaneous messages to a machine,
  // and whether you want to cache connections.
}

エンド マシンが数台しかない場合は、マシンごとに BlockingQueue を配置し、ブロッキング キューごとに次のメッセージを待機するスレッドを配置できます。この場合、パターンは次のようになります (テストされていない頭から離れた日曜日の朝のコードに注意してください)。

ConcurrentHashMap<String,BockingQueue> queuePerMachine;

public void sendMessage(String machineId, byte[] message) {
  BockingQueue<Message> q = queuePerMachine.get(machineId);
  if (q == null) {
    q = new BockingQueue<Message>();
    BockingQueue<Message> prev = queuePerMachine.putIfAbsent(machineId, q);
    if (prev != null) {
      q = prev;
    } else {
      (new QueueProessor(q)).start();
    }
  }
  q.put(new Message(message));
}

private class QueueProessor extends Thread {
  private final BockingQueue<Message> q;
  QueueProessor(BockingQueue<Message> q) {
    this.q = q;
  }
  public void run() {
    Socket s = null;
    for (;;) {
      boolean needTimeOut = (s != null);
      Message m = needTimeOut ?
         q.poll(60000, TimeUnit.MILLISECOND) :
         q.take();
      if (m == null) {
        if (s != null)
          // close s and null
      } else {
        if (s == null) {
          // open s
        }
        // send message down s
      }
    }
    // add appropriate error handling and finally
  }
}

この場合、そのマシンへのメッセージが 60 秒以内に到着しない場合、接続を閉じます。

代わりに JMS を使用する必要がありますか? さて、これがあなたにとって複雑に聞こえるかどうかを検討する必要があります. 私の個人的な感覚では、特別なフレームワークを保証するほど複雑な作業ではありません。しかし、私は意見が異なると確信しています。

PS実際には、これを見て、おそらくキューをスレッドオブジェクト内に配置し、マシンID->スレッドオブジェクトをマップするだけです。とにかく、あなたはアイデアを得る。

于 2009-04-26T14:13:03.753 に答える
2

実際の「エンジン」として java.util.concurrent を使用する VM 内 JMS 実装であるSomnifugiJMSを使用してみてください。

それはおそらくあなたの目的にとってはやややり過ぎでしょうが、追加のプログラミングをほとんどまたはまったく必要とせずにアプリケーションを配布できる可能性があります (該当する場合) 。ActiveMQなどの別の JMS 実装をプラグインするだけで完了です。

于 2009-04-26T14:27:39.960 に答える
1

まず第一に、多くのレシーバーを持つことを計画している場合、私は ONE-THREAD-AND-QUEUE-PER-RECEIVER アプローチを使用しません。ほとんどの場合何もしない多くのスレッドが発生する可能性があり、パフォーマンスが大幅に低下する可能性があります。別の方法としては、ワーカー スレッドのスレッド プールを使用して、共有キューからタスクを選択するだけです。各タスクには独自のレシーバー ID があり、おそらく、作業スレッドが使用する各レシーバーへのソケット接続を備えた共有ディクショナリです。

そうは言っても、あなたがまだあなたのアプローチを追求したいのであれば、あなたができることは次のとおりです:

1) 新しいスレッドの実行を処理する新しいクラスを作成します。

public class Worker implements Runnable {
   private Queue<String> myQueue = new Queue<String>();
   public void run()
   {
       while (true) {
          string messageToProcess = null;
          synchronized (myQueue) {
             if (!myQueue.empty()) {
                 // get your data from queue
                 messageToProcess = myQueue.pop();
             }
          }
          if (messageToProcess != null) {
             // do your stuff
          }
          Thread.sleep(500); // to avoid spinning
       }
   }
   public void queueMessage(String message)
   {
      synchronized(myQueue) {
         myQueue.add(message);
      }
   }
}

2) メイン スレッドでメッセージを作成し、ディクショナリ (ハッシュ テーブル) を使用して、受信者のスレッドが既に作成されているかどうかを確認します。そうであれば、新しいメッセージをキューに入れるだけです。そうでない場合は、新しいスレッドを作成し、ハッシュテーブルに入れ、新しいメッセージをキューに入れます。

while (true) {
   String msg = getNewCreatedMessage(); // you get your messages from here
   int id = getNewCreatedMessageId();   // you get your rec's id from here
   Worker w = myHash(id);
   if (w == null) {   // create new Worker thread
      w = new Worker();
      new Thread(w).start();
   }
   w.queueMessage(msg);
}

幸運を。

編集: このアプローチで言及されているBlockingQueue Brianを使用して、このソリューションを改善できます。

于 2009-04-26T12:14:54.430 に答える