私のプログラムは、各アプリケーション サーバーから 1 時間ごとに生成される一連のログ ファイルを毎日分析する必要があります。
したがって、2 つのアプリ サーバーがある場合、48 ファイル (24 ファイル * 2 アプリ サーバー) を処理することになります。
ファイル サイズの範囲は 100 ~ 300 MB です。すべてのファイルの各行は、次の形式のログ エントリです。
[識別子]-[個数]-[個数]-[ログの一部]
例えば
xxx-3-1-ABC
xxx-3-2-ABC
xxx-3-3-ABC
これらは、私が言及した 48 個のファイルに分散できます。これらのログを次のようにマージする必要があります。
xxx-PAIR-ABCABCABC
私の実装では、スレッド プールを使用してファイルを並行して読み取り、ConcurrentHashMap を使用してそれらを集約します。
クラス LogEvent.scala を定義します
class LogEvent (val id: String, val total: Int, var piece: Int, val json: String) {
var additions: Long = 0
val pieces = new Array[String](total)
addPiece(json)
private def addPiece (json: String): Unit = {
pieces(piece) = json
additions += 1
}
def isDone: Boolean = {
additions == total
}
def add (slot: Int, json: String): Unit = {
piece = slot
addPiece(json)
}
主な処理は複数のスレッドで行われ、コードは次の行にある
//For each file
val logEventMap = new ConcurrentHashMap[String, LogEvent]().asScala
Future {
Source.fromInputStream(gis(file)).getLines().foreach {
line =>
//Extract the id part of the line
val idPart: String = IDPartExtractor(line)
//Split line on '-'
val split: Array[String] = idPart.split("-")
val id: String = split(0) + "-" + split(1)
val logpart: String = JsonPartExtractor(line)
val total = split(2) toInt
val piece = split(3) toInt
def slot: Int = {
piece match {
case x if x - 1 < 0 => 0
case _ => piece - 1
}
}
def writeLogEvent (logEvent: LogEvent): Unit = {
if (logEvent.isDone) {
//write to buffer
val toWrite = id + "-PAIR-" + logEvent.pieces.mkString("")
logEventMap.remove(logEvent.id)
writer.writeLine(toWrite)
}
}
//The LOCK
appendLock {
if (!logEventMap.contains(id)) {
val logEvent = new LogEvent(id, total, slot, jsonPart)
logEventMap.put(id, logEvent)
//writeLogEventToFile()
}
else {
val logEvent = logEventMap.get(id).get
logEvent.add(slot, jsonPart)
writeLogEvent(logEvent)
}
}
}
}
すべての先物が完了するまで、メインスレッドはブロックされます
このアプローチを使用して、処理時間を 1 時間以上から約 7 ~ 8 分に短縮することができました。
私の質問は次のとおりです-
- 異なるスレッドを使用して複数のファイルを読み込んでおり、集約が行われるブロックでロックする必要があります。これを行うためのより良い方法はありますか?
- マップはメモリ内で非常に高速に成長します。そのようなユースケースのためのオフヒープストレージの提案
- 別のフィードバックはありませんか。
ありがとう