最近、科学研究の一環として、REST API を使用して Travis CI と GitHub からデータをストリーミングする (または少なくともストリーミングする必要がある) アプリケーションを開発しています。これの目的は、多数の分析をさらに実行するために、コミットとビルドの関係を把握することです。
このために、次の Travis カスタム レシーバーを実装しました。
object TravisUtils {
def createStream(ctx : StreamingContext, storageLevel: StorageLevel) : ReceiverInputDStream[Build] = new TravisInputDStream(ctx, storageLevel)
}
private[streaming]
class TravisInputDStream(ctx : StreamingContext, storageLevel : StorageLevel) extends ReceiverInputDStream[Build](ctx) {
def getReceiver() : Receiver[Build] = new TravisReceiver(storageLevel)
}
private[streaming]
class TravisReceiver(storageLevel: StorageLevel) extends Receiver[Build](storageLevel) with Logging {
def onStart() : Unit = {
new BuildStream().addListener(new BuildListener {
override def onBuildsReceived(numberOfBuilds: Int): Unit = {
}
override def onBuildRepositoryReceived(build: Build): Unit = {
store(build)
}
override def onException(e: Exception): Unit = {
reportError("Exception while streaming travis", e)
}
})
}
def onStop() : Unit = {
}
}
一方、受信機はカスタムメイドの TRAVIS API ライブラリ (Apache Async Client を使用して Java で開発) を使用します。ただし、問題は次のとおりです。受信する必要があるデータは継続的であり、変更されます。つまり、Travis と GitHub に絶えずプッシュされます。例として、GitHub が 1 秒あたり約 350 イベント - プッシュ イベント、コミット コメントなどを含む。
しかし、GitHub または Travis のいずれかをストリーミングする場合、最初の 2 つのバッチからデータを取得しますが、その後、DStream の RDD の部分は空です - ストリーミングするデータはありますが!
APIへのリクエストを省略するために使用されるHttpClientなど、これまでにいくつか確認しましたが、実際にこの問題を解決したものはありませんでした。
したがって、私の質問は - 何が起こっているのでしょうか? 期間 x が経過した後、Spark がデータをストリーミングしないのはなぜですか。以下に、設定されたコンテキストと構成を示します。
val configuration = new SparkConf().setAppName("StreamingSoftwareAnalytics").setMaster("local[2]")
val ctx = new StreamingContext(configuration, Seconds(3))
val stream = GitHubUtils.createStream(ctx, StorageLevel.MEMORY_AND_DISK_SER)
// RDD IS EMPTY - that is what is happenning!
stream.window(Seconds(9)).foreachRDD(rdd => {
if (rdd.isEmpty()) {println("RDD IS EMPTY")} else {rdd.collect().foreach(event => println(event.getRepo.getName + " " + event.getId))}
})
ctx.start()
ctx.awaitTermination()
前もって感謝します!