0

最近、科学研究の一環として、REST API を使用して Travis CI と GitHub からデータをストリーミングする (または少なくともストリーミングする必要がある) アプリケーションを開発しています。これの目的は、多数の分析をさらに実行するために、コミットとビルドの関係を把握することです。

このために、次の Travis カスタム レシーバーを実装しました。

object TravisUtils { 

  def createStream(ctx : StreamingContext, storageLevel: StorageLevel) : ReceiverInputDStream[Build] = new TravisInputDStream(ctx, storageLevel) 

} 

private[streaming] 
class TravisInputDStream(ctx : StreamingContext, storageLevel : StorageLevel) extends ReceiverInputDStream[Build](ctx) { 

  def getReceiver() : Receiver[Build] = new TravisReceiver(storageLevel) 

} 

private[streaming] 
class TravisReceiver(storageLevel: StorageLevel) extends Receiver[Build](storageLevel) with Logging { 

  def onStart() : Unit = { 
     new BuildStream().addListener(new BuildListener { 

       override def onBuildsReceived(numberOfBuilds: Int): Unit = { 

       } 

       override def onBuildRepositoryReceived(build: Build): Unit = { 
         store(build) 
       } 

       override def onException(e: Exception): Unit = { 
         reportError("Exception while streaming travis", e) 
       } 
    }) 
  } 

  def onStop() : Unit = { 

  } 
} 

一方、受信機はカスタムメイドの TRAVIS API ライブラリ (Apache Async Client を使用して Java で開発) を使用します。ただし、問題は次のとおりです。受信する必要があるデータは継続的であり、変更されます。つまり、Travis と GitHub に絶えずプッシュされます。例として、GitHub が 1 秒あたり約 350 イベント - プッシュ イベント、コミット コメントなどを含む。

しかし、GitHub または Travis のいずれかをストリーミングする場合、最初の 2 つのバッチからデータを取得しますが、その後、DStream の RDD の部分は空です - ストリーミングするデータはありますが!

APIへのリクエストを省略するために使用されるHttpClientなど、これまでにいくつか確認しましたが、実際にこの問題を解決したものはありませんでした。

したがって、私の質問は - 何が起こっているのでしょうか? 期間 x が経過した後、Spark がデータをストリーミングしないのはなぜですか。以下に、設定されたコンテキストと構成を示します。

val configuration = new SparkConf().setAppName("StreamingSoftwareAnalytics").setMaster("local[2]") 

val ctx = new StreamingContext(configuration, Seconds(3)) 

val stream = GitHubUtils.createStream(ctx, StorageLevel.MEMORY_AND_DISK_SER) 

// RDD IS EMPTY - that is what is happenning! 
stream.window(Seconds(9)).foreachRDD(rdd => { 
  if (rdd.isEmpty()) {println("RDD IS EMPTY")} else {rdd.collect().foreach(event => println(event.getRepo.getName + " " + event.getId))} 
}) 

ctx.start() 
ctx.awaitTermination() 

前もって感謝します!

4

0 に答える 0