1

私のプログラムは、各アプリケーション サーバーから 1 時間ごとに生成される一連のログ ファイルを毎日分析する必要があります。

したがって、2 つのアプリ サーバーがある場合、48 ファイル (24 ファイル * 2 アプリ サーバー) を処理することになります。

ファイル サイズの範囲は 100 ~ 300 MB です。すべてのファイルの各行は、次の形式のログ エントリです。

[識別子]-[個数]-[個数]-[ログの一部]

例えば

xxx-3-1-ABC
xxx-3-2-ABC
xxx-3-3-ABC

これらは、私が言及した 48 個のファイルに分散できます。これらのログを次のようにマージする必要があります。

xxx-PAIR-ABCABCABC

私の実装では、スレッド プールを使用してファイルを並行して読み取り、ConcurrentHashMap を使用してそれらを集約します。

クラス LogEvent.scala を定義します

class LogEvent (val id: String, val total: Int, var piece: Int, val json: String) {

  var additions: Long = 0
  val pieces = new Array[String](total)
  addPiece(json)


  private def addPiece (json: String): Unit = {
    pieces(piece) = json
    additions += 1
  }

  def isDone: Boolean = {
    additions == total
  }


  def add (slot: Int, json: String): Unit = {
    piece = slot
    addPiece(json)
  } 

主な処理は複数のスレッドで行われ、コードは次の行にある

//For each file
val logEventMap = new ConcurrentHashMap[String, LogEvent]().asScala
Future {
          Source.fromInputStream(gis(file)).getLines().foreach {
            line =>

                  //Extract the id part of the line
                  val idPart: String = IDPartExtractor(line)
                  //Split line on '-'
                  val split: Array[String] = idPart.split("-")



                    val id: String = split(0) + "-" + split(1)
                    val logpart: String = JsonPartExtractor(line)
                    val total = split(2) toInt
                    val piece = split(3) toInt

                    def slot: Int = {
                      piece match {
                        case x if x - 1 < 0 => 0
                        case _ => piece - 1
                      }
                    }

                    def writeLogEvent (logEvent: LogEvent): Unit = {
                      if (logEvent.isDone) {
                        //write to buffer
                        val toWrite = id + "-PAIR-" + logEvent.pieces.mkString("")
                        logEventMap.remove(logEvent.id)
                        writer.writeLine(toWrite)
                      }
                    }

                    //The LOCK
                    appendLock {
                      if (!logEventMap.contains(id)) {
                        val logEvent = new LogEvent(id, total, slot, jsonPart)
                        logEventMap.put(id, logEvent)
                        //writeLogEventToFile()
                      }
                      else {
                        val logEvent = logEventMap.get(id).get
                        logEvent.add(slot, jsonPart)
                        writeLogEvent(logEvent)

                      }
                    } 
                }
          }

すべての先物が完了するまで、メインスレッドはブロックされます

このアプローチを使用して、処理時間を 1 時間以上から約 7 ~ 8 分に短縮することができました。

私の質問は次のとおりです-

  1. 異なるスレッドを使用して複数のファイルを読み込んでおり、集約が行われるブロックでロックする必要があります。これを行うためのより良い方法はありますか?
  2. マップはメモリ内で非常に高速に成長します。そのようなユースケースのためのオフヒープストレージの提案
  3. 別のフィードバックはありませんか。

ありがとう

4

2 に答える 2

0

この種のことについては、可能であればSplunkを使用し、そうでない場合は、後で必要に応じて集約するためにログ ファイルのインデックスを作成することをコピーします。

オフヒープ ストレージについては、Hazelcast または Coherence の分散キャッシュを調べてください。どちらのサポートもjava.util.Map、複数の JVM に格納される実装を提供します。

于 2013-08-22T09:49:59.290 に答える