同じサーバーで 2 つの別々のアプリケーションを実行しています。
- ユーザー管理
- ウォレット管理
ユーザー管理からウォレット管理へのイベントソーシングを実装しており、これは正常に動作しています。
しかし、その時点でウォレット管理アプリケーション側のイベントハンドラーから新しいイベントを発行すると、次のエラーメッセージがログに出力されます。
以下がログの詳細です。
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void com.peaas.ngapblueprintdemo.wallet.config.AxonConfiguration$1.onMessage(org.springframework.amqp.core.Message,com.rabbitmq.client.Channel) throws java.lang.Exception' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.axonframework.eventsourcing.eventstore.EventStoreException: An event for aggregate [772c7f69-534e-4ee2-b198-5b4d2edd3497] at sequence [0] could not be persisted
at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.handlePersistenceException(AbstractEventStorageEngine.java:112)
at org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:223)
at org.axonframework.eventsourcing.eventstore.AbstractEventStorageEngine.appendEvents(AbstractEventStorageEngine.java:85)
at org.axonframework.eventsourcing.eventstore.AbstractEventStore.prepareCommit(AbstractEventStore.java:64)
at org.axonframework.eventhandling.AbstractEventBus.doWithEvents(AbstractEventBus.java:210)
at org.axonframework.eventhandling.AbstractEventBus.lambda$null$4(AbstractEventBus.java:145)
at org.axonframework.messaging.unitofwork.MessageProcessingContext.notifyHandlers(MessageProcessingContext.java:68)
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.lambda$notifyHandlers$2(BatchingUnitOfWork.java:131)
at java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:891)
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.notifyHandlers(BatchingUnitOfWork.java:131)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.changePhase(AbstractUnitOfWork.java:214)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commitAsRoot(AbstractUnitOfWork.java:83)
at org.axonframework.messaging.unitofwork.AbstractUnitOfWork.commit(AbstractUnitOfWork.java:71)
at org.axonframework.messaging.unitofwork.BatchingUnitOfWork.executeWithResult(BatchingUnitOfWork.java:92)
at org.axonframework.eventhandling.AbstractEventProcessor.process(AbstractEventProcessor.java:116)
at org.axonframework.eventhandling.SubscribingEventProcessor.process(SubscribingEventProcessor.java:142)
at org.axonframework.eventhandling.DirectEventProcessingStrategy.handle(DirectEventProcessingStrategy.java:32)
at org.axonframework.eventhandling.SubscribingEventProcessor.lambda$start$0(SubscribingEventProcessor.java:135)
at org.axonframework.amqp.eventhandling.spring.SpringAMQPMessageSource.lambda$onMessage$1(SpringAMQPMessageSource.java:90)
at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:890)
at org.axonframework.amqp.eventhandling.spring.SpringAMQPMessageSource.onMessage(SpringAMQPMessageSource.java:90)
at com.peaas.ngapblueprintdemo.wallet.config.AxonConfiguration$1.onMessage(AxonConfiguration.java:67)
at sun.reflect.GeneratedMethodAccessor273.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:51)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:182)
... 10 common frames omitted
Caused by: javax.persistence.TransactionRequiredException: No EntityManager with actual transaction available for current thread - cannot reliably process 'persist' call
at org.springframework.orm.jpa.SharedEntityManagerCreator$SharedEntityManagerInvocationHandler.invoke(SharedEntityManagerCreator.java:289)
at com.sun.proxy.$Proxy224.persist(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at org.axonframework.eventsourcing.eventstore.jpa.JpaEventStorageEngine.appendEvents(JpaEventStorageEngine.java:218)
... 37 common frames omitted
イベント ハンドラー クラス
package com.peaas.ngapblueprintdemo.wallet.eventHandlers;
import java.util.UUID;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.config.ProcessingGroup;
import org.axonframework.eventhandling.EventHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.peaas.ngapblueprintdemo.events.CreateUserProfileEvent;
import com.peaas.ngapblueprintdemo.events.WalletCreatedEvent;
import com.peaas.ngapblueprintdemo.wallet.commands.WalletCreatedCommand;
import com.peaas.ngapblueprintdemo.wallet.domain.Wallet;
import com.peaas.ngapblueprintdemo.wallet.repository.WalletRepository;
/**
 * Event handling component registered under the {@code "amqpEvents"} processing group.
 *
 * <p>On a {@link CreateUserProfileEvent} it stores a zero-balance {@link Wallet} for the
 * user and then dispatches a {@link WalletCreatedCommand} through the command gateway.
 * On a {@link WalletCreatedEvent} it only prints the event.
 */
@ProcessingGroup("amqpEvents")
@Component
public class UserEventHandler {

    @Autowired
    private WalletRepository walletRepository;

    @Autowired
    private transient CommandGateway commandGatway;

    /**
     * Persists a new wallet for the user named in the event and sends the matching
     * {@link WalletCreatedCommand}, keyed by a freshly generated random UUID string.
     *
     * @param event the incoming user-profile event; its user id is copied to the wallet
     */
    @EventHandler
    public void onCreateUserProfile(CreateUserProfileEvent event) {
        System.out.println("--- Wallet Event Handler ---");

        final Wallet persisted = walletRepository.save(newEmptyWalletFor(event));

        final String aggregateKey = UUID.randomUUID().toString();
        final WalletCreatedCommand createWallet = new WalletCreatedCommand(
                persisted.getId(),
                aggregateKey,
                persisted.getAmount(),
                persisted.getUserid());
        createWallet.setId(persisted.getId());

        commandGatway.send(createWallet);
    }

    /**
     * Prints a confirmation line followed by the received event.
     *
     * @param event the wallet-created event to print
     */
    @EventHandler
    public void onCreateWalletEvent(WalletCreatedEvent event) {
        System.out.println("--- Wallet Created Successfully ---");
        System.out.println(event);
    }

    /** Builds an unsaved wallet with amount 0 that belongs to the user in {@code event}. */
    private static Wallet newEmptyWalletFor(CreateUserProfileEvent event) {
        final Wallet fresh = new Wallet();
        fresh.setAmount(0d);
        fresh.setUserid(event.getUserId());
        return fresh;
    }
}
集約クラス
package com.peaas.ngapblueprintdemo.wallet.aggregate;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import org.axonframework.commandhandling.model.AggregateLifecycle;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.spring.stereotype.Aggregate;
import com.peaas.ngapblueprintdemo.events.WalletCreatedEvent;
import com.peaas.ngapblueprintdemo.wallet.commands.WalletCreatedCommand;
/**
 * Axon aggregate representing a wallet.
 *
 * <p>The aggregate is created by handling a {@link WalletCreatedCommand}, which applies a
 * {@link WalletCreatedEvent}; the aggregate's fields are populated from that event in the
 * event sourcing handler.
 */
@Aggregate
public class WalletAggregate {

    private Long id;

    // NOTE(review): @GeneratedValue is a JPA annotation and this class is not declared as a
    // JPA entity here; the identifier is actually assigned from the event in handle(...).
    // Kept as-is - confirm whether it can be dropped.
    @AggregateIdentifier
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private String walletId;

    private Double amount;

    private Long userId;

    /**
     * No-arg constructor for framework use only: Axon needs it to instantiate an
     * event-sourced aggregate before replaying its events. Application code should create
     * wallets by dispatching a {@link WalletCreatedCommand} instead.
     */
    public WalletAggregate() {
        // Intentionally empty; state is populated by the event sourcing handler.
    }

    /**
     * Command-handling constructor: applies a {@link WalletCreatedEvent} carrying the
     * command's id, wallet id, amount and user id.
     *
     * @param command the command describing the wallet to create
     */
    @CommandHandler
    public WalletAggregate(WalletCreatedCommand command) {
        System.out.println("--- Command Handler Start : WalletCreatedCommand ---");
        // The first event argument is the wallet's id, so it must come from getId();
        // passing getUserId() here would store the user id as the wallet id.
        AggregateLifecycle.apply(new WalletCreatedEvent(
                command.getId(),
                command.getWalletId(),
                command.getAmount(),
                command.getUserId()));
        System.out.println("--- Command Handler End : WalletCreatedCommand ---");
    }

    /**
     * Copies the event's id, wallet id, amount and user id into this aggregate.
     *
     * @param event the applied or replayed wallet-created event
     */
    @EventSourcingHandler
    public void handle(WalletCreatedEvent event) {
        System.out.println("--- Event Sourcing Handler Start : WalletCreatedEvent ---");
        this.id = event.getId();
        this.walletId = event.getWalletId();
        this.amount = event.getAmount();
        this.userId = event.getUserId();
        System.out.println("--- Event Sourcing Handler End : WalletCreatedEvent ---");
    }

    /** @return the id copied from the wallet-created event, or {@code null} if unset */
    public Long getId() {
        return id;
    }

    /** @param id the id to store on this aggregate */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the wallet amount, or {@code null} if unset */
    public Double getAmount() {
        return amount;
    }

    /** @param amount the amount to store on this aggregate */
    public void setAmount(Double amount) {
        this.amount = amount;
    }

    /** @return the owning user's id, or {@code null} if unset */
    public Long getUserId() {
        return userId;
    }

    /** @param userId the user id to store on this aggregate */
    public void setUserId(Long userId) {
        this.userId = userId;
    }

    @Override
    public String toString() {
        return "WalletAggregate [id=" + id + ", amount=" + amount + ", userId=" + userId + "]";
    }
}
コマンドクラス
package com.peaas.ngapblueprintdemo.wallet.commands;
import org.axonframework.commandhandling.TargetAggregateIdentifier;
/**
 * Command carrying the data of a wallet: an id, the wallet id used as the target
 * aggregate identifier, an amount and the owning user's id.
 */
public class WalletCreatedCommand {

    private Long id;

    @TargetAggregateIdentifier
    private String walletId;

    private Double amount;

    private Long userId;

    /** Creates a command with every field left {@code null}. */
    public WalletCreatedCommand() {
    }

    /**
     * Creates a fully populated command.
     *
     * @param id       the id value
     * @param walletId the target aggregate identifier
     * @param amount   the wallet amount
     * @param userId   the owning user's id
     */
    public WalletCreatedCommand(Long id, String walletId, Double amount, Long userId) {
        this.id = id;
        this.walletId = walletId;
        this.amount = amount;
        this.userId = userId;
    }

    /** @return the id value, possibly {@code null} */
    public Long getId() {
        return this.id;
    }

    /** @param id the id value to set */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the target aggregate identifier, possibly {@code null} */
    public String getWalletId() {
        return this.walletId;
    }

    /** @param walletId the target aggregate identifier to set */
    public void setWalletId(String walletId) {
        this.walletId = walletId;
    }

    /** @return the wallet amount, possibly {@code null} */
    public Double getAmount() {
        return this.amount;
    }

    /** @param amount the wallet amount to set */
    public void setAmount(Double amount) {
        this.amount = amount;
    }

    /** @return the owning user's id, possibly {@code null} */
    public Long getUserId() {
        return this.userId;
    }

    /** @param userId the owning user's id to set */
    public void setUserId(Long userId) {
        this.userId = userId;
    }

    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder();
        text.append("WalletCreatedCommand [id=").append(id);
        text.append(", walletId=").append(walletId);
        text.append(", amount=").append(amount);
        text.append(", userId=").append(userId);
        text.append("]");
        return text.toString();
    }
}
イベントクラス
package com.peaas.ngapblueprintdemo.events;
/**
 * Event describing a wallet: an id, a wallet id, an amount and the owning user's id.
 * All fields are mutable and may be {@code null}.
 */
