3

Spark ストリーミングで WSMQ データ ソースのカスタマー レシーバーを実装しようとしています。ここで提供されている例に従いました。

後で、この Github リポジトリの例に従いました。

私は3つの問題を抱えています:

1: エラー (このエラーは、プログラムをしばらく実行した後に発生します)

java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
    at java.lang.Thread.run(Thread.java:745)
  1. セッションの作成中にこのコードを使用したにもかかわらず、プログラムは WSMQ からメッセージを削除しません

    MQQueueSession qSession = (MQQueueSession) qCon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    
  2. ReceiverCustom Receiver Spark API で説明されている信頼できるものを実装する必要があります。それは言います:

    信頼できるレシーバーを実装するには、ストア (複数レコード) を使用してデータを保存する必要があります。このフレーバーのストアは、指定されたすべてのレコードが Spark 内に格納された後にのみ返されるブロッキング呼び出しです。レシーバーの構成済みストレージ レベルでレプリケーションが使用されている場合 (デフォルトで有効)、この呼び出しはレプリケーションの完了後に返されます。したがって、データが確実に保存され、受信者はソースを適切に確認できるようになります。これにより、受信側がデータの複製中に障害が発生した場合にデータが発生しないことが保証されます。バッファリングされたデータは確認応答されないため、後でソースによって再送信されます。

ストア(複数レコード)についてどうすればよいかわかりませんか?

これらのエラーが発生する理由と、信頼できる .xml を実装する方法がわかりませんReceiver

コードは次のとおりです。

public class JavaConnector extends Receiver<String> {

    String host = null;
    int port = -1;
    String qm=null;
    String qn=null;
    String channel=null;
    transient Gson gson=new Gson();
    transient MQQueueConnection qCon= null;
    String topic=null;

    Enumeration enumeration =null;
    private static MQQueueReceiver receiver = null;


    public JavaConnector(String host , int port, String qm, String channel, String qn) {
        super(StorageLevel.MEMORY_ONLY_2());
        this.host = host;
        this.port = port;
        this.qm=qm;
        this.qn=qn;
        this.channel=channel;


    }

    public void onStart()  {
        // Start the thread that receives data over a connection
        new Thread()  {
            @Override public void run() {
                try {
                    initConnection();
                    receive();
                }
                catch (JMSException ex)
                {
                    ex.printStackTrace();
                }
                catch (Exception ex)
                {
                    ex.printStackTrace();
                }
            }
        }.start();
    }

    public void onStop() {

        // There is nothing much to do as the thread calling receive()
        // is designed to stop by itself isStopped() returns false

    }

    /** Create a MQ connection and receive data until receiver is stopped */
    private void receive() throws InterruptedException {
        System.out.print("Started receiving messages from MQ");


        try {

            JMSTextMessage receivedMessage= null;
            int cnt =0;

            //JMSTextMessage receivedMessage = (JMSTextMessage) receiver.receive(10000);

            boolean flag=false;
            while (!isStopped() && enumeration.hasMoreElements()&&cnt<50 )
            {

                receivedMessage= (JMSTextMessage) enumeration.nextElement();
                receivedMessage.acknowledge();
                String userInput = receivedMessage.getText();

                    ArrayList<String> list = new ArrayList<String>();
                    list.add(userInput);
                    Iterator<String> itr = list.iterator();
                    store(itr);
                cnt++;

            }
            /*while (!isStopped() && receivedMessage !=null)
            {

               // receivedMessage= (JMSTextMessage) enumeration.nextElement();
                String userInput = receivedMessage.getText();

                store(userInput);
        receivedMessage.acknowledge();

            }*/

            // Restart in an attempt to connect again when server is active again
            //restart("Trying to connect again");

            stop("No More Messages To read !");
            qCon.close();
            System.out.println("Queue Connection is Closed");

        }
        catch(Exception e)
        {      Thread.sleep(100);
            System.out.println("WRONG"+e.toString());
            e.printStackTrace();
            restart("Trying to connect again");
        }
        catch(Throwable t) {
            Thread.sleep(100);
            System.out.println("WRONG-1"+t.toString());
            // restart if there is any other error
            restart("Error receiving data", t);
        }



    }

    public void initConnection() throws JMSException,InterruptedException {
        try {
            MQQueueConnectionFactory conFactory = new MQQueueConnectionFactory();
            conFactory.setHostName(host);
            conFactory.setPort(port);
            conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
            conFactory.setQueueManager(qm);
            conFactory.setChannel(channel);
            conFactory.setMsgBatchSize(100);


            qCon = (MQQueueConnection) conFactory.createQueueConnection();
            MQQueueSession qSession = (MQQueueSession) qCon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            MQQueue queue = (MQQueue) qSession.createQueue(qn);
            MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
            qCon.start();
            //receiver = (MQQueueReceiver) qSession.createReceiver(queue);
            enumeration= browser.getEnumeration();


        } catch (Exception e) {
            Thread.sleep(1000);
        }
    }

    @Override
    public StorageLevel storageLevel() {
        return StorageLevel.MEMORY_ONLY_2();
    }
4

1 に答える 1

1

最後に、これを解決することができました。解決策 1: kafka がダウンしていて IO エラーが発生していたため、ストリーミング コンテキストが Kafka に書き込みを試みます。それは私が愚かでした。:)

解決策 2: MessageListener を使用することになっていましたが、実際にはメッセージを消費しないメ​​ッセージを読み取るために QueueBrowser が使用されます。

于 2016-02-05T19:06:07.867 に答える