1

正確な使用法は次のとおりです。

    /**
     * Message-consuming client bound to a single {@code UpdateListener}.
     *
     * <p>On construction the listener is registered with the supplied
     * {@code SubscriptionFactory}; incoming RabbitMQ messages are delivered to
     * {@link #handleMessage}.
     *
     * <p>NOTE(review): the SpEL expressions on {@link #handleMessage} refer to a bean
     * named {@code snapshotClient}, while this class is called {@code Client} —
     * presumably the class was renamed for the example; confirm the actual bean name.
     *
     * @param <E>   payload type of the received messages
     * @param <Key> key type used by the update registry
     */
    @Slf4j
public class Client<E, Key> {
    // Listener this client serves; a getter is exposed via @Getter and is what the
    // SpEL expressions on handleMessage call.
    // NOTE(review): if @NonNull is Lombok's, it presumably adds no null check to the
    // hand-written constructor below — confirm.
    @Getter @NonNull private final UpdateListener<E, Key> updateListener;
    // Factory the listener is registered with in the constructor.
    @NonNull private final SubscriptionFactory subscriptionFactory;
    // Maps a key to an Instant; its use is in the elided part of handleMessage
    // (presumably the last-seen update time per key — confirm).
    @NonNull private final Map<Key, Instant> updatedRegistry = new ConcurrentHashMap<>();

    /**
     * Stores both collaborators, registers the listener with the subscription
     * factory and logs the listener's entity key, optional change type and
     * component qualifier.
     *
     * @param updateListener      listener to register and to derive queue names from
     * @param subscriptionFactory factory that receives the registration
     */
    public Client(UpdateListener<E, Key> updateListener,
                  SubscriptionFactory subscriptionFactory) {
        this.updateListener = updateListener;
        this.subscriptionFactory = subscriptionFactory;
        this.subscriptionFactory.registerSnapshotClient(updateListener);
        // `log` is presumably supplied by the @Slf4j annotation on the class.
        log.info("Created new snapshot client for entity key [{}], update type [{}] and component qualifier [{}]",
            updateListener.getEntityKey(),
            updateListener.getOptionalChangeType(),
            updateListener.getComponentQualifier());
    }

    /**
     * Receives messages from two queues whose names are computed by SpEL: the
     * {@code queueNameCreator} bean's {@code createUpdateQueueName} and
     * {@code createSnapshotQueueName}, each called with
     * {@code snapshotClient.getUpdateListener()}.
     *
     * <p>NOTE(review): {@code snapshotClient} is resolved as a fixed name inside the
     * expression, so it is not tied to the instance the annotation sits on.
     *
     * @param rawUpdate      the received message
     * @param newUpdatedTime value of the {@code last_updated} message header
     */
    @RabbitListener(queues = {"#{@queueNameCreator.createUpdateQueueName(snapshotClient.getUpdateListener())}",
                                "#{@queueNameCreator.createSnapshotQueueName(snapshotClient.getUpdateListener())}"})
    public void handleMessage(Message<E> rawUpdate, @Header("last_updated") Instant newUpdatedTime) {
        ...//more code
    }
}

各「クライアント」インスタンスには、互いに衝突しないように独自の Bean ID があります。

SpEL を使用して、このオブジェクト自身の updateListener を取得するにはどうすればよいですか？

アップデート

プログラムによるアプローチと登録方法を使用した後、次の例外が発生します。

    Apr 28, 2015 3:22:47 PM org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler handleError
WARNING: Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.everymatrix.om2020.messaging.model.SnapshotClient.handleMessage(org.springframework.messaging.Message<E>,java.time.Instant)' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:126)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:93)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:167)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1241)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1005)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:989)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:82)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1103)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: No suitable resolver for argument [0] [type=org.springframework.messaging.Message]

アップデート

結局、目的の動作を実現するには、次のようにする必要がありました。

/**
 * Programmatically registers one RabbitMQ listener endpoint for every
 * {@code SnapshotClient} bean found in the application context.
 *
 * <p>For each client the snapshot and update queue names are derived from the
 * client's own update listener, and the client's {@code handleMessage} method is
 * registered as the handler for both queues.
 */
@Configuration
@EnableRabbit
public static class OmbeRabbitListenerConfigurer implements RabbitListenerConfigurer {
    /** Name of the handler method looked up reflectively on every client bean. */
    private static final String HANDLER_METHOD_NAME = "handleMessage";

    @Autowired ApplicationContext applicationContext;
    @Autowired SnapshotClientQueueNamesCreator snapshotClientQueueNamesCreator;
    @Autowired RabbitListenerContainerFactory rabbitListenerContainerFactory;
    // NOTE(review): injected but not referenced in this class; kept so the wiring
    // requirements of the configuration stay unchanged.
    @Autowired MessageConverter messageConverter;

    /**
     * Creates and registers a {@code MethodRabbitListenerEndpoint} per
     * {@code SnapshotClient} bean.
     *
     * @param registrar registrar that receives the endpoints
     * @throws IllegalStateException if a client bean exposes no public
     *                               {@code handleMessage} method
     */
    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        final Collection<SnapshotClient> snapshotClients = applicationContext.getBeansOfType(SnapshotClient.class).values();

        // The factory does not depend on the individual client, so it is built and
        // initialised once and shared by all endpoints.
        final DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
        messageHandlerMethodFactory.afterPropertiesSet();

        for (SnapshotClient bean : snapshotClients) {
            final String snapshotQueueName = snapshotClientQueueNamesCreator.createSnapshotQueueName(bean.getUpdateListener());
            final String updateQueueName = snapshotClientQueueNamesCreator.createUpdateQueueName(bean.getUpdateListener());

            final MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
            // Set explicitly on the endpoint; presumably required so that handler
            // arguments such as Message<E> can be resolved — confirm.
            endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
            endpoint.setBean(bean);
            endpoint.setMethod(findHandlerMethod(bean));

            // The random suffix keeps ids distinct between endpoints.
            endpoint.setId(snapshotQueueName + ":" + updateQueueName + UUID.randomUUID());
            endpoint.setQueueNames(snapshotQueueName, updateQueueName);
            endpoint.setExclusive(false);

            registrar.registerEndpoint(endpoint, rabbitListenerContainerFactory);
        }
    }

    /**
     * Finds a public method named {@code handleMessage} on the bean's class.
     *
     * @param bean the client bean to inspect
     * @return a matching public method
     * @throws IllegalStateException if no such method exists
     */
    private static Method findHandlerMethod(Object bean) {
        return Stream.of(bean.getClass().getMethods())
                .filter(candidate -> HANDLER_METHOD_NAME.equals(candidate.getName()))
                .findAny()
                .orElseThrow(() -> new IllegalStateException(
                        "No public method '" + HANDLER_METHOD_NAME + "' found on " + bean.getClass().getName()));
    }
}
4

1 に答える 1