0

スキーマ レジストリからスキーマを取得する単純な Resttemplate を作成しようとしています。応答が 200 の場合、スキーマ ID を取得し、Avro 形式のメッセージと共に kafka に送信します。ここに私のコードがあります: .

 @SpringBootApplication
        public class DemoApplication implements CommandLineRunner{

  String kafkarwsrproxyURL = String.format("%s/topics/%s", "https://kafka-rest-proxy-**********", "test-topic");
    String schemaurl = String.format("%s/subjects/%s/versions/latest", "https://schema-registry-*********", "test-topic");
  @Autowired
  private RestTemplate restTemplate;

@Bean
public RestTemplate restTemplate() {
    return new RestTemplate();
}

public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);
}

@Override
public void run(String... args) throws Exception {
    
    ObjectMapper obj = new ObjectMapper();
     JSONObject event = new JSONObject();
     JSONObject record = new JSONObject();
     JSONObject eventenvolpe = new JSONObject();
     JSONObject jsondata  = new JSONObject();
     JSONArray jsonarray =new JSONArray();
     JSONObject recordvalue =new JSONObject();
  // connecting to schema registary and getting back schema   
    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(MediaType.valueOf("application/vnd.schemaregistry.v1+json"));
    headers.setBasicAuth("username", "password");
    HttpEntity<String> SchemaEntity = new HttpEntity<String>("parameters", headers);
     
    ResponseEntity<String> result = restTemplate.exchange(schemaurl, HttpMethod.GET, SchemaEntity, String.class);
         if(result.getStatusCodeValue()==200) {
             
             JsonNode rootNode = obj.readTree(result.getBody());
             JsonNode schema_id = rootNode.path("id");//fetchinf schema id form schema
             
             event = new JSONObject();
             record = new JSONObject();
             eventenvolpe = new JSONObject();
             jsondata  = new JSONObject();
            
            
            
            jsondata.put("data", obj.writeValueAsString(new Data("test1",1)));
            eventenvolpe.put("event_envelope", jsondata);
            recordvalue.put("value", eventenvolpe);
            jsonarray.put(recordvalue);
            event.put("value_schema_id", schema_id);
            event.put("records", recordvalue);//setting up event object to send to kafka
        
            System.out.println(event);
        
        //Sending message to kafka
        HttpHeaders messageheaders = new HttpHeaders();
        messageheaders.setContentType(MediaType.valueOf("application/vnd.kafka.avro.v2+json"));
        messageheaders.setBasicAuth("username", "password");
        HttpEntity<JSONObject> message = new HttpEntity<JSONObject>(event,messageheaders );
         
        ResponseEntity<String> result1 = restTemplate.exchange(kafkarwsrproxyURL, HttpMethod.POST, message, String.class);
        
        if(result1.getStatusCodeValue()==200) {
            System.out.println("Message is pushed to Kafka");
        }
        
             
        
             
         }
         
    
}

 }

スキーマフォームレジストリを正常に取得できましたが、kafka に送り返すときにエラーが発生しました。

2020-06-30 20:02:56.078  INFO 7972 --- [           main] com.example.demo.DemoApplication         : 
No active profile set, falling back to default profiles: default
2020-06-30 20:02:59.235  INFO 7972 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : 
Tomcat initialized with port(s): 8089 (http)
2020-06-30 20:02:59.263  INFO 7972 --- [           main] o.apache.catalina.core.StandardService   : 
Starting service [Tomcat]
  2020-06-30 20:02:59.264  INFO 7972 --- [           main] org.apache.catalina.core.StandardEngine  : 
 Starting Servlet engine: [Apache Tomcat/9.0.36]
 2020-06-30 20:02:59.434  INFO 7972 --- [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : 
 Initializing Spring embedded WebApplicationContext
 2020-06-30 20:02:59.434  INFO 7972 --- [           main] w.s.c.ServletWebServerApplicationContext : 
  Root WebApplicationContext: initialization completed in 3269 ms
2020-06-30 20:02:59.847  INFO 7972 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : 
Initializing ExecutorService 'applicationTaskExecutor'
2020-06-30 20:03:00.238  INFO 7972 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : 
Tomcat started on port(s): 8089 (http) with context path ''
2020-06-30 20:03:00.251  INFO 7972 --- [           main] com.example.demo.DemoApplication         : 
 Started DemoApplication in 4.945 seconds (JVM running for 5.758)
 {"records":{"value":{"event_envelope":{"data":" 
 {\"test\":\"test1\",\"testEventId\":1}"}}},"value_schema_id":"5"}
 2020-06-30 20:03:02.276  INFO 7972 --- [           main] ConditionEvaluationReportLoggingListener : 

 Error starting ApplicationContext. To display the conditions report re-run your application with 
 'debug' enabled.
 2020-06-30 20:03:02.282 ERROR 7972 --- [           main] o.s.boot.SpringApplication               : 
 Application run failed

 java.lang.IllegalStateException: Failed to execute CommandLineRunner
 at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:798) [spring-boot- 
 2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:779) [spring-boot- 
2.3.1.RELEASE.jar:2.3.1.RELEASE]


at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) [spring-boot- 
 2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) [spring-boot- 
2.3.1.RELEASE.jar:2.3.1.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot- 
2.3.1.RELEASE.jar:2.3.1.RELEASE]
at com.example.demo.DemoApplication.main(DemoApplication.java:44) [classes/:na]
Caused by: org.springframework.web.client.RestClientException: No HttpMessageConverter for 
org.json.JSONObject and content type "application/vnd.kafka.avro.v2+json"
at `org.springframework.web.client.RestTemplate$HttpEntityRequestCallback.doWithRequest(RestTemplate.java:961) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE] at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:737) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE] at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:674) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE] at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:583) ~[spring-web-5.2.7.RELEASE.jar:5.2.7.RELEASE] at com.example.demo.DemoApplication.run(DemoApplication.java:91) [classes/:na] at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:795) [spring-boot-2.3.1.RELEASE.jar:2.3.1.RELEASE] ... 5 common frames omitted

2020-06-30 20:03:02.878 INFO 7972 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down 
ExecutorService 'applicationTaskExecutor'`
4

1 に答える 1