- スプリング ブート、埋め込みカフカ、テンポラルを使用して統合テスト ケースを作成しています。カフカ トピックに関するメッセージを送信しようとしています。
@SpringBootTest(classes = Application.class)
@RunWith(SpringJUnit4ClassRunner.class)
@ActiveProfiles("test")
@DirtiesContext
@EmbeddedKafka(
partitions = 5,
controlledShutdown = true,
brokerProperties = {
"listeners=PLAINTEXT://localhost:9092",
"port=9092"
})
public class OutboundFlowIT {
private final Logger logger = LoggerFactory.getLogger(OutboundFlowIT.class);
private TestWorkflowEnvironment testEnv;
private Worker worker;
private WorkflowClient workflowClient;
@Autowired
private ActivityService activityService;
@Autowired
private EventSender sender;
@Before
public void setUp(){
// some setup code.
}
@Test
public void processOutboundFinancialMessage_shouldTriggerAllSteps_WhenOK() throws IOException,InterruptedException {
// logic for sending message to intended topic.
}
- しかし、私はエラーを下回っています。
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 369296129 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:105) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580) ~[kafka-clients-2.5.1.jar:na]
at org.apache.kafka.common.network.Selector.poll(Selector.java:485) ~[kafka-clients-2.5.1.jar:na]
at kafka.network.Processor.poll(SocketServer.scala:861) ~[kafka_2.12-2.5.1.jar:na]
at kafka.network.Processor.run(SocketServer.scala:760) ~[kafka_2.12-2.5.1.jar:na]
- kafka.properties に以下の構成も追加しましたが、上記と同じ問題が発生しています。
spring.kafka.producer.properties.max.request.size=569296129
spring.kafka.consumer.properties.max.partition.fetch.bytes=369296129
私は kafka が初めてです。助けてください。