2

以下のような RabbitMQ のキュー構成 (接続設定) があります。

 /**
  * Creates the RabbitMQ connection factory bean.
  *
  * <p>The host, credentials and virtual host are read from {@code hostName},
  * {@code mqUsername}, {@code mqPassword} and {@code virtualHost}, which are
  * defined elsewhere in the enclosing configuration class.
  *
  * @return a {@link CachingConnectionFactory} configured with those values
  */
 @Bean
 public ConnectionFactory connectionFactory() {
     CachingConnectionFactory cachingFactory = new CachingConnectionFactory(hostName);
     // Target virtual host and the credentials used to log in to the broker.
     cachingFactory.setVirtualHost(virtualHost);
     cachingFactory.setUsername(mqUsername);
     cachingFactory.setPassword(mqPassword);
     return cachingFactory;
 }

   /**
    * Creates the {@link RabbitTemplate} bean used to publish messages.
    *
    * @param connectionFactory the connection factory the template is built on
    * @return a template whose message converter is a {@link Jackson2JsonMessageConverter}
    */
   @Bean
   RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
       Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
       RabbitTemplate jsonTemplate = new RabbitTemplate(connectionFactory);
       jsonTemplate.setMessageConverter(jsonConverter);
       return jsonTemplate;
   }

  /**
   * Creates the {@link AmqpAdmin} bean.
   *
   * @return a {@link RabbitAdmin} built on the result of {@code connectionFactory()}
   */
  @Bean
  public AmqpAdmin amqpAdmin() {
      return new RabbitAdmin(connectionFactory());
  }

また、非同期処理の設定は次のとおりです。

/**
 * Async configuration: enables {@code @Async} processing and supplies the
 * executor and the uncaught-exception handler through {@link AsyncConfigurer}.
 */
@EnableAsync
@Configuration
public class AsyncConfiguration implements AsyncConfigurer {

    /** Used for both the core and the maximum pool size of the executor. */
    private static final int POOL_SIZE = 10;

