Kafka にメッセージを送信し、Spring クラウド ストリームを使用して Kafka から同じメッセージを消費しようとしています。Postman を使用して JSON String { "acctNo" : "32432", "tn" : "3234" } をプロデューサーの残りのコントローラーに送信しています。JSON Parserexception を取得しています:
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting ('true', 'false' or 'null')
at [Source: [B@776e03af; line: 1, column: 4]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581) ~[jackson-core-2.6.5.jar:2.6.5]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533) ~[jackson-core-2.6.5.jar:2.6.5]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3451) ~[jackson-core-2.6.5.jar:2.6.5]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2610) ~[jackson-core-2.6.5.jar:2.6.5]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser
私のコードは次のようなものです:
プロデューサー アプリケーション yaml
spring:
cloud:
stream:
bindings:
activationMsgQueue:
destination: test
contentType: application/json
プロデューサー レスト コントローラー
@RestController
@RequestMapping("/ActivationQueueService")
public class ActivationQueueController {
private static final Logger LOGGER = LoggerFactory
.getLogger(ActivationQueueController.class);
@Autowired
SpringCloudStreamClient producer;
@InitBinder
protected void initBinder(WebDataBinder binder) {
binder.setValidator(new ActivationDataInfoValidator());
}
@RequestMapping(method = RequestMethod.POST, value = "/sendMessage", headers = "Accept=application/json", produces = "application/json")
public void sendMessage(@RequestBody @Valid ActivationDataInfo message)
throws JsonProcessingException {
LOGGER.debug("Activation Data Request Recieved : " + message.toString());
if (message != null) {
ObjectMapper mapper = new ObjectMapper();
producer.sendMessagetoKafka(message);
LOGGER.info("Activation Data Request sent to Kafka : " + message);
}
}
}
プロデューサーコード
@Service
@EnableBinding(MessageChannels.class)
public class SpringCloudStreamClient {
private static final Logger LOGGER = LoggerFactory
.getLogger(SpringCloudStreamClient.class);
@Autowired MessageChannels msgChannel;
public Object sendMessagetoKafka(ActivationDataInfo msg){
LOGGER.info("Sending Message : " + msg);
msgChannel.save().send(MessageBuilder.withPayload(msg).build());
return new String("Success");
}
}
コンシューマ アプリケーション Yaml
spring:
cloud:
stream:
bindings:
input:
content-type: application/x-java-object;type=com.comcast.activation.message.vo.ActivationDataInfo
destination: test
group: prac
consumer:
headerMode: raw
enableDlq: true
resetOffsets: true
startOffset: latest
消費者コード
@EnableBinding(Sink.class)
public class LogSink {
private static Logger logger = LoggerFactory.getLogger(LogSink.class);
@ServiceActivator(inputChannel = Sink.INPUT)
public void loggerSink( ActivationDataInfo payload) throws Exception {
logger.info("Received: " + payload.getAcctNo());
}
}
ドメインクラス
public class ActivationDataInfo {
private String acctNo;
private String tn;
public String getAcctNo() {
return acctNo;
}
public void setAcctNo(String acctNo) {
this.acctNo = acctNo;
}
public String getTn() {
return tn;
}
public void setTn(String tn) {
this.tn = tn;
}
これは何が原因ですか?助けてください。