0

Amazon s3 に巨大な csv ファイル (100MB+) があり、それらをチャンクで読み取り、ruby CSV ライブラリを使用して処理したいと考えています。csv 処理用の適切な IO オブジェクトを作成するのに苦労しています:

buffer = TheRightIOClass.new
bytes_received = 0
RightAws::S3Interface.new(<access_key>, <access_secret>).retrieve_object(bucket, key) do     |chunk|
  bytes_received += buffer.write(chunk)
  if bytes_received >= 1*MEGABYTE
    bytes_received = 0
    csv(buffer).each do |row|
      process_csv_record(row)
    end
  end
end

def csv(io)
  @csv ||= CSV.new(io, headers: true)
end

ここでの正しいセットアップがどうあるべきか、TheRightIOClass が何であるかはわかりません。ファイル全体を StringIO でメモリにロードしたくありません。これを行うための ruby​​ の bufferedio または ringbuffer はありますか? スレッド (プロセスなし) とパイプを使用した優れたソリューションがあれば、ぜひご覧ください。

4

1 に答える 1

2

StringIOを使用し、巧妙なエラー処理を実行して、処理する前に行全体がチャンクにあることを確認できます。この例のpackerクラスは、解析された行をディスクまたはデータベースにフラッシュするまで、メモリに蓄積します。

packer = Packer.new
object = AWS::S3.new.buckets[bucket].objects[path]
io = StringIO.new
csv = ::CSV.new(io, headers: true)
object.read do |chunk|
  #Append the most recent chunk and rewind the IO
  io << chunk
  io.rewind
  last_offset = 0
  begin
    while row = csv.shift do
      #Store the parsed row unless we're at the end of a chunk
      unless io.eof?
        last_offset = io.pos
        packer << row.to_hash
      end
    end
  rescue ArgumentError, ::CSV::MalformedCSVError => e
    #Only rescue malformed UTF-8 and CSV errors if we're at the end of chunk
    raise e unless io.eof?
  end
  #Seek to our last offset, create a new StringIO with that partial row & advance the cursor
  io.seek(last_offset)
  io.reopen(io.read)
  io.read
  #Flush our accumulated rows to disk every 1 Meg
  packer.flush if packer.bytes > 1*MEGABYTES 
end
#Read the last row
io.rewind
packer << csv.shift.to_hash
packer
于 2012-11-28T06:50:17.700 に答える