0

S3 バケットにある外部ファイルを読み取り、各ファイル行を取得して移動し、その特定の行を処理する別のアクターに送信するアクター ベースのシステムがあります。私が理解できないのは、ファイルの読み取り中に例外がスローされたときに何が起こるかです。

私のコードは次のとおりです。

import akka.actor._
import akka.actor.ActorSystem

class FileWorker(processorWorker: ActorRef) extends Actor with ActorLogging {

  val fileUtils = new S3Utils()

  private def processFile(fileLocation: String): Unit = {
    try{
         fileUtils.getLinesFromLocation(fileLocation).foreach {
         r =>
        {
           //Some processing happens for the line
            }
          }
        }
    }
    }catch{
      case e:Exception => log.error("Issue processing files from the following location %s".format(fileLocation))
    }
  }

  def receive() = {
    case fileLocation: String => {
      processFile(fileLocation)
    }
  }
}

私のS3Utilsクラスでは、getLinesFromLocationメソッドを次のように定義しました。

 def getLinesFromLocation(fileLocation: String): Iterator[String] = {
    try{
       for {
             fileEntry <- getFileInfo(root,fileLocation)
          } yield fileEntry
    }catch{
      case e:Exception => logger.error("Issue with file location %s:         %s".format(fileLocation,e.getStackTraceString));throw e
    }
  }

私が実際にファイルを読んでいるメソッドは、プライベートメソッドで定義されていますgetFileInfo

 private def getFileInfo(rootBucket: String,fileLocation: String): Iterator[String] = {
    implicit val codec = Codec(Codec.UTF8)
    codec.onMalformedInput(CodingErrorAction.IGNORE)
    codec.onUnmappableCharacter(CodingErrorAction.IGNORE)
    Source.fromInputStream(s3Client.
                       getObject(rootBucket,fileLocation).
                       getObjectContent()).getLines
  }

上記の部分は、S3 に置かれている基になるファイルがどこにもキャッシュされず、一定のスペースで個々の行を単純に反復処理して処理するという前提で作成しました。特定の行の読み取りに問題がある場合、イテレータはアクタに影響を与えることなく先に進みます。

私の最初の質問は、反復子に関する私の理解は正しいですか? 実際には、メモリに圧力をかけたり、メモリリークを発生させたりすることなく、基礎となるファイルシステム (この場合は S3 バケット) から実際に行を読み取っていますか?

次の問題は、反復子が個々のエントリの読み取り中にエラーに遭遇した場合、反復プロセス全体が強制終了されるか、それとも次のエントリに進むかということです。

最後の質問は、ファイル処理ロジックが正しく記述されているかどうかです。

これについていくつかの洞察を得ることは素晴らしいことです。

ありがとう

4

1 に答える 1

1

Amazon s3 には非同期実装がないようで、ピン留めされたアクターで立ち往生しています。したがって、接続ごとにスレッドを割り当て、入力をブロックせず、あまり多くの接続を使用しない場合、実装は正しいです。

重要な手順:

1) processFile は現在のスレッドをブロックしてはなりません。できれば、別のアクターに入力を委任する必要があります。

 private def processFile(fileLocation: String): Unit = {
     ...
         fileUtils.getLinesFromLocation(fileLocation).foreach {  r =>
            lineWorker ! FileLine(fileLocation, r)
         }

    ...
 }

2)FileWorkerピン留めされたアクターを作成します。

## in application.config:
my-pinned-dispatcher {
    executor = "thread-pool-executor"
    type = PinnedDispatcher
}

// in the code:  
val fileWorker = context.actorOf(Props(classOf[FileWorker], lineWorker).withDispatcher("my-pinned-dispatcher"), "FileWorker")

個々のエントリの読み取り中に反復子がエラーに遭遇した場合、反復のプロセス全体が強制終了されますか?

はい、プロセス全体が強制終了され、アクターはメールボックスから次のジョブを取得します。

于 2013-06-10T09:32:34.220 に答える