Twitter ユーティリティで jar をロードした後、spark-shell でいくつかのテストを行っています。動作するコードシーケンスは次のとおりです。
// launch:
// spark-shell --driver-memory 1g --master local[3] --jars target/scala-2.10/tweetProcessing-1.0.jar
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
val consumerKey = ...
val consumerSecret = ...
val accessToken = ...
val accessTokenSecret = ...
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
val ssc = new StreamingContext(sc, Seconds(60))
val tweetStream = TwitterUtils.createStream(ssc, None)
val myNewStream = tweetStream.map(tweet => tweet.getText)
.map(tweetText => tweetText.toLowerCase.split("\\W+"))
.transform(rdd =>
rdd.map(tweetWordSeq => {
tweetWordSeq.foreach { word => {
val mySet = Set("apple", "orange");
if(!(mySet)(word)) word }
}
}))
myNewStream.foreachRDD((rdd,time) => {
println("%s at time %s".format(rdd.count(),time.milliseconds))
})
ssc.start()
(実際には、問題を強調するために、私が行う計算を最大限に減らしました)。ここで、mySet がシリアライズされ、すべてがうまくいきます。
しかし、代わりにブロードキャスト変数を使用し、それに応じてテストを置き換える場合:
val ssc = new StreamingContext(sc, Seconds(60))
val mySet = sc.broadcast(Set("apple", "orange"))
val tweetStream = TwitterUtils.createStream(ssc, None)
val myNewStream = tweetStream.map(tweet => tweet.getText)
.map(tweetText => tweetText.toLowerCase.split("\\W+"))
.transform(rdd =>
rdd.map(tweetWordSeq => {
tweetWordSeq.foreach { word => {
if(!(mySet.value)(word)) word }
}
}))
myNewStream.foreachRDD((rdd,time) => {
println("%s at time %s".format(rdd.count(),time.milliseconds))
})
ssc.start()
私は得る:
ERROR JobScheduler: Error generating jobs for time 1464335160000 ms
org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: Object of org.apache.spark.streaming.dstream.TransformedDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
もちろん、ブロードキャスト変数を使用することを好みます (私のセットは、実際にはストップ ワードのかなり大きなセットです) が、問題の原因がよくわかりません。