3

基本的に、イベントの到着時に Spark ストリーミング アプリケーションを再起動するイベント コールバックをドライバー プログラムに記述したいと考えています。私のドライバー プログラムは、ファイルから構成を読み取ることによって、ストリームと実行ロジックを設定しています。ファイルが変更される (新しい構成が追加される) たびに、ドライバー プログラムは次の手順を順番に実行する必要があります。

  1. 再起動、
  2. 構成ファイルを (メイン メソッドの一部として) 読み取り、
  3. ストリームを設定する

これを達成するための最良の方法は何ですか?

4

5 に答える 5

3

場合によっては、ストリーミング コンテキストを動的にリロードする必要がある場合があります (たとえば、ストリーミング操作のリロードなど)。その場合、次のことができます ( Scalaの例):

val sparkContext = new SparkContext()

val stopEvent = false
var streamingContext = Option.empty[StreamingContext]
val shouldReload = false

val processThread = new Thread {
  override def run(): Unit = {
    while (!stopEvent){
      if (streamingContext.isEmpty) {

        // new context
        streamingContext = Option(new StreamingContext(sparkContext, Seconds(1)))

        // create DStreams
          val lines = streamingContext.socketTextStream(...)

        // your transformations and actions
        // and decision to reload streaming context
        // ...

        streamingContext.get.start()
      } else {
        if (shouldReload) {
          streamingContext.get.stop(stopSparkContext = false, stopGracefully = true)
          streamingContext.get.awaitTermination()
          streamingContext = Option.empty[StreamingContext]
        } else {
          Thread.sleep(1000)
        }
      }

    }
    streamingContext.get.stop(stopSparkContext =true, stopGracefully = true)
    streamingContext.get.awaitTermination()
  }
}

// and start it  in separate thread
processThread.start()
processThread.join()

またはpythonで:

spark_context = SparkContext()

stop_event = Event()
spark_streaming_context = None
should_reload = False

def process(self):
    while not stop_event.is_set():
        if spark_streaming_context is None:

            # new context
            spark_streaming_context = StreamingContext(spark_context, 0.5)

            # create DStreams
            lines = spark_streaming_context.socketTextStream(...)  

            # your transformations and actions
            # and decision to reload streaming context
            # ...

            self.spark_streaming_context.start()
        else:
            # TODO move to config
            if should_reload:
                spark_streaming_context.stop(stopSparkContext=False, stopGraceFully=True)
                spark_streaming_context.awaitTermination()
                spark_streaming_context = None
            else:
                time.sleep(1)
    else:
        self.spark_streaming_context.stop(stopGraceFully=True)
        self.spark_streaming_context.awaitTermination()


# and start it  in separate thread
process_thread = threading.Thread(target=process)
process_thread.start()
process_thread.join()

コードがクラッシュするのを防ぎ、最後の場所からストリーミング コンテキストを再開するには、チェックポイントメカニズムを使用します。失敗後にジョブの状態を復元できます。

于 2018-02-17T11:30:56.593 に答える
0

Scala では、停止には停止sparkStreamingContextが含まれる場合がありSparkContextます。レシーバーがハングした場合、SparkCintext と SparkStreamingContext を再起動するのが最善であることがわかりました。

以下のコードはもっとエレガントに記述できると確信していますが、プログラムで SparkContext と SparkStreamingContext を再起動できます。これが完了したら、レシーバーをプログラムで再起動することもできます。

    package coname.utilobjects

import com.typesafe.config.ConfigFactory
import grizzled.slf4j.Logging
import coname.conameMLException
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable


object SparkConfProviderWithStreaming extends Logging
{
  val sparkVariables: mutable.HashMap[String, Any] = new mutable.HashMap
}



trait SparkConfProviderWithStreaming extends Logging{






  private val keySSC = "SSC"
  private val keyConf = "conf"
  private val keySparkSession = "spark"


