最初に私のコードを見てください。
これは、 2000 のスレッドを作成し、それらのスレッドがメッセージを送信している私のテスト クラスです。
public class MessageSenderMultipleThreadMock {
@Autowired
MessageList message;
@Autowired
MessageSender sender;
public boolean process() throws InterruptedException {
for (int i = 0; i < 2000; i++) {
new Thread(new Runnable() {
@Override
public void run() {
String routingkey = "operation"
+ UUID.randomUUID().toString();
String queueName = UUID.randomUUID().toString();
message.setSender(Thread.currentThread().getName());
try {
sender.sendMessage(routingkey, queueName,
"this is message");
} catch (InvalidMessagingParameters e) {
e.printStackTrace();
}
}
}).start();
Thread.sleep(1000);
}
Thread.currentThread();
Thread.sleep(10000);
return true;
}
}
メッセージ送信者
これは私のメインのメッセージ送信者クラスです
@Service
public class MessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MessageList message;
String queueName = "";
String routingKey = "";
@Autowired
private QueueCreationService service;
private boolean messageSentFlag;
String returnedMessage = "";
private Logger log = LoggerFactory.getLogger(MessageSender.class.getName());
public boolean sendMessage(String routingKey, String queueName,
String messageToBeSent) throws InvalidMessagingParameters {
if ((routingKey == null && queueName == null)
|| (routingKey.equalsIgnoreCase("") || queueName
.equalsIgnoreCase("")))
throw new InvalidMessagingParameters(routingKey, queueName);
else {
this.routingKey = routingKey;
this.queueName = queueName;
}
service.processBinding(queueName, routingKey);
message.addMessages(messageToBeSent);
return execute();
}
/*
* overloaded sendMessage method will use requestMap . RequestMap includes
* queueName and routingKey that controller provides.
*/
public boolean sendMessage(Map<String, String> requestMap)
throws MessagingConnectionFailsException,
InvalidMessagingParameters {
this.queueName = requestMap.get("queue");
this.routingKey = requestMap.get("routingkey");
if ((routingKey == null && queueName == null)
|| (routingKey.equalsIgnoreCase("") || queueName
.equalsIgnoreCase("")))
throw new InvalidMessagingParameters(routingKey, queueName);
service.processBinding(queueName, routingKey);
preparingMessagingTemplate();
return execute();
}
private boolean execute() {
for (int i = 0; i < 5 && !messageSentFlag; i++) {
executeMessageSending();
}
return messageSentFlag;
}
private String convertMessageToJson(MessageList message) {
ObjectWriter ow = new ObjectMapper().writer()
.withDefaultPrettyPrinter();
String json = "";
try {
json = ow.writeValueAsString(message);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return json;
}
private void executeMessageSending() {
rabbitTemplate.convertAndSend(R.EXCHANGE_NAME, routingKey,
convertMessageToJson(message), new CorrelationData(UUID
.randomUUID().toString()));
}
private void preparingMessagingTemplate() {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode,
String replyText, String exchange, String routingKey) {
returnedMessage = replyText;
}
});
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack,
String cause) {
System.out.println("*" + ack);
if (ack && !returnedMessage.equalsIgnoreCase("NO_ROUTE")) {
messageSentFlag = ack;
log.info("message " + message.toString()
+ " from Operation +" + this.getClass().getName()
+ "+ has been successfully delivered");
} else {
log.info("message " + message.toString()
+ " from Operation +" + this.getClass().getName()
+ "+ has not been delivered");
}
}
});
}
}
メッセージングで使用される構成クラス
@Configuration
@ComponentScan("com.alpharaid.orange.*")
@PropertySource("classpath:application.properties")
public class MessageConfiguration {
String content = "";
@Value("${rabbitmq_host}")
String host = "";
String port = "";
@Value("${rabbitmq_username}")
String userName = "";
@Value("${rabbitmq_password}")
String password = "";
String queueName = "";
InputStream input = null;
@Autowired
public MessageConfiguration() {
}
@Bean
@Scope("prototype")
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
@Bean
@Scope("prototype")
public QueueCreationService service() {
return new QueueCreationService();
}
@Bean
@Scope("prototype")
public RabbitAdmin admin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
this.host);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
}
私の問題:
サーバーでわかるように、一部のスレッドはメッセージを正常に配信しており、他のスレッドは配信していません。
rabbitTemplate リスナーの確実性はまったくありません(
rabbitTemplate.setReturnCallback(新しい ReturnCallback() {
毎回動作するリスナーが必要です。これに基づいてメッセージを再度送信しようとするからです
private boolean execute() {
for (int i = 0; i < 5 && !messageSentFlag; i++) {
executeMessageSending();
}
return messageSentFlag;
}
messageSentFlagが false で、Confirm リスナーでのみ true になるため、メッセージが 5 回配信されることがあります。
- キューを削除する方法を教えてください。8000 個あるので、rabbitAdmin でキューを削除する方法を 1 つ見ましたが、キューの名前が必要で、私のキューは任意のランダム キュー (UUID) です。
どうすれば改善できますか、または回避策はありますか? 私のアプリケーションでは、マルチスレッド環境が必須です。
前もって感謝します。