github リンクhttps://github.com/apache/incubator-spark/tree/master/examples/src/main/scala/org/apache/spark/streaming/examplesを開くことができませんでした 。
ただし、私のために働いた以下のコードを使用できます。
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._
/**
* A Spark Streaming application that receives tweets on certain
* keywords from twitter datasource and find the popular hashtags
*
* Arguments: <comsumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <keyword_1> ... <keyword_n>
* <comsumerKey> - Twitter consumer key
* <consumerSecret> - Twitter consumer secret
* <accessToken> - Twitter access token
* <accessTokenSecret> - Twitter access token secret
* <keyword_1> - The keyword to filter tweets
* <keyword_n> - Any number of keywords to filter tweets
*
* More discussion at stdatalabs.blogspot.com
*
* @author Sachin Thirumala
*/
object SparkPopularHashTags {
val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming - PopularHashTags")
val sc = new SparkContext(conf)
def main(args: Array[String]) {
sc.setLogLevel("WARN")
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
val filters = args.takeRight(args.length - 4)
// Set the system properties so that Twitter4j library used by twitter stream
// can use them to generat OAuth credentials
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
// Set the Spark StreamingContext to create a DStream for every 5 seconds
val ssc = new StreamingContext(sc, Seconds(5))
// Pass the filter keywords as arguements
// val stream = FlumeUtils.createStream(ssc, args(0), args(1).toInt)
val stream = TwitterUtils.createStream(ssc, None, filters)
// Split the stream on space and extract hashtags
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
// Get the top hashtags over the previous 60 sec window
val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
.map { case (topic, count) => (count, topic) }
.transform(_.sortByKey(false))
// Get the top hashtags over the previous 10 sec window
val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
.map { case (topic, count) => (count, topic) }
.transform(_.sortByKey(false))
// print tweets in the currect DStream
stream.print()
// Print popular hashtags
topCounts60.foreachRDD(rdd => {
val topList = rdd.take(10)
println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
})
topCounts10.foreachRDD(rdd => {
val topList = rdd.take(10)
println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
})
ssc.start()
ssc.awaitTermination()
}
}
説明:
setMaster("local[4]")
- 受信ストリームを収集するために 1 つのスレッドが使用され、それを処理するために別のスレッドが使用されるため、少なくとも 2 つのスレッドでマスターをローカル モードに設定してください。
以下のコードを使用して、人気のあるハッシュタグをカウントします。
val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
.map { case (topic, count) => (count, topic) }
.transform(_.sortByKey(false))
上記のスニペットは、で指定されたように、過去 60/10 秒間のハッシュタグのワード カウントを実行し、reduceByKeyAndWindow
それらを降順に並べ替えます。
reduceByKeyAndWindow
以前のストリーム間隔で蓄積されたデータに変換を適用する必要がある場合に使用されます。
4 つの Twitter OAuth トークンを引数として渡して、コードを実行します。
人気のあるハッシュタグが 10/60 秒間隔で表示されるはずです。
以下のリンクで Spark Streaming と Storm を Flume と Kafka と統合することで、同様のプロジェクトを確認できます。
スパーク ストリーミング:
Spark ストリーミング パート 1: リアルタイムの twitter センチメント分析
http://stdatalabs.blogspot.in/2016/09/spark-streaming-part-1-real-time.html
Spark ストリーミング パート 2: Flume を使用したリアルタイムの Twitter 感情分析
http://stdatalabs.blogspot.in/2016/09/spark-streaming-part-2-real-time_10.html
Spark ストリーミング パート 3: kafka を使用したリアルタイムの Twitter 感情分析
http://stdatalabs.blogspot.in/2016/09/spark-streaming-part-3-real-time.html
kafka 統合による Spark Streaming でのデータ保証
http://stdatalabs.blogspot.in/2016/10/data-guarantees-in-spark-streaming-with.html
嵐:
Apache Storm を使用したリアルタイム ストリーム処理 - パート 1
http://stdatalabs.blogspot.in/2016/09/realtime-stream-processing-using-apache.html
Apache Storm と Kafka を使用したリアルタイム ストリーム処理 - パート 2
http://stdatalabs.blogspot.in/2016/10/real-time-stream-processing-using.html