public class WalletCreatedEvent {

    private Long id;

    private String walletId;

    private Double amount;

    private Long userId;

    /** Creates an event with every field left {@code null}. */
    public WalletCreatedEvent() {
    }

    /**
     * Creates a fully populated event.
     *
     * @param id       the id value
     * @param walletId the wallet id
     * @param amount   the wallet amount
     * @param userId   the owning user's id
     */
    public WalletCreatedEvent(Long id, String walletId, Double amount, Long userId) {
        this.id = id;
        this.walletId = walletId;
        this.amount = amount;
        this.userId = userId;
    }

    /** @return the id value, possibly {@code null} */
    public Long getId() {
        return this.id;
    }

    /** @param id the id value to set */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the wallet id, possibly {@code null} */
    public String getWalletId() {
        return this.walletId;
    }

    /** @param walletId the wallet id to set */
    public void setWalletId(String walletId) {
        this.walletId = walletId;
    }

    /** @return the wallet amount, possibly {@code null} */
    public Double getAmount() {
        return this.amount;
    }

    /** @param amount the wallet amount to set */
    public void setAmount(Double amount) {
        this.amount = amount;
    }

    /** @return the owning user's id, possibly {@code null} */
    public Long getUserId() {
        return this.userId;
    }

    /** @param userId the owning user's id to set */
    public void setUserId(Long userId) {
        this.userId = userId;
    }

    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder();
        text.append("WalletCreatedEvent [id=").append(id);
        text.append(", walletId=").append(walletId);
        text.append(", amount=").append(amount);
        text.append(", userId=").append(userId);
        text.append("]");
        return text.toString();
    }
}
ありがとう