4

全て

最新の Redis 2.4.16 をインストールし、その Pub/Sub システムを Java で使用しようとしています。私は毎秒チャネルにメッセージを入れています。パブリッシャーには問題はありませんが、サブスクライバーはメッセージでクラッシュします

例外:

redis.clients.jedis.exceptions.JedisDataException: ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context
at redis.clients.jedis.Protocol.processError(Protocol.java:59)
at redis.clients.jedis.Protocol.process(Protocol.java:66)
at redis.clients.jedis.Protocol.read(Protocol.java:131)
at redis.clients.jedis.Connection.getObjectMultiBulkReply(Connection.java:206)
at redis.clients.jedis.JedisPubSub.process(JedisPubSub.java:88)
at redis.clients.jedis.JedisPubSub.proceed(JedisPubSub.java:83)
at redis.clients.jedis.Jedis.subscribe(Jedis.java:1971)
at com.jedis.test.JedisSub$1.run(JedisSub.java:22)
at java.lang.Thread.run(Thread.java:680)

ここに私のコードがあります:

出版社:

        final Jedis jedis = new Jedis("localhost");

        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
        newFixedThreadPool.submit(new Runnable() {

            @Override
            public void run() {
                while(true){
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    jedis.publish("CC", new Date().toString());
                }

            }
        });

加入者:

        JedisPool jedisPool = new JedisPool(poolConfig,"localhost", 6379, 100);
        final Jedis subscriberJedis = jedisPool.getResource();


        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    subscriberJedis.subscribe(new JedisPubSub() …..,"CC");
                } catch (Exception e) {
                   e.printStackTrace();
                }
            }
        }).start();

        jedisPool.returnResource(subscriberJedis);

プール構成:

    JedisPoolConfig poolConfig = new JedisPoolConfig();
    poolConfig.maxActive = 10;
    poolConfig.maxIdle = 5;
    poolConfig.minIdle = 1;
    poolConfig.testOnBorrow = true;
    poolConfig.numTestsPerEvictionRun = 10;
    poolConfig.timeBetweenEvictionRunsMillis = 60000;
    poolConfig.maxWait = 3000;
    poolConfig.whenExhaustedAction = org.apache.commons.pool.impl.GenericObjectPool.WHEN_EXHAUSTED_FAIL;

Redis のインストールには、単純にコマンドを使用しました

make PREFIX=/Users/ggg/dev/dist/redis/ install

この後、私は使用しませんでした./install_server.sh

Jedis のバージョンは 2.1.0、プラットフォームは Mac OS X です。

注: 私が気付いたのは、開始から約 30 秒後にサブスクライバーがクラッシュすることです。

4

1 に答える 1

9

パブリッシャーとサブスクライバーの両方のコードは、独自の方法で間違っています。

このエラーは、パブリッシャーとサブスクライバーの間で Redis 接続を共有できないという事実から発生します。実際には、パブリッシャー用の接続 (または接続のプール) と、サブスクライバー スレッド用の専用接続が 1 つだけ必要です。通常は、プロセスごとに 1 つのサブスクライバー スレッドを実行するだけで十分です。

ここでは、サブスクライバー スレッドが完了する前に、subscriberJedis 接続をプールに返すのが早すぎるため、接続が共有されます。

パブリッシャーで:

10 個のスレッドのプールがあるため、これらのスレッド間で一意の接続を共有しないでください。これは接続プールを使用するのに最適な場所であり、接続は各スレッドで取得および解放する必要があります。

    // This should be a global singleton
    JedisPool jedisPool = new JedisPool(poolConfig,"localhost", 6379, 100);

    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
    newFixedThreadPool.submit(new Runnable() {

        @Override
        public void run() {
            while(true){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Jedis jedis = jedisPool.getResource();
                try {
                   jedis.publish("CC", new Date().toString());
                } catch (Exception e) {
                   e.printStackTrace();
                } finally {
                   jedisPool.returnResource(jedis);
                }
            }

        }
    });

サブスクライバーで:

サブスクライバーでは、専用の接続が必要です。

    new Thread(new Runnable() {
        @Override
        public void run() {
            Jedis subscriberJedis = new Jedis("localhost");
            try {
                subscriberJedis.subscribe(new JedisPubSub() …..,"CC");
            } catch (Exception e) {
               e.printStackTrace();
            }
        }
    }).start();

異なるチャネルまたはパターンにサブスクライブする必要がある場合は、同じスレッドで同じ接続に対して他のサブスクリプションを設定することをお勧めします。

于 2012-08-25T08:02:44.220 に答える