0

RabbitMQ を使用して一部のプロデューサーからメッセージを受信し、それらを解析して別の場所に送信するアプリがあります。問題は、アプリが CPU パワーを十分に使用していないことです。CPU の 25% 以下を使用します。プロファイラーからこのスクリーンショットを見てください: ここに画像の説明を入力

処理の大部分が実行されるコードは次のとおりです。

public class Consumer {

    private static final String QUEUE_NAME = "MdnaMessagesQueue";
    private static int i = 1;
    private final ConnectionFactory factory;
    private final boolean autoAck = false;
    private final Connection connection;
    private final Channel channel;
    private String response;
    private Sender sender;
    private Logger LOG = Logger.getLogger(Consumer.class);
    private ExecutorService es;


    public Consumer() throws IOException{
        es = Executors.newFixedThreadPool(8);
        factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection(es);
        channel = connection.createChannel();
        sender = new Sender();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.basicQos(25);
        Properties props = new Properties();
        try {
            props.load(new FileInputStream("/home/mikhail/bzrrep/DLP/DLPServer/src/main/java/log4j.properties"));
        } catch (Exception e){
            LOG.error(e);
        }
        PropertyConfigurator.configure(props);
    }

    /**
     * is used to receive messages from the queue
     * @param customerId the id of current customer
     * @throws IOException
     * @throws InterruptedException
     */
    public void receive(final String customerId) throws IOException,InterruptedException{




        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");



        final QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, autoAck, "myConsumerTag",
                new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties properties,
                                               byte[] body)
                            throws IOException{                           
                        BasicProperties replyProperties = new BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
                        long deliveryTag = envelope.getDeliveryTag();
                        try{
                            LOG.info(" [" +(i++) +"] Received from first queue ");
                            byte[] dataArr = body;
                            MimeParser mimeParser = new MimeParser(true);
                            Filter filter = new Filter();
                            ByteArrayInputStream bais1 = new ByteArrayInputStream(dataArr);
                            MessageInfo mi = mimeParser.parseMessages(bais1);
                            //checking for compliance with rules
                            boolean messageAccepted = filter.getMessageAcceptability(mi.getPlainText(), customerId);
                            response = filter.getResult();
                            if(messageAccepted){
                                //sending to the other queue
                                sender.send(dataArr);
                                channel.basicPublish("", properties.getReplyTo(), replyProperties, response.getBytes());//need to add responce
                            } else {
                                channel.basicPublish("", properties.getReplyTo(), replyProperties, response.getBytes());
                            }
                        } catch (Exception e){
                            LOG.error(e);
                        }finally {
                            channel.basicAck(deliveryTag, false);
                        }

                    }
                });
    }
}

プロファイラーからのスナップショットは次のとおりです。 ここに画像の説明を入力

この問題を解決するには?

4

0 に答える 0