0

Java Kafka アプリケーションを使用して Twitter ストリームを使用しようとしています。

Twitter 開発者アカウントと Twitter アプリケーションを作成し、必要な 4 つのキーをすべて生成しました。

以下のコードを見つけてください。

package com.github.simpleanand.kafkabeginner.tutorial2;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.http.HttpHost;
import org.apache.http.conn.params.ConnRoutePNames;
import org.apache.http.impl.client.DefaultHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;

public class TwitterProducer {

    private Logger logger = LoggerFactory.getLogger(TwitterProducer.class);

    private String consumerKey = "<consumer-key>";
    private String consumerSecret = "<consumerSecret>";
    private String token = "<token value>";
    private String secret = "<secret>";

    public TwitterProducer() {
    }

    public static void main(String[] args) {
        new TwitterProducer().run();
    }

    public void run() {     
        logger.info("inside run........");
        // create a twitter client
        /**
         * Set up your blocking queues: Be sure to size these properly based on
         * expected TPS of your stream
         */
        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(100000);
        Client client = createTwitterClient(msgQueue);
        client.connect();

        // create a kafka producer

        // loop to send tweets to kafka

        // on a different thread, or multiple different threads....
        while (!client.isDone()) {
            String msg = null;
            try {
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                client.stop();
            }
            if (null != msg) {
                logger.info("Msg -> " + msg);
            }
        }

        logger.info("end of application........");
    }

    public Client createTwitterClient(BlockingQueue<String> msgQueue) {
        HttpHost proxy = new HttpHost("<compayny proxy value>", 8080);
        DefaultHttpClient httpClient = new DefaultHttpClient();
        httpClient.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
        /**
         * Declare the host you want to connect to, the endpoint, and
         * authentication (basic auth or oauth)
         */
        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();

        // Optional: set up some followings and track terms
        // List<Long> followings = Lists.newArrayList(1234L, 566788L);
        List<String> terms = Lists.newArrayList("bitcoin");
        // hosebirdEndpoint.followings(followings);
        hosebirdEndpoint.trackTerms(terms);

        // These secrets should be read from a config file
        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);
        hosebirdAuth.setupConnection(httpClient);

        // Creating a client:

        ClientBuilder builder = new ClientBuilder().name("Hosebird-Client-01") // optional:
                                                                                // mainly
                                                                                // for
                                                                                // the
                                                                                // logs
                .hosts(hosebirdHosts).authentication(hosebirdAuth).endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));

        Client hosebirdClient = builder.build();
        // Attempts to establish a connection.
        return hosebirdClient;
    }

}

次のエラーが表示されます。

[hosebird-client-io-thread-0] INFO com.twitter.hbc.httpclient.ClientBase - Hosebird-Client-01 Establishing a connection
[hosebird-client-io-thread-0] WARN com.twitter.hbc.httpclient.ClientBase - Hosebird-Client-01 Unknown host - stream.twitter.com
[hosebird-client-io-thread-0] WARN com.twitter.hbc.httpclient.ClientBase - Hosebird-Client-01 failed to establish connection properly
[hosebird-client-io-thread-0] INFO com.twitter.hbc.httpclient.ClientBase - Hosebird-Client-01 Done processing, preparing to close connection

このエラーの解決方法を教えてください。

プロキシ オブジェクトの送信に問題があるようです。それでよろしいですか?

4

0 に答える 0