Spark ストリーミングで WSMQ データ ソースのカスタマー レシーバーを実装しようとしています。ここで提供されている例に従いました。
後で、この Github リポジトリの例に従いました。
私は3つの問題を抱えています:
1: エラー (このエラーは、プログラムをしばらく実行した後に発生します)
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)
セッションの作成中にこのコードを使用したにもかかわらず、プログラムは WSMQ からメッセージを削除しません
MQQueueSession qSession = (MQQueueSession) qCon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Receiver
Custom Receiver Spark API で説明されている信頼できるものを実装する必要があります。それは言います:信頼できるレシーバーを実装するには、ストア (複数レコード) を使用してデータを保存する必要があります。このフレーバーのストアは、指定されたすべてのレコードが Spark 内に格納された後にのみ返されるブロッキング呼び出しです。レシーバーの構成済みストレージ レベルでレプリケーションが使用されている場合 (デフォルトで有効)、この呼び出しはレプリケーションの完了後に返されます。したがって、データが確実に保存され、受信者はソースを適切に確認できるようになります。これにより、受信側がデータの複製中に障害が発生した場合にデータが発生しないことが保証されます。バッファリングされたデータは確認応答されないため、後でソースによって再送信されます。
ストア(複数レコード)についてどうすればよいかわかりませんか?
これらのエラーが発生する理由と、信頼できる .xml を実装する方法がわかりませんReceiver
。
コードは次のとおりです。
public class JavaConnector extends Receiver<String> {
String host = null;
int port = -1;
String qm=null;
String qn=null;
String channel=null;
transient Gson gson=new Gson();
transient MQQueueConnection qCon= null;
String topic=null;
Enumeration enumeration =null;
private static MQQueueReceiver receiver = null;
public JavaConnector(String host , int port, String qm, String channel, String qn) {
super(StorageLevel.MEMORY_ONLY_2());
this.host = host;
this.port = port;
this.qm=qm;
this.qn=qn;
this.channel=channel;
}
public void onStart() {
// Start the thread that receives data over a connection
new Thread() {
@Override public void run() {
try {
initConnection();
receive();
}
catch (JMSException ex)
{
ex.printStackTrace();
}
catch (Exception ex)
{
ex.printStackTrace();
}
}
}.start();
}
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false
}
/** Create a MQ connection and receive data until receiver is stopped */
private void receive() throws InterruptedException {
System.out.print("Started receiving messages from MQ");
try {
JMSTextMessage receivedMessage= null;
int cnt =0;
//JMSTextMessage receivedMessage = (JMSTextMessage) receiver.receive(10000);
boolean flag=false;
while (!isStopped() && enumeration.hasMoreElements()&&cnt<50 )
{
receivedMessage= (JMSTextMessage) enumeration.nextElement();
receivedMessage.acknowledge();
String userInput = receivedMessage.getText();
ArrayList<String> list = new ArrayList<String>();
list.add(userInput);
Iterator<String> itr = list.iterator();
store(itr);
cnt++;
}
/*while (!isStopped() && receivedMessage !=null)
{
// receivedMessage= (JMSTextMessage) enumeration.nextElement();
String userInput = receivedMessage.getText();
store(userInput);
receivedMessage.acknowledge();
}*/
// Restart in an attempt to connect again when server is active again
//restart("Trying to connect again");
stop("No More Messages To read !");
qCon.close();
System.out.println("Queue Connection is Closed");
}
catch(Exception e)
{ Thread.sleep(100);
System.out.println("WRONG"+e.toString());
e.printStackTrace();
restart("Trying to connect again");
}
catch(Throwable t) {
Thread.sleep(100);
System.out.println("WRONG-1"+t.toString());
// restart if there is any other error
restart("Error receiving data", t);
}
}
public void initConnection() throws JMSException,InterruptedException {
try {
MQQueueConnectionFactory conFactory = new MQQueueConnectionFactory();
conFactory.setHostName(host);
conFactory.setPort(port);
conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
conFactory.setQueueManager(qm);
conFactory.setChannel(channel);
conFactory.setMsgBatchSize(100);
qCon = (MQQueueConnection) conFactory.createQueueConnection();
MQQueueSession qSession = (MQQueueSession) qCon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
MQQueue queue = (MQQueue) qSession.createQueue(qn);
MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
qCon.start();
//receiver = (MQQueueReceiver) qSession.createReceiver(queue);
enumeration= browser.getEnumeration();
} catch (Exception e) {
Thread.sleep(1000);
}
}
@Override
public StorageLevel storageLevel() {
return StorageLevel.MEMORY_ONLY_2();
}