2

私はSparkを初めて使用するので、ガイドしてください。

Scala を使用した Spark ストリーミングに関連する使用可能な例が多数あります。

https://github.com/apache/incubator-spark/tree/master/examples/src/main/scala/org/apache/spark/streaming/examplesから確認できます。

TwitterPopularTags.scala を実行したい。

この例では、Twitter ログインの詳細を設定できません。

http://spark.incubator.apache.org/docs/latest/streaming-programming-guide.html#linking-with-spark-streaming

ネットワークカウントの例を正常に実行できました。

しかし、実行する
./run-example org.apache.spark.streaming.examples.TwitterPopularTags local[2] と、認証失敗の問題が表示されます...

TwitterPopularTags.scala のような文字列コンテキストを初期化する前に、Twitter のログインの詳細を設定します。

 System.setProperty("twitter4j.oauth.consumerKey", "####");
 System.setProperty("twitter4j.oauth.consumerSecret", "##");
 System.setProperty("twitter4j.oauth.accessToken", "##");
 System.setProperty("twitter4j.oauth.accessTokenSecret", "##");

ガイドしてください。

4

2 に答える 2

2

Twitter の例を実行する前に、ファイル "twitter4j.properties" を Spark ルート ディレクトリ (spark-0.8.0-incubating など) に配置します。

twitter4j.プロパティ:

oauth.consumerKey=***
oauth.consumerSecret=***
oauth.accessToken=***
oauth.accessTokenSecret=***

Scalaの例を使ってMacで私のために働きました。

于 2013-11-01T17:50:51.273 に答える
1

github リンクhttps://github.com/apache/incubator-spark/tree/master/examples/src/main/scala/org/apache/spark/streaming/examplesを開くことができませんでした 。

ただし、私のために働いた以下のコードを使用できます。

import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._

/**
 * A Spark Streaming application that receives tweets on certain 
 * keywords from twitter datasource and find the popular hashtags
 * 
 * Arguments: <comsumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <keyword_1> ... <keyword_n>
 * <comsumerKey>        - Twitter consumer key 
 * <consumerSecret>     - Twitter consumer secret
 * <accessToken>        - Twitter access token
 * <accessTokenSecret>  - Twitter access token secret
 * <keyword_1>          - The keyword to filter tweets
 * <keyword_n>          - Any number of keywords to filter tweets
 * 
 * More discussion at stdatalabs.blogspot.com
 * 
 * @author Sachin Thirumala
 */

object SparkPopularHashTags {
  val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming - PopularHashTags")
  val sc = new SparkContext(conf)

  def main(args: Array[String]) {

    sc.setLogLevel("WARN")

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.takeRight(args.length - 4)

    // Set the system properties so that Twitter4j library used by twitter stream
    // can use them to generat OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    // Set the Spark StreamingContext to create a DStream for every 5 seconds
    val ssc = new StreamingContext(sc, Seconds(5))
    // Pass the filter keywords as arguements

    //  val stream = FlumeUtils.createStream(ssc, args(0), args(1).toInt)  
    val stream = TwitterUtils.createStream(ssc, None, filters)

    // Split the stream on space and extract hashtags 
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // Get the top hashtags over the previous 60 sec window
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Get the top hashtags over the previous 10 sec window
    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // print tweets in the currect DStream 
    stream.print()

    // Print popular hashtags  
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })
    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    ssc.start()
    ssc.awaitTermination()
  }
} 

説明:
setMaster("local[4]") - 受信ストリームを収集するために 1 つのスレッドが使用され、それを処理するために別のスレッドが使用されるため、少なくとも 2 つのスレッドでマスターをローカル モードに設定してください。

以下のコードを使用して、人気のあるハッシュタグをカウントします。

val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

上記のスニペットは、で指定されたように、過去 60/10 秒間のハッシュタグのワード カウントを実行し、reduceByKeyAndWindowそれらを降順に並べ替えます。

reduceByKeyAndWindow以前のストリーム間隔で蓄積されたデータに変換を適用する必要がある場合に使用されます。

4 つの Twitter OAuth トークンを引数として渡して、コードを実行します。 ここに画像の説明を入力

人気のあるハッシュタグが 10/60 秒間隔で表示されるはずです。 ここに画像の説明を入力

以下のリンクで Spark Streaming と Storm を Flume と Kafka と統合することで、同様のプロジェクトを確認できます。

スパーク ストリーミング:

Spark ストリーミング パート 1: リアルタイムの twitter センチメント分析 http://stdatalabs.blogspot.in/2016/09/spark-streaming-part-1-real-time.html

Spark ストリーミング パート 2: Flume を使用したリアルタイムの Twitter 感情分析 http://stdatalabs.blogspot.in/2016/09/spark-streaming-part-2-real-time_10.html

Spark ストリーミング パート 3: kafka を使用したリアルタイムの Twitter 感情分析 http://stdatalabs.blogspot.in/2016/09/spark-streaming-part-3-real-time.html

kafka 統合による Spark Streaming でのデータ保証 http://stdatalabs.blogspot.in/2016/10/data-guarantees-in-spark-streaming-with.html

嵐:

Apache Storm を使用したリアルタイム ストリーム処理 - パート 1 http://stdatalabs.blogspot.in/2016/09/realtime-stream-processing-using-apache.html

Apache Storm と Kafka を使用したリアルタイム ストリーム処理 - パート 2 http://stdatalabs.blogspot.in/2016/10/real-time-stream-processing-using.html

于 2016-11-03T10:10:46.510 に答える