1

サーバーへのチャネルを開閉する必要がないため、スループットを向上させるために接続をプールする方法を学習しようとしていますが、サーバーを機能させることができないようです。スレッドをフォークし、各スレッドにループを実行させてデータをダンプしたときに機能するコードのわずかに変更されたバージョンがありましたが、現在はThreadPoolExecutor、単一のスレッドを介してジョブを送信し、処理を処理するために2つのスレッドを生成するために使用しようとしています作品。私の実験では、いつでも(または私が持っているのと同じ数のスレッドで)開いている約2つのチャネルが表示されるはずですが、代わりにコードを変更すると、illegalstateexception: pool not open

プールの設計が間違っているか、ThreadPoolExecutorの理解に欠陥があると、本当に混乱します。ThreadPoolExecutorについての私の理解は、実行する作業がある場合にスレッドを存続させ、反復ごとにスレッドを強制終了/再生成し続けなかったということでした。

これがコードです(rabbitmqのものはすべて無視できます。その要点は、サーバーへの接続を開いてからチャネルを開く必要があるということです。サーバーへの接続を1つ開いてから、チャネルのプールを開こうとしています。共有)。私のアイデアは、objectpoolクラスのインスタンスを作成し、それをborrows必要に応じてチャネルするランナブルに渡すことでした。

コード:

import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;

import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class PoolExample {

    private static ExecutorService executor_worker;
    static {
        final int numberOfThreads_ThreadPoolExecutor = 2;
        executor_worker =
            new ThreadPoolExecutor(numberOfThreads_ThreadPoolExecutor, numberOfThreads_ThreadPoolExecutor, 1000, TimeUnit.SECONDS,
                                   new LinkedBlockingDeque<Runnable>());   
    }

    public static void main(String[] args) throws Exception {
        System.out.println("starting..");       
        PoolableObjectFactory<MyPooledObject> factory = new MyPoolableObjectFactory();
        ObjectPool<MyPooledObject> pool = new GenericObjectPool<MyPooledObject>(factory);
        for (int x = 0; x<500000000; x++) {
            executor_worker.submit(new Thread(new MyRunnable(x, pool)));
        }
    }
}

class MyPooledObject {
    //Connection connection;
    Channel channel;
    public MyPooledObject() throws IOException {
        System.out.println("hello world");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        channel = connection.createChannel(); 
    }

    public Channel sing() throws IOException {
        //System.out.println("mary had a little lamb");
        return channel;
    }

    public void destroy() {
        System.out.println("goodbye cruel world");      
    }
}

class MyPoolableObjectFactory extends BasePoolableObjectFactory<MyPooledObject> {
    @Override
    public MyPooledObject makeObject() throws Exception {
        return new MyPooledObject();
    }
    @Override
    public void destroyObject(MyPooledObject obj) throws Exception {
        obj.destroy();
    }
}

class MyRunnable implements Runnable{  
    protected int x = 0;
    protected ObjectPool<MyPooledObject> pool = null;

    public MyRunnable(int x, ObjectPool<MyPooledObject> pool) {
        // TODO Auto-generated constructor stub
        this.x = x;
        this.pool = pool;
    }

    public void run(){
        try {
                MyPooledObject obj;
                obj = pool.borrowObject();
                Channel channel = obj.sing();
                String message = Integer.toString(x);
                channel.basicPublish( "", "task_queue", 
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes());
                pool.returnObject(obj);
        } catch (NoSuchElementException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IllegalStateException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            try {
                pool.close();
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

私のデザインの何が問題になっているのでしょうか?または、オブジェクトをプールするための私のアプローチ全体に欠陥がありますか?

UPDATE1:リクエストに応じて、ここにスタックトレースがあります(これらの多くを継続的に取得します):

スタックトレース:

java.lang.IllegalStateException: Pool not open
    at org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
    at org.apache.commons.pool.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:1079)
    at MyRunnable.run(PoolExample.java:85)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:680)

それが役立つ場合は、私のコードの85行目(このエラーがトリガーされる場所は次のとおりです):obj = pool.borrowObject();

UPDATE2:非常に奇妙です。エラーが発生しましたが、キューに2つのアイテムが書き込まれます。野生のガチョウの追跡に誰かを送りたくないのですが、オブジェクトを作成するときは正常に借用できますが、プールに戻されるときは借用できないということだと思いますか?

UPDATE3:上記の中間ステップを通過しないようにコードを設計しました。エラーは発生しなくなりましたが、基本的には何も起こりません。10スレッドを起動し、10チャネルを期待していますが、数秒間は1チャネルしか取得せず、その後もオフになります。コード:

import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;

import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class PoolExample {

    private static ExecutorService executor_worker;
    static {
        final int numberOfThreads_ThreadPoolExecutor = 20;
        executor_worker =
            new ThreadPoolExecutor(numberOfThreads_ThreadPoolExecutor, numberOfThreads_ThreadPoolExecutor, 1000, TimeUnit.SECONDS,
                                   new LinkedBlockingDeque<Runnable>());   
    }

    public static void main(String[] args) throws Exception {
        System.out.println("starting..");           
        ObjectPool<Channel> pool =
                new GenericObjectPool<Channel>(
                new ConnectionPoolableObjectFactory(), 5);

        for (int x = 0; x<500000000; x++) {
            executor_worker.submit(new MyRunnable(x, pool));
        }
        executor_worker.shutdown();
        pool.close();
    }
}

 class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory<Channel> {
     Channel channel;

    public ConnectionPoolableObjectFactory() throws IOException {
        System.out.println("hello world");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        channel = connection.createChannel(); 
    }

    @Override
    public Channel makeObject() throws Exception {                
        return channel;
    }

    @Override
    public boolean validateObject(Channel channel) {
        return channel.isOpen();
    }

    @Override
    public void destroyObject(Channel channel) throws Exception {
        channel.close();
    }

    @Override
    public void passivateObject(Channel channel) throws Exception {
        //System.out.println("sent back to queue");
    }
}

class MyRunnable implements Runnable{  
    protected int x = 0;
    protected ObjectPool<Channel> pool;

    public MyRunnable(int x, ObjectPool<Channel> pool) {
        // TODO Auto-generated constructor stub
        this.x = x;
        this.pool = pool;
    }

    public void run(){
        try {
                Channel channel = pool.borrowObject();
                String message = Integer.toString(x);
                channel.basicPublish( "", "task_queue", 
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes());
                pool.returnObject(channel);
        } catch (NoSuchElementException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IllegalStateException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
    }
}
4

0 に答える 0