S3 バケットにある外部ファイルを読み取り、各ファイル行を取得して移動し、その特定の行を処理する別のアクターに送信するアクター ベースのシステムがあります。私が理解できないのは、ファイルの読み取り中に例外がスローされたときに何が起こるかです。
私のコードは次のとおりです。
import akka.actor._
import akka.actor.ActorSystem
class FileWorker(processorWorker: ActorRef) extends Actor with ActorLogging {
val fileUtils = new S3Utils()
private def processFile(fileLocation: String): Unit = {
try{
fileUtils.getLinesFromLocation(fileLocation).foreach {
r =>
{
//Some processing happens for the line
}
}
}
}
}catch{
case e:Exception => log.error("Issue processing files from the following location %s".format(fileLocation))
}
}
def receive() = {
case fileLocation: String => {
processFile(fileLocation)
}
}
}
私のS3Utils
クラスでは、getLinesFromLocation
メソッドを次のように定義しました。
def getLinesFromLocation(fileLocation: String): Iterator[String] = {
try{
for {
fileEntry <- getFileInfo(root,fileLocation)
} yield fileEntry
}catch{
case e:Exception => logger.error("Issue with file location %s: %s".format(fileLocation,e.getStackTraceString));throw e
}
}
私が実際にファイルを読んでいるメソッドは、プライベートメソッドで定義されていますgetFileInfo
private def getFileInfo(rootBucket: String,fileLocation: String): Iterator[String] = {
implicit val codec = Codec(Codec.UTF8)
codec.onMalformedInput(CodingErrorAction.IGNORE)
codec.onUnmappableCharacter(CodingErrorAction.IGNORE)
Source.fromInputStream(s3Client.
getObject(rootBucket,fileLocation).
getObjectContent()).getLines
}
上記の部分は、S3 に置かれている基になるファイルがどこにもキャッシュされず、一定のスペースで個々の行を単純に反復処理して処理するという前提で作成しました。特定の行の読み取りに問題がある場合、イテレータはアクタに影響を与えることなく先に進みます。
私の最初の質問は、反復子に関する私の理解は正しいですか? 実際には、メモリに圧力をかけたり、メモリリークを発生させたりすることなく、基礎となるファイルシステム (この場合は S3 バケット) から実際に行を読み取っていますか?
次の問題は、反復子が個々のエントリの読み取り中にエラーに遭遇した場合、反復プロセス全体が強制終了されるか、それとも次のエントリに進むかということです。
最後の質問は、ファイル処理ロジックが正しく記述されているかどうかです。
これについていくつかの洞察を得ることは素晴らしいことです。
ありがとう