0

最初に私のコードを見てください。

これは、 2000 のスレッドを作成し、それらのスレッドがメッセージを送信している私のテスト クラスです。

public class MessageSenderMultipleThreadMock {
    @Autowired
    MessageList message;
    @Autowired
    MessageSender sender;

    public boolean process() throws InterruptedException {

        for (int i = 0; i < 2000; i++) {
            new Thread(new Runnable() {

                @Override
                public void run() {

                    String routingkey = "operation"
                            + UUID.randomUUID().toString();
                    String queueName = UUID.randomUUID().toString();

                    message.setSender(Thread.currentThread().getName());
                    try {
                        sender.sendMessage(routingkey, queueName,
                                "this is message");
                    } catch (InvalidMessagingParameters e) {
                        e.printStackTrace();
                    }

                }
            }).start();
            Thread.sleep(1000);

        }
        Thread.currentThread();
        Thread.sleep(10000);
        return true;
    }
}

メッセージ送信者

これは私のメインのメッセージ送信者クラスです

    @Service
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MessageList message;
    String queueName = "";
    String routingKey = "";
    @Autowired
    private QueueCreationService service;
    private boolean messageSentFlag;
    String returnedMessage = "";
    private Logger log = LoggerFactory.getLogger(MessageSender.class.getName());

    public boolean sendMessage(String routingKey, String queueName,
            String messageToBeSent) throws InvalidMessagingParameters {
        if ((routingKey == null && queueName == null)
                || (routingKey.equalsIgnoreCase("") || queueName
                        .equalsIgnoreCase("")))
            throw new InvalidMessagingParameters(routingKey, queueName);

        else {
            this.routingKey = routingKey;
            this.queueName = queueName;
        }
        service.processBinding(queueName, routingKey);
        message.addMessages(messageToBeSent);
        return execute();
    }

    /*
     * overloaded sendMessage method will use requestMap . RequestMap includes
     * queueName and routingKey that controller provides.
     */
    public boolean sendMessage(Map<String, String> requestMap)
            throws MessagingConnectionFailsException,
            InvalidMessagingParameters {
        this.queueName = requestMap.get("queue");
        this.routingKey = requestMap.get("routingkey");
        if ((routingKey == null && queueName == null)
                || (routingKey.equalsIgnoreCase("") || queueName
                        .equalsIgnoreCase("")))
            throw new InvalidMessagingParameters(routingKey, queueName);
        service.processBinding(queueName, routingKey);
        preparingMessagingTemplate();
        return execute();
    }

    private boolean execute() {
        for (int i = 0; i < 5 && !messageSentFlag; i++) {
            executeMessageSending();
        }
        return messageSentFlag;
    }

    private String convertMessageToJson(MessageList message) {
        ObjectWriter ow = new ObjectMapper().writer()
                .withDefaultPrettyPrinter();
        String json = "";
        try {
            json = ow.writeValueAsString(message);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return json;
    }

    private void executeMessageSending() {
        rabbitTemplate.convertAndSend(R.EXCHANGE_NAME, routingKey,
                convertMessageToJson(message), new CorrelationData(UUID
                        .randomUUID().toString()));

    }

    private void preparingMessagingTemplate() {
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(new ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode,
                    String replyText, String exchange, String routingKey) {
                returnedMessage = replyText;
            }
        });
        rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack,
                    String cause) {
                System.out.println("*" + ack);

                if (ack && !returnedMessage.equalsIgnoreCase("NO_ROUTE")) {
                    messageSentFlag = ack;
                    log.info("message " + message.toString()
                            + " from Operation +" + this.getClass().getName()
                            + "+  has been successfully delivered");
                } else {
                    log.info("message " + message.toString()
                            + " from Operation +" + this.getClass().getName()
                            + "+ has not been delivered");

                }
            }
        });
    }
}

メッセージングで使用される構成クラス

    @Configuration
    @ComponentScan("com.alpharaid.orange.*")
    @PropertySource("classpath:application.properties")

public class MessageConfiguration {

    String content = "";
    @Value("${rabbitmq_host}")
    String host = "";
    String port = "";
    @Value("${rabbitmq_username}")
    String userName = "";
    @Value("${rabbitmq_password}")
    String password = "";
    String queueName = "";
    InputStream input = null;

    @Autowired
    public MessageConfiguration() {
    }

    @Bean
    @Scope("prototype")
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    @Bean
    @Scope("prototype")
    public QueueCreationService service() {
        return new QueueCreationService();
    }

    @Bean
    @Scope("prototype")
    public RabbitAdmin admin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
                this.host);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

}

私の問題:

  1. サーバーでわかるように、一部のスレッドはメッセージを正常に配信しており、他のスレッドは配信していません。

  2. rabbitTemplate リスナーの確実性はまったくありません(

    rabbitTemplate.setReturnCallback(新しい ReturnCallback() {

毎回動作するリスナーが必要です。これに基づいてメッセージを再度送信しようとするからです

    private boolean execute() {
    for (int i = 0; i < 5 && !messageSentFlag; i++) {
        executeMessageSending();
    }
    return messageSentFlag;
}

messageSentFlagが false で、Confirm リスナーでのみ true になるため、メッセージが 5 回配信されることがあります。

  1. キューを削除する方法を教えてください。8000 個あるので、rabbitAdmin でキューを削除する方法を 1 つ見ましたが、キューの名前が必要で、私のキューは任意のランダム キュー (UUID) です。

どうすれば改善できますか、または回避策はありますか? 私のアプリケーションでは、マルチスレッド環境が必須です。

前もって感謝します。

4

1 に答える 1

2

RabbitMQ は、メッセージが特定のキューにある場合にのみ、メッセージの順序を保証します。

保証を設定しない限り、RabbitMQ にメッセージを送信するためのメッセージ順序の保証はありません。これは、多くの状況、特にあなたのようなマルチスレッド環境では、不可能ではないにしても難しいことです。

メッセージが特定の順序で処理されることを保証する必要がある場合は、リシーケンサーの構築または使用を検討する必要があります

一般的な考え方は、ソースでメッセージに 1、2、3、4、5 などの番号を付ける必要があるということです。コンシューマーがメッセージをキューから取り出すときは、メッセージ番号を見て、これがあなたが今必要としているもの。そうでない場合は、メッセージを保留して後で処理します。現在探しているメッセージ # を取得したら、現在保持しているすべてのメッセージを順番に処理します。

春にはリシーケンサーのようなものが利用できるはずですが、私はそのエコシステムに十分に精通しておらず、正しい方向に導くことができません。

于 2015-08-14T12:36:58.227 に答える