3

リソースをプールする方法を理解するのに苦労しており、スレッドに問題があるのではないかと疑い始めています (100% ではありませんが、実験しています)。私がやろうとしていることの要点は、サーバーへのチャネルのプールを作成し、スレッドがそれらを使用しているかどうかを確認することです。アップロードしているアイテムと同じ数のチャネルを作成することに成功し(つまり、プールせず、各スレッドで新しいチャネルを作成するだけです)、チャネルを1つだけ作成することに成功しました(つまり、プールしたり、新しいチャネルを作成したりしません)必要に応じてチャネル)。

スレッドがプールと対話する方法が問題なのではnewCachedThreadPoolないかと考えていたので、作業がある限りスレッドが死なないように作成しようとしましたが、そうすると、使用されているチャネルが閉じられているというエラーが表示されます。私のプールにはdestroyObjectメソッドがありますが、私はそれを呼び出さないので、なぜそれがトリガーされるのか理解できません (コメントアウトすると機能しますが、1 つのチャネルしか作成せず、アップロードはスレッド化されていない場合と比較して約 300 操作/秒と非常に遅くなります)プール私は30k /秒を取得します)。終了していると思われますが、これを確認する方法はありますか?終了している場合に使用できる代替手段はありますか?

これがコードです(rabbitmqのものはすべて無視してください。結果を監視できるようにするためです):

import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;

import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class PoolExample {

    private static ExecutorService executor_worker;

    static {
        final int numberOfThreads_ThreadPoolExecutor = 20;
        executor_worker = Executors.newCachedThreadPool();
        executor_worker = new ThreadPoolExecutor(numberOfThreads_ThreadPoolExecutor, numberOfThreads_ThreadPoolExecutor, 1000, TimeUnit.SECONDS,
                                           new LinkedBlockingDeque<Runnable>());
    }

    private static ObjectPool<Channel> pool;

    public static void main(String[] args) throws Exception {
        System.out.println("starting..");           
        ObjectPool<Channel> pool =
                new GenericObjectPool<Channel>(
                new ConnectionPoolableObjectFactory(), 50);
        for (int x = 0; x<500000000; x++) {
            executor_worker.submit(new MyRunnable(x, pool));
        }
        //executor_worker.shutdown();
        //pool.close();
    }
}

 class ConnectionPoolableObjectFactory extends BasePoolableObjectFactory<Channel> {
     Channel channel;
     Connection connection;

    public ConnectionPoolableObjectFactory() throws IOException {
        System.out.println("hello world");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        channel = connection.createChannel(); 
    }

    @Override
    public Channel makeObject() throws Exception {  
        //channel = connection.createChannel(); 
        return channel; 
    }

    @Override
    public boolean validateObject(Channel channel) {
        return channel.isOpen();
    }

    @Override
    public void destroyObject(Channel channel) throws Exception {
        channel.close();
    }

    @Override
    public void passivateObject(Channel channel) throws Exception {
        //System.out.println("sent back to queue");
    }
}

class MyRunnable implements Runnable{  
    protected int x = 0;
    protected ObjectPool<Channel> pool;

    public MyRunnable(int x, ObjectPool<Channel> pool) {
        // TODO Auto-generated constructor stub
        this.x = x;
        this.pool = pool;
    }

    public void run(){
        try {
                Channel channel = pool.borrowObject();
                String message = Integer.toString(x);
                channel.basicPublish( "", "task_queue", 
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes());
                pool.returnObject(channel);
        } catch (NoSuchElementException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IllegalStateException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
    }
}

ps 私は基本的にいくつかの質問をし、ドキュメントを読み、これを理解しようとしましたが、途中で完全に間違った方向に進んでいる可能性があるため、問題やヒントがあれば送ってください。

プロットが厚くなります:

メインメソッドの for ループ (作業をスレッドに送信する場所) に次を追加しました。

    Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
    System.out.println(threadSet.size()); //number of threads
    System.out.println(pool.getNumActive());

プールには 25 のスレッド (20 と言いましたが) と 20 のアイテムが表示されます。しかし、rabbitmq の UI を見ると、チャネルが 1 つしかない 1 つの接続が表示されます。チャネルを作成してランナブルに送信すると、多くのチャネルが作成されます (ただし、チャネルは閉じられません)。何が起こっているのか、なぜ結果が期待どおりにならないのか理解できません。

4

1 に答える 1

1

問題は、 ConnectionPoolableObjectFactory が単一の Channel オブジェクトしか作成していないことだと思います。makeObject呼び出されるたびに新しいチャンネルを作成する必要があるようです。

したがって、次のように実装する必要があります。

public class ConnectionPoolableObjectFactory
        extends BasePoolableObjectFactory<Channel> {

    private final Connection connection;

    private ConnectionPoolableObjectFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
    }

    @Override
    public Channel makeObject() throws Exception {
        return connection.createChannel();
    }

    @Override
    public boolean validateObject(Channel channel) {
        return channel.isOpen();
    }

    @Override
    public void destroyObject(Channel channel) throws Exception {
        channel.close();
    }

    @Override
    public void passivateObject(Channel channel) throws Exception {
        //System.out.println("sent back to queue");
    }
}

これは、各ファクトリが 1 つの接続から複数のチャネルを作成することを前提としています。

于 2012-05-02T18:09:54.230 に答える