ログ行が次のようになるログ ファイルの処理パイプラインを構築しようとしています。
2005-05-06 14:58:57 1 45.23.4.218 304 TCP_HIT 542 1109 GET http sports.espn.go.com /crossdomain.xml - - DIRECT 199.181.132.141 text/xml "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.0; .NET CLR 1.1.4322)" PROXIED Sports/Recreation/Hobbies - 192.16.170.44 SG-HTTP-Service - none -
この特定のログ ファイルについては、処理したい約193705 のログ行があります。
Harits-MacBook-Pro-2:bluecoat_proxy_big harit$ wc -l Demo_log_004.log
195765 Demo_log_004.log
Harits-MacBook-Pro-2:bluecoat_proxy_big harit$ grep -E "GET|POST|CONNECT" Demo_log_004.log | wc -l
192197
Harits-MacBook-Pro-2:bluecoat_proxy_big harit$ wc -l Demo_log_004.log
195765 Demo_log_004.log
Harits-MacBook-Pro-2:bluecoat_proxy_big harit$ grep -v "^#" Demo_log_004.log | wc -l
193705
Harits-MacBook-Pro-2:bluecoat_proxy_big harit$
最初は次のように見えるフローグラフを作成しました
source ~> byteStringToString ~> filterComments ~> splitLogLine ~> broadcast ~> transformEvent ~> sinkEvents
broadcast ~> obfuscateIpAddress ~> sinkAssets
しかし、その後、シンクでの取得量が大幅に減っていることに気付きました
$ java -cp processor/target/lib:processor/target/processor-1.0-SNAPSHOT.jar com.learner.processor.LogFile | tee out.log
$ wc -l out.log
15070 out.log
そのため、すべての線がパイプを通過するかどうかを確認するために、グラフを線形にしました。私の現在のコードは次のようになります
import java.io.File
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.io.Implicits.AddSynchronousFileSource
import akka.stream.scaladsl.FlowGraph.Builder
import akka.stream.scaladsl._
import akka.util.ByteString
import com.learner.messages.BlueCoatEvent
import com.learner.processor.Flows.byteStringToString
import com.typesafe.scalalogging.Logger
import org.slf4j.LoggerFactory
import scala.concurrent.Future
import scala.util.hashing.MurmurHash3.stringHash
object LogFile {
val maxBytesPerLine = 1500
implicit val system = ActorSystem("system")
def apply(file: File) = new LogFile(file)
def main(args: Array[String]) {
LogFile(new File("/Users/harit/Downloads/bluecoat_proxy_big/Demo_log_004.log")).processGraph()
}
}
class LogFile(file: File)(implicit val system: ActorSystem) {
Predef.assert(file.exists(), "log file must exists")
implicit val materializer = ActorMaterializer()
val logger = Logger(LoggerFactory.getLogger(getClass))
val source: Source[ByteString, Future[Long]] = Source.synchronousFile(file)
def processGraph() = {
val sinkEvents = Sink.foreach(println)
val sinkAssets = Sink.ignore
val filterComments = Flow[String].filter(!_.startsWith("#"))
val splitLogLine = Flow[String].map(_.split("\\s") toList)
val transformEvent = Flow[List[String]].map(tokens => BlueCoatEvent(tokens))
val obfuscateIpAddress = Flow[List[String]].map(tokens => Map[String, String](tokens(3) -> stringHash(tokens(3)).toString))
FlowGraph.closed() { implicit builder: Builder[Unit] =>
import FlowGraph.Implicits._
source ~> byteStringToString ~> sinkEvents
}.run()
}
}
と
val byteStringToString: Flow[ByteString, String, Unit] = Flow[ByteString]
.via(Framing.delimiter(ByteString(System.lineSeparator), maximumFrameLength = LogFile.maxBytesPerLine, allowTruncation = true))
.map(_.utf8String)
プログラムを再度実行すると、上記の数値に近い行が再び生成されます (まったく同じではありません)。
私は混乱していて、ここで助けを求めていました
- ストリームは失敗したり例外をスローしたりしませんが、生成される出力行ははるかに少なくなります。これをデバッグするにはどうすればよいですか?
- は1500
maximumFrameLength
ですが、ログ行はそれよりも大きくなる可能性があります (文字数が増える)。それは問題になる可能性がありますか?そのような場合、どうすれば解決できますか? - どうすれば確認できますか? 私のコードによると、
Materializer
を呼び出したときにが返されないためrun()
、 をシャットダウンできませんActorSystem
。何が欠けていますか?
アップデート
ある行で停止することがわかりました2564 chars(bytes)
15070 2005-05-06 14:58:57 42 45.23.4.218 200 TCP_NC_MISS 903 1098 GET http sports.espn.go.com /nba/xml/upcomingTV ?sport=nba - DIRECT sports.espn.go.com text/html;%20charset=iso- 8859-1 "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.0; .NET CLR 1.1.4322)" PROXIED Sports/Recreation/Hobbies - 192.16.170.44 SG-HTTP-Service - none -
15071 2005-05-06 14:58:57 306 45.23.4.218 200 TCP_MISS 16641 2140 GET http m3.doubleclick.net /872526/match2fb_728x90v2.swf ?clickTag=http%253A//ad.doubleclick.net/click%25253Bh% 253Dv3%257C3270%257Cf%257C6e%257C%25252a%257Ct%25253B14910813%25253B0-0%25253B0%25253B11166676%25253B3454-728%257C90%25253B9369149%257C9387045%257C1%25253B%25253B%25257Essc s%25253D%25253fhttp%253A//log.go.com/log%253Fsrvc%25253dsz%252526guid%25253d5504B8AD-0FC2-4475-9203-4CE6D2125953%252526drop%25253d0%252526addata%25253d0%253A63%253A188329%2 53A65%252526a%25253d1%252526goto%25253dhttp%25253a%25252f%25252fwww.levitra.com/match/levitra_promotions/match/get/forms.jsp%25253Frotation%25253D11166676%252526banner%2525 3D14910813&clickTag1=http%253A//log.go.com/log%253Fsrvc%25253dsz%252526guid%25253d5504B8AD-0FC2-4475-9203-4CE6D2125953%252526drop%25253d0%252526addata%25253d0%253A63%253A18 8329%253A65%252526a%25253d1%252526goto%25253dhttp%253A//ad.doubleclick.net/click%25253Bh%253Dv3%257C3270%257Cf%257C6e%257C%25252a%257Ct%25253B14910813%25253B0-0%25253B0%252 53B11166676%25253B3454-728%257C90%25253B9369149%257C9387045%257C1%25253B%25253B%25257Esscs%25253D%25253fhttp%253A//log.go.com/log%253Fsrvc%25253dsz%252526guid%25253d5504B8A D-0FC2-4475-9203-4CE6D2125953%252526drop%25253d0%252526addata%25253d0%253A63%253A188329%253A65%252526a%25253d1%252526goto%25253dhttp%25253a%25252f%25252fwww.levitra.com/mat ch/levitra_promotions/match/get/forms.jsp%25253Frotation%25253D11166676%252526banner%25253D14910813&clickTag2=http%253A//log.go.com/log%253Fsrvc%25253dsz%252526guid%25253d5 504B8AD-0FC2-4475-9203-4CE6D2125953%252526drop%25253d0%252526addata%25253d0%253A63%253A188329%253A65%252526a%25253d1%252526goto%25253dhttp%253A//ad.doubleclick.net/click%25 253Bh%253Dv3%257C3270%257Cf%257C6e%257C%25252a%257Ct%25253B14910813%25253B0-0%25253B0%25253B11166676%25253B3454-728%257C90%25253B9369149%257C9387045%257C1%25253B%25253B%252 57Esscs%25253D%25253fhttp%253A//log.go.com/log%253Fsrvc%25253dsz%252526guid%25253d5504B8AD-0FC2-4475-9203-4CE6D2125953%252526drop%25253d0%252526addata%25253d0%253A63%253A18 8329%253A65%252526a%25253d1%252526goto%25253dhttp%253A//www.levitra.com/consumer/about_levitra/levitra_side_effects.htm%253Frotation%253D11166676%2526banner%253D14910813&cl ickTag3=&clickTag4=&clickTag5= - DIRECT m3.doubleclick.net application/x-shockwave-flash "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.0; .NET CLR 1.1.4322)" PROXIED Web %20Advertisements - 192.16.170.44 SG-HTTP-Service - none -
と
$ wc -l out.log
15070 out.log
であるために停止しline 15071
ます2564 bytes
。
しかし、なぜ例外がスローされないのですか? どうすればこれを処理できますか?