Java Kafka アプリケーションを使用して Twitter ストリームを使用しようとしています。
Twitter 開発者アカウントと Twitter アプリケーションを作成し、必要な 4 つのキーをすべて生成しました。
以下のコードを見つけてください。
package com.github.simpleanand.kafkabeginner.tutorial2;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpHost;
import org.apache.http.conn.params.ConnRoutePNames;
import org.apache.http.impl.client.DefaultHttpClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
public class TwitterProducer {
private Logger logger = LoggerFactory.getLogger(TwitterProducer.class);
private String consumerKey = "<consumer-key>";
private String consumerSecret = "<consumerSecret>";
private String token = "<token value>";
private String secret = "<secret>";
public TwitterProducer() {
}
public static void main(String[] args) {
new TwitterProducer().run();
}
public void run() {
logger.info("inside run........");
// create a twitter client
/**
* Set up your blocking queues: Be sure to size these properly based on
* expected TPS of your stream
*/
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(100000);
Client client = createTwitterClient(msgQueue);
client.connect();
// create a kafka producer
// loop to send tweets to kafka
// on a different thread, or multiple different threads....
while (!client.isDone()) {
String msg = null;
try {
msg = msgQueue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
client.stop();
}
if (null != msg) {
logger.info("Msg -> " + msg);
}
}
logger.info("end of application........");
}
public Client createTwitterClient(BlockingQueue<String> msgQueue) {
HttpHost proxy = new HttpHost("<compayny proxy value>", 8080);
DefaultHttpClient httpClient = new DefaultHttpClient();
httpClient.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
/**
* Declare the host you want to connect to, the endpoint, and
* authentication (basic auth or oauth)
*/
Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
// Optional: set up some followings and track terms
// List<Long> followings = Lists.newArrayList(1234L, 566788L);
List<String> terms = Lists.newArrayList("bitcoin");
// hosebirdEndpoint.followings(followings);
hosebirdEndpoint.trackTerms(terms);
// These secrets should be read from a config file
Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);
hosebirdAuth.setupConnection(httpClient);
// Creating a client:
ClientBuilder builder = new ClientBuilder().name("Hosebird-Client-01") // optional:
// mainly
// for
// the
// logs
.hosts(hosebirdHosts).authentication(hosebirdAuth).endpoint(hosebirdEndpoint)
.processor(new StringDelimitedProcessor(msgQueue));
Client hosebirdClient = builder.build();
// Attempts to establish a connection.
return hosebirdClient;
}
}
次のエラーが表示されます。
[hosebird-client-io-thread-0] INFO com.twitter.hbc.httpclient.ClientBase - Hosebird-Client-01 Establishing a connection
[hosebird-client-io-thread-0] WARN com.twitter.hbc.httpclient.ClientBase - Hosebird-Client-01 Unknown host - stream.twitter.com
[hosebird-client-io-thread-0] WARN com.twitter.hbc.httpclient.ClientBase - Hosebird-Client-01 failed to establish connection properly
[hosebird-client-io-thread-0] INFO com.twitter.hbc.httpclient.ClientBase - Hosebird-Client-01 Done processing, preparing to close connection
このエラーの解決方法を教えてください。
プロキシ オブジェクトの送信に問題があるようです。それでよろしいですか?