    /**
     * Defines the thread pool executor bean.
     *
     * <p>The method name (and therefore the bean name) is kept as
     * {@code taskExector}; the thread dumps show worker threads named after it.
     *
     * @return an initialized executor with core and max pool size of 10
     */
    @Bean
    public ThreadPoolTaskExecutor taskExector() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(POOL_SIZE);
        pool.setMaxPoolSize(POOL_SIZE);
        pool.initialize();
        return pool;
    }

    /**
     * @return the executor produced by {@link #taskExector()}
     */
    @Override
    public Executor getAsyncExecutor() {
        return taskExector();
    }

    /**
     * @return a new {@link SimpleAsyncUncaughtExceptionHandler}
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }

}

非同期メソッドの中で、AmqpAdmin と RabbitTemplate の Bean を使用しています。上記の設定により、タスクは最大 10 個のスレッドで実行されますが、しばらくするとアプリケーションがハングします。Actuator を使ってスレッドダンプを取得したところ、以下の情報が得られました。このアプローチに何か問題があるのでしょうか。また、複数のスレッドからこれらの RabbitMQ 関連の Bean に問題なくアクセスできるようにするには、どうすればよいでしょうか。

バージョン: Spring Boot 1.4.0.RELEASE、Java 8。

サービスは次のようになっています。

@Service
public class QDispatcherService implements DispatcherService {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private AmqpAdmin amqpAdmin;


    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void sendData(Data dataObject) throws Exception {

        try {
            //something on this properties , I have to check if queue exist or there are messages in it to decide to add message in other queue
            Properties properties = amqpAdmin.getQueueProperties(queueName);
            amqpAdmin.declareQueue(new Queue(queueName));
             logger.info("***********************DEBUG 4***********************");
            rabbitTemplate.convertAndSend(queueName, dataObject);

        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

}

{
    "threadName": "taskExector-10",
    "threadId": 77,
    "blockedTime": -1,
    "blockedCount": 317,
    "waitedTime": -1,
    "waitedCount": 379,
    "lockName": "com.rabbitmq.utility.BlockingValueOrException@105f30b9",
    "lockOwnerId": -1,
    "lockOwnerName": null,
    "inNative": false,
    "suspended": false,
    "threadState": "WAITING",
    "stackTrace": [
      {
        "methodName": "wait",
        "fileName": "Object.java",
        "lineNumber": -2,
        "className": "java.lang.Object",
        "nativeMethod": true
      },
      {
        "methodName": "wait",
        "fileName": "Object.java",
        "lineNumber": 502,
        "className": "java.lang.Object",
        "nativeMethod": false
      },
      {
        "methodName": "get",
        "fileName": "BlockingCell.java",
        "lineNumber": 50,
        "className": "com.rabbitmq.utility.BlockingCell",
        "nativeMethod": false
      },
      {
        "methodName": "uninterruptibleGet",
        "fileName": "BlockingCell.java",
        "lineNumber": 89,
        "className": "com.rabbitmq.utility.BlockingCell",
        "nativeMethod": false
      },
      {
        "methodName": "uninterruptibleGetValue",
        "fileName": "BlockingValueOrException.java",
        "lineNumber": 33,
        "className": "com.rabbitmq.utility.BlockingValueOrException",
        "nativeMethod": false
      },
      {
        "methodName": "getReply",
        "fileName": "AMQChannel.java",
        "lineNumber": 361,
        "className": "com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation",
        "nativeMethod": false
      },
      {
        "methodName": "privateRpc",
        "fileName": "AMQChannel.java",
        "lineNumber": 226,
        "className": "com.rabbitmq.client.impl.AMQChannel",
        "nativeMethod": false
      },
      {
        "methodName": "exnWrappingRpc",
        "fileName": "AMQChannel.java",
        "lineNumber": 118,
        "className": "com.rabbitmq.client.impl.AMQChannel",
        "nativeMethod": false
      },
      {
        "methodName": "queueDeclare",
        "fileName": "ChannelN.java",
        "lineNumber": 844,
        "className": "com.rabbitmq.client.impl.ChannelN",
        "nativeMethod": false
      },
      {
        "methodName": "queueDeclare",
        "fileName": "ChannelN.java",
        "lineNumber": 61,
        "className": "com.rabbitmq.client.impl.ChannelN",
        "nativeMethod": false
      },
      {
        "methodName": "invoke",
        "fileName": null,
        "lineNumber": -1,
        "className": "sun.reflect.GeneratedMethodAccessor176",
        "nativeMethod": false
      },
      {
        "methodName": "invoke",
        "fileName": "DelegatingMethodAccessorImpl.java",
        "lineNumber": 43,
        "className": "sun.reflect.DelegatingMethodAccessorImpl",
        "nativeMethod": false
      },
      {
        "methodName": "invoke",
        "fileName": "Method.java",
        "lineNumber": 498,
        "className": "java.lang.reflect.Method",
        "nativeMethod": false
      },
      {
        "methodName": "invoke",
        "fileName": "CachingConnectionFactory.java",
        "lineNumber": 916,
        "className": "org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler",
        "nativeMethod": false
      },
      {
        "methodName": "queueDeclare",
        "fileName": null,
        "lineNumber": -1,
        "className": "com.sun.proxy.$Proxy166",
        "nativeMethod": false
      },
      {
        "methodName": "declareQueues",
        "fileName": "RabbitAdmin.java",
        "lineNumber": 577,
        "className": "org.springframework.amqp.rabbit.core.RabbitAdmin",
        "nativeMethod": false
      },
      {
        "methodName": "access$200",
        "fileName": "RabbitAdmin.java",
        "lineNumber": 67,
        "className": "org.springframework.amqp.rabbit.core.RabbitAdmin",
        "nativeMethod": false
      },
      {
        "methodName": "doInRabbit",
        "fileName": "RabbitAdmin.java",
        "lineNumber": 209,
        "className": "org.springframework.amqp.rabbit.core.RabbitAdmin$3",
        "nativeMethod": false
      },
      {
        "methodName": "doInRabbit",
        "fileName": "RabbitAdmin.java",
        "lineNumber": 206,
        "className": "org.springframework.amqp.rabbit.core.RabbitAdmin$3",
        "nativeMethod": false
      },
      {
        "methodName": "doExecute",
        "fileName": "RabbitTemplate.java",
        "lineNumber": 1394,
        "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
        "nativeMethod": false
      },
      {
        "methodName": "execute",
        "fileName": "RabbitTemplate.java",
        "lineNumber": 1367,
        "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
        "nativeMethod": false
      },
      {
        "methodName": "execute",
        "fileName": "RabbitTemplate.java",
        "lineNumber": 1343,
        "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
        "nativeMethod": false
      },
      {
        "methodName": "declareQueue",
        "fileName": "RabbitAdmin.java",
        "lineNumber": 206,
        "className": "org.springframework.amqp.rabbit.core.RabbitAdmin",
        "nativeMethod": false
      },
      {
        "methodName": "sendData",
        "fileName": "QDispatcherService.java",
        "lineNumber": 59,
        "className": "com.mycompany.QDispatcherService",
        "nativeMethod": false
      },

      ....

       "lockedMonitors": [
      {
        "className": "java.lang.Object",
        "identityHashCode": 285810320,
        "lockedStackFrame": {
          "methodName": "invoke",
          "fileName": "CachingConnectionFactory.java",
          "lineNumber": 916,
          "className": "org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler",
          "nativeMethod": false
        },
        "lockedStackDepth": 13
      }
    ],
    "lockedSynchronizers": [
      {
        "className": "java.util.concurrent.ThreadPoolExecutor$Worker",
        "identityHashCode": 372417558
      }
    ],
    "lockInfo": {
      "className": "com.rabbitmq.utility.BlockingValueOrException",
      "identityHashCode": 274673849
    }
  },

________________________________________________________________________________________

新しいトレース

{
"threadName": "taskExector-10",
"threadId": 77,
"blockedTime": -1,
"blockedCount": 37,
"waitedTime": -1,
"waitedCount": 1113,
"lockName": "java.io.DataOutputStream@33111fc",
"lockOwnerId": 65,
"lockOwnerName": "taskExector-8",
"inNative": false,
"suspended": false,
"threadState": "BLOCKED",
"stackTrace": [
  {
    "methodName": "writeFrame",
    "fileName": "SocketFrameHandler.java",
    "lineNumber": 170,
    "className": "com.rabbitmq.client.impl.SocketFrameHandler",
    "nativeMethod": false
  },
  {
    "methodName": "writeFrame",
    "fileName": "AMQConnection.java",
    "lineNumber": 542,
    "className": "com.rabbitmq.client.impl.AMQConnection",
    "nativeMethod": false
  },
  {
    "methodName": "transmit",
    "fileName": "AMQCommand.java",
    "lineNumber": 104,
    "className": "com.rabbitmq.client.impl.AMQCommand",
    "nativeMethod": false
  },
  {
    "methodName": "quiescingTransmit",
    "fileName": "AMQChannel.java",
    "lineNumber": 337,
    "className": "com.rabbitmq.client.impl.AMQChannel",
    "nativeMethod": false
  },
  {
    "methodName": "transmit",
    "fileName": "AMQChannel.java",
    "lineNumber": 313,
    "className": "com.rabbitmq.client.impl.AMQChannel",
    "nativeMethod": false
  },
  {
    "methodName": "basicPublish",
    "fileName": "ChannelN.java",
    "lineNumber": 686,
    "className": "com.rabbitmq.client.impl.ChannelN",
    "nativeMethod": false
  },
  {
    "methodName": "basicPublish",
    "fileName": "ChannelN.java",
    "lineNumber": 668,
    "className": "com.rabbitmq.client.impl.ChannelN",
    "nativeMethod": false
  },
  {
    "methodName": "invoke",
    "fileName": null,
    "lineNumber": -1,
    "className": "sun.reflect.GeneratedMethodAccessor176",
    "nativeMethod": false
  },
  {
    "methodName": "invoke",
    "fileName": "DelegatingMethodAccessorImpl.java",
    "lineNumber": 43,
    "className": "sun.reflect.DelegatingMethodAccessorImpl",
    "nativeMethod": false
  },
  {
    "methodName": "invoke",
    "fileName": "Method.java",
    "lineNumber": 498,
    "className": "java.lang.reflect.Method",
    "nativeMethod": false
  },
  {
    "methodName": "invoke",
    "fileName": "CachingConnectionFactory.java",
    "lineNumber": 916,
    "className": "org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler",
    "nativeMethod": false
  },
  {
    "methodName": "basicPublish",
    "fileName": null,
    "lineNumber": -1,
    "className": "com.sun.proxy.$Proxy166",
    "nativeMethod": false
  },
  {
    "methodName": "doSend",
    "fileName": "RabbitTemplate.java",
    "lineNumber": 1451,
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
    "nativeMethod": false
  },
  {
    "methodName": "doInRabbit",
    "fileName": "RabbitTemplate.java",
    "lineNumber": 703,
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate$3",
    "nativeMethod": false
  },
  {
    "methodName": "doExecute",
    "fileName": "RabbitTemplate.java",
    "lineNumber": 1394,
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
    "nativeMethod": false
  },
  {
    "methodName": "execute",
    "fileName": "RabbitTemplate.java",
    "lineNumber": 1367,
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
    "nativeMethod": false
  },
  {
    "methodName": "send",
    "fileName": "RabbitTemplate.java",
    "lineNumber": 699,
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
    "nativeMethod": false
  },
  {
    "methodName": "convertAndSend",
    "fileName": "RabbitTemplate.java",
    "lineNumber": 767,
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
    "nativeMethod": false
  },
  {
    "methodName": "convertAndSend",
    "fileName": "RabbitTemplate.java",
    "lineNumber": 754,
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate",
    "nativeMethod": false
  }
4

1 に答える 1

0

送信のたびにキューを宣言するのは少し変わっています。とはいえ、非効率であることを除けば、動作はするはずです。スタックトレースは、キュー宣言の応答を待っている RabbitMQ クライアントの内部で処理が止まっていることを示しています。複数のスレッドが同じチャネルを使用している場合にこの現象を見たことがありますが、現在の操作 (RabbitAdmin、RabbitTemplate) が完了するとチャネルは必ずキャッシュに返され、複数のスレッドから同時に使用されることはないため、ここでは発生しないはずです。

新しい amqp-client 4.0.0.RC1 の jar を試してみることをお勧めします。このバージョンではログ出力が追加されており (3.x 系のクライアントにはありません)、原因の追跡に役立つかもしれません。

回答日時: 2016-11-11T13:41:40.603