Apache Flink Streaming API を使用して Twitter のツイートを読み取る小さな Scala プログラムを作成しました。
object TwitterWordCount {
private val properties = "/home/twitter-login.properties"
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val twitterStream = env.addSource(new TwitterSource(properties))
val tweets = twitterStream
.flatMap(new JSONParseFlatMap[String, String] {
override def flatMap(in: String, out: Collector[String]): Unit = {
if (getString(in, "user.lang") == "en") {
out.collect(getString(in, "text"))
}
}
})
tweets.print
env.execute("tweets")
}
}
実行すると、次の問題が発生します。
14:35:48,353 INFO com.twitter.hbc.httpclient.ClientBase - twitterSourceClient Establishing a connection
14:35:48,354 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection request: [route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 0 of 2; total allocated: 0 of 20]
14:35:48,354 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection leased: [id: 4][route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 1 of 2; total allocated: 1 of 20]
14:35:48,354 DEBUG org.apache.http.impl.conn.DefaultClientConnectionOperator - Connecting to stream.twitter.com:80
14:35:49,486 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Received message SendHeartbeat at akka://flink/user/taskmanager_1 from Actor[akka://flink/deadLetters].
14:35:49,486 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Sending heartbeat to JobManager
14:35:49,487 DEBUG org.apache.flink.runtime.taskmanager.TaskManager - Handled message SendHeartbeat in 1 ms from Actor[akka://flink/deadLetters].
14:35:49,487 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received message Heartbeat(cb51cdb1bd08879df10bd2198b8e043a,[B@4daaaf5f) at akka://flink/user/jobmanager from Actor[akka://flink/user/taskmanager_1#-64418449].
14:35:49,488 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Received hearbeat message from cb51cdb1bd08879df10bd2198b8e043a.
14:35:49,488 DEBUG org.apache.flink.runtime.instance.InstanceManager - Received heartbeat from TaskManager cb51cdb1bd08879df10bd2198b8e043a @ localhost - 8 slots - URL: akka://flink/user/taskmanager_1
14:35:49,488 DEBUG org.apache.flink.runtime.jobmanager.JobManager - Handled message Heartbeat(cb51cdb1bd08879df10bd2198b8e043a,[B@4daaaf5f) in 0 ms from Actor[akka://flink/user/taskmanager_1#-64418449].
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection org.apache.http.impl.conn.DefaultClientConnection@64c88f2d closed
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection org.apache.http.impl.conn.DefaultClientConnection@64c88f2d shut down
14:35:52,358 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection [id: 4][route: {}->http://stream.twitter.com] can be kept alive for 9223372036854775807 MILLISECONDS
14:35:52,358 DEBUG org.apache.http.impl.conn.DefaultClientConnection - Connection org.apache.http.impl.conn.DefaultClientConnection@64c88f2d closed
14:35:52,358 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection released: [id: 4][route: {}->http://stream.twitter.com][total kept alive: 0; route allocated: 0 of 2; total allocated: 0 of 20]
14:35:52,359 WARN com.twitter.hbc.httpclient.ClientBase - twitterSourceClient IOException caught when establishing connection to https://stream.twitter.com/1.1/statuses/filter.json?delimited=length
14:35:53,613 WARN com.twitter.hbc.httpclient.ClientBase - twitterSourceClient failed to establish connection properly
14:35:53,613 INFO com.twitter.hbc.httpclient.ClientBase - twitterSourceClient Done processing, preparing to close connection
14:35:53,613 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection manager is shutting down
14:35:53,613 DEBUG org.apache.http.impl.conn.PoolingClientConnectionManager - Connection manager shut down
プログラムは接続を再確立しようとします。したがって、この 4 行のログ メッセージは引き続き出力されます。
これに関する奇妙な点は、Apache Flink プロジェクトで提供されている例を実行すると、すべてが正常に機能することです (マスターの最新バージョンを GitHub から取得しました)。私も同じプロパティファイルを使用しています。そのサンプル クラスを自分のプロジェクトにコピーすると、上記の問題状態も発生します。
Flink アーキタイプを使用して、独自のプロジェクトを作成しました。バージョン 0.9.1 と 0.10-SNAPSHOT で試しました。依存関係flink-scala
、flink-streaming-scala
、flink-clients
およびflink-connector-twitter
は、対応するバージョンで使用されます。
誰かが同様の問題を経験したことがあり、私を正しい軌道に乗せることができますか?