0

github のメッセージ ハブ Java サンプルに従っています。独自のバージョンの Kafka プロデューサーを作成しましたが、Java メイン アプリケーションを使用して J2Se 環境で正常に動作します。以下のコード スニペット -

String jasslocation = "C:\\HOME\\Technical\\bluemix\\git\\message-hub-samples\\java\\message-hub-kafka-ssl\\resources" + File.separator + "jaas.conf";
		System.setProperty(JAAS_CONFIG_PROPERTY, jasslocation);
		
		Properties props = new Properties();
        props.put("key.serializer","org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("bootstrap.servers","kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093");
        props.put("acks","all");
     //   props.put("block.on.bu","true");
        props.put("batch.size","1");
        props.put("security.protocol","SASL_SSL");
        props.put("ssl.protocol","TLSv1.2");
        props.put("ssl.enabled.protocols","TLSv1.2");
        props.put("ssl.truststore.location","C:\\IBM\\BPM857\\java_1.7_64\\jre\\lib\\security\\cacerts");
        //props.put("ssl.truststore.location","C:\\IBM\\IID\\v8.5\\jdk\\jre\\lib\\security\\cacerts");
        
        props.put("ssl.truststore.password","changeit");
        props.put("ssl.truststore.type","JKS");
        props.put("ssl.endpoint.identification.algorithm","HTTPS");
        
        String topic="mytopic";
        String message="publish message to a topic";
        try {
        	
          KafkaProducer<byte[], byte[]> kafkaProducer;
          kafkaProducer = new KafkaProducer<byte[], byte[]>(props);
          ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<byte[], byte[]>(topic,message.getBytes("UTF-8"));
         
          RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
          //getting RecordMetadata is possible to validate topic, partition and offset
          System.out.println("topic where message is published : " + recordMetadata.topic());
          System.out.println("partition where message is published : " + recordMetadata.partition());
          System.out.println("message offset # : " + recordMetadata.offset());
          kafkaProducer.close();

カフカ クライアント バージョン - 0.90

このコードを J2EE MDB でラップし、BPM サーバーにデプロイすると。MDB が起動すると、スレッドは send() を待機し続けます:-

RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();

しばらくするとタイムアウトします。その他の例外やエラーはスローされません。

私の質問は、BPM 内から kafka クライアント API を使用するには、ssl 構成などの追加の手順を実行する必要があるかということです。

お知らせ下さい。

ありがとう。

4

0 に答える 0