1

プログラムをマルチスレッドにうまく設計する方法がわかりません。基本的に、接続して作業を送信するキューサーバーがありますが、より多くのスレッドを起動してより多くの作業をより速く送信すると、スレッドがブロックされていることに気付きます。

これが私がしていることの例です(私のプログラムでは、共有接続から派生したいくつかのデータとチャネルを送信しています)。

class Send_to_Queue implements Runnable{

    protected String queue_name = null;
    protected Channel channel = null;
    protected byte[] message = 0

   public Send_to_Queue(String queue_name, byte[] message, Channel channel) {
        // TODO Auto-generated constructor stub
        this.queue_name = queue_name;
        this.message   = message;
        this.channel = channel;         
    } 

このチャネルは、起動されているすべてのスレッドによって共有されているスレッドに固有のものではありません。これは、ブロッキングが発生している場所だと思います。私はこれを行うための最良の方法について少し混乱しています。なぜなら ThreadPoolExecutor、その存続期間中、新しいチャネルを作成する方法がわからず、タスクごとに新しいチャネル(一種の高価な)を作成しないからです。作業がない場合はシャットダウンしてもかまいませんが、4つのスレッドと100の作業ユニットがある場合は、100ではなく4回だけ新しいチャネルを確立する必要があります。

サーバーへの新しいチャネル/接続を作成するシナクスは、すべてのインスタンスで確立されていない方法でそれを行う方法を理解していないようです。スレッドに接続を渡し、新しいチャネルを開始させる(下this.channelは毎回新しいチャネルを作成しています)

4

1 に答える 1

1

このチャネルを使用して並行して作業できるようChannelに、が構築されたときに新しいものを作成したいと思います。Threadこれを行うには、いくつかの異なる方法があります。通常、サーバーシナリオでは、サーバーは接続を受け入れ、Channelその接続に対して新しい接続を生成します。Channel次に、それをハンドラーに渡しThreadます。これはソケットでも同じですが、あなたは考えを理解します:

ServerSocket socket = ...
...
while (true) {
    Socket clientSocket = socket.accept();
    new Thread(new MyRunnable(clientSocket)).start();
}
...
public class MyRunnable implements Runnable {
    private Socket clientSocket;
    public MyRunnable(Socket clientSocket) {
        this.clientSocket = clientSocket;
    }
    public void run() {
        while (!done) {
            // use the socket associated with this thread
        }
    }
}

リモートサーバーに接続して作業を行うために一連のスレッドを作成するだけの場合は、ループで、またはExecutorServiceプールの1つを使用してそれを行うことができます。

ExecutorService threadPool = Executors.newFixedThreadPool(100); 
for (int i = 0; i < 100; i++) {
    threadPool.submit(new Runnable() {
        public void run() {
            Channel threadChannel = // create channel here;
            while (!done) {
                // use the per-thread channel
            }
        }
    });
}

もう1つの一般的なパターンは、スレッドにSocketまたはChannelを使用させて一連のユニットで作業させることです。BlockingQueueその目的でを使用できます。

BlockingQueue<WorkUnit> workQueue = new LinkedBlockingQueue<WorkUnit>();
...
// add work units to the work queue
for (int i = 0; i < 1000; i++) {
    // add work to the queue
    workQueue.put(new WorkUnit(i));
}

// the MyRunnable above can then be modified like this:
    ...
    public void run() {
        while (!done) {
            WorkUnit workUnit = workQueue.take();
            // use MyRunnable socket or channel and do the WorkUnit
        }
    }
于 2012-04-28T16:04:06.160 に答える