  lazy val   packagesversion=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.packagesversion")
  lazy val   sparkcassandraconnectionhost=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparkcassandraconnectionhost")
  lazy val   sparkdrivermaxResultSize=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparkdrivermaxResultSize")
  lazy val   sparknetworktimeout=ConfigFactory.load("streaming").getString("streaming.cassandraconfig.sparknetworktimeout")


  @throws(classOf[conameMLException])
  def intitializeSpark(): Unit =
  {
    getSparkConf()
    getSparkStreamingContext()
    getSparkSession()
  }

  @throws(classOf[conameMLException])
  def getSparkConf(): SparkConf = {
    try {
      if (!SparkConfProviderWithStreaming.sparkVariables.get(keyConf).isDefined) {
        logger.info("\n\nLoading new conf\n\n")
        val conf = new SparkConf().setMaster("local[4]").setAppName("MLPCURLModelGenerationDataStream")
        conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
        conf.set("spark.cassandra.connection.host", sparkcassandraconnectionhost)
        conf.set("spark.driver.maxResultSize", sparkdrivermaxResultSize)
        conf.set("spark.network.timeout", sparknetworktimeout)


        SparkConfProviderWithStreaming.sparkVariables.put(keyConf, conf)
        logger.info("Loaded new conf")
        getSparkConf()
      }
      else {
        logger.info("Returning initialized conf")
        SparkConfProviderWithStreaming.sparkVariables.get(keyConf).get.asInstanceOf[SparkConf]
      }
    }
    catch {
      case e: Exception =>
        logger.error(e.getMessage, e)
        throw new conameMLException(e.getMessage)
    }

  }

  @throws(classOf[conameMLException])
def killSparkStreamingContext
  {
    try
    {
      if(SparkConfProviderWithStreaming.sparkVariables.get(keySSC).isDefined)
        {
          SparkConfProviderWithStreaming.sparkVariables -= keySSC
          SparkConfProviderWithStreaming.sparkVariables -= keyConf
        }
      SparkSession.clearActiveSession()
      SparkSession.clearDefaultSession()

    }
    catch {
      case e: Exception =>
        logger.error(e.getMessage, e)
        throw new conameMLException(e.getMessage)
    }
  }

  @throws(classOf[conameMLException])
  def getSparkStreamingContext(): StreamingContext = {
    try {
      if (!SparkConfProviderWithStreaming.sparkVariables.get(keySSC).isDefined) {
        logger.info("\n\nLoading new streaming\n\n")
        SparkConfProviderWithStreaming.sparkVariables.put(keySSC, new StreamingContext(getSparkConf(), Seconds(6)))

        logger.info("Loaded streaming")
        getSparkStreamingContext()
      }
      else {
        SparkConfProviderWithStreaming.sparkVariables.get(keySSC).get.asInstanceOf[StreamingContext]
      }
    }
    catch {
      case e: Exception =>
        logger.error(e.getMessage, e)
        throw new conameMLException(e.getMessage)
    }
  }

  def getSparkSession():SparkSession=
  {

    if(!SparkSession.getActiveSession.isDefined)
    {
      SparkSession.builder.config(getSparkConf()).getOrCreate()

    }
    else
      {
        SparkSession.getActiveSession.get
      }
  }

}
于 2018-07-12T05:18:51.347 に答える
0

私たちが最近 (こちらの Spark ミートアップで) 検討した 1 つの方法は、Spark と Tandem で Zookeeper を使用してこれを達成することでした。簡単に言えば、Apache Curator を使用して Zookeeper の変更 (外部イベントによってトリガーされる ZK の構成の変更) を監視し、リスナーを再起動します。

参照されているコード ベースはこちら です。config を変更すると、Watcher (Spark ストリーミング アプリ) が正常なシャットダウン後に再起動し、変更がリロードされることがわかります。これがポインタであることを願っています!

于 2017-01-18T16:33:08.540 に答える