11

私は、Kafka ストリームを取得し、ストリームに対して非常に基本的な変換を行い、データを DB (関連する場合は voltdb) に挿入する Spark Streaming プログラムに取り組んでいます。DB に行を挿入する速度を測定しようとしています。メトリクスは (JMX を使用して) 役立つと思います。ただし、カスタム メトリックを Spark に追加する方法が見つかりません。Spark のソース コードを調べたところ、このスレッドも見つかりましたが、うまくいきません。また、conf.metrics ファイルで JMX シンクを有効にしました。うまくいかないのは、JConsole でカスタム メトリックが表示されないことです。

スパーク ストリーミングにカスタム メトリックを (できれば JMX 経由で) 追加する方法を誰か説明してもらえますか? あるいは、DB (具体的には VoltDB) への挿入率を測定する方法を教えてください。Java 8でsparkを使用しています。

4

5 に答える 5

17

ソースコードを掘り下げた後、独自のカスタムメトリックを追加する方法を見つけました。それには3つのことが必要です:

  1. 独自のカスタムソースを作成します。だいたいこんな感じ
  2. spark metrics.properties ファイルで Jmx シンクを有効にします。私が使用した特定の行は次のとおりです。*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSinkこれにより、すべてのインスタンスに対して JmxSink が有効になります
  3. カスタム ソースを SparkEnv メトリクス システムに登録します。方法の例をここで見ることができます- 以前にこのリンクを実際に見ましたが、登録部分を見逃していたため、JVisualVM でカスタム メトリックを実際に見ることができませんでした。

コードはエグゼキューターで実行されるため、VoltDB への挿入数を実際にカウントする方法にまだ苦労していますが、それは別のトピックの主題です:)

これが他の人に役立つことを願っています

于 2015-10-01T09:12:00.330 に答える
3

VoltDB からの挿入に基づいて行を挿入するには、アキュムレータを使用します。次に、ドライバーからリスナーを作成できます。おそらく、このようなものから始めることができます

sparkContext.addSparkListener(new SparkListener() {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
    stageCompleted.stageInfo.accumulables.foreach { case (_, acc) => {

ここでは、これらの行を組み合わせたアキュムレータにアクセスでき、シンクに送信できます..

于 2015-11-07T23:55:20.370 に答える