4

連結したいログファイルを含む S3 バケットがあり、EMR ジョブへの入力として使用します。ログ ファイルは次のようなパスにありますbucket-name/[date]/product/out/[hour]/[minute-based-file]。すべての日付ディレクトリのすべての時間ディレクトリにあるすべての分のログを取得し、それらを 1 つのファイルに連結したいと思います。そのファイルを EMR ジョブへの入力として使用したいと考えています。元のログ ファイルは保存する必要があり、新しく結合されたログ ファイルはおそらく別の S3 バケットに書き込まれます。

hadoop fs -getmergeSSH 経由で EMR マスター ノードで使用しようとしましたが、次のエラーが発生しました。

This file system object (file:///) does not support access to the request path 's3://target-bucket-name/merged.log'

ソース S3 バケットには他のファイルがいくつか含まれているため、そのすべてのファイルを含めたくありません。ワイルドカード マッチは次のようになりますs3n://bucket-name/*/product/out/*/log.*

目的は、EMR への数万または数十万の小さな (10k-3mb) 入力ファイルの問題を回避し、代わりに、より効率的に分割できる 1 つの大きなファイルを与えることです。

4

1 に答える 1

4

最終的に、これを行うためのいくつかの Hadoop ファイルシステム コマンドをラップするスクリプトを作成することになりました。

#!/usr/bin/env ruby

require 'date'

# Merge minute-based log files into daily log files
# Usage: Run on EMR master (e.g. SSH to master then `ruby ~/merge-historical-logs.rb [FROM [TO]]`)

SOURCE_BUCKET_NAME      = 's3-logs-bucket'
DESTINATION_BUCKET_NAME = 's3-merged-logs-bucket'

# Optional date inputs
min_date = if ARGV[0]
  min_date_args = ARGV[0].split('-').map {|item| item.to_i}
  Date.new(*min_date_args)
else
  Date.new [2012, 9, 1]
end

max_date = if ARGV[1]
  max_date_args = ARGV[1].split('-').map {|item| item.to_i}
  Date.new(*max_date_args)
else
  Date.today
end

# Setup directories
hdfs_logs_dir = '/mnt/tmp/logs'
local_tmp_dir = './_tmp_merges'

puts "Cleaning up filesystem"
system "hadoop fs -rmr #{hdfs_logs_dir}"
system "rm -rf #{local_tmp_dir}*"

puts "Making HDFS directories"
system "hadoop fs -mkdir #{hdfs_logs_dir}"

# We will progress backwards, from max to min
date = max_date
while date >= min_date
  # Format date pieces
  year  = date.year
  month = "%02d" % date.month
  day   = "%02d" % date.day

  # Make a directory in HDFS to store this day's hourly logs
  today_hours_dir = "#{hdfs_logs_dir}/#{year}-#{month}-#{day}"
  puts "Making today's hourly directory"
  system "hadoop fs -mkdir #{today_hours_dir}"

  # Break the day's hours into a few chunks
  # This seems to avoid some problems when we run lots of getmerge commands in parallel
  [*(0..23)].each_slice(8).to_a.each do |hour_chunk|
    hour_chunk.each do |_hour|
      hour = "%02d" % _hour

      # Setup args to merge minute logs into hour logs
      source_file = "s3://#{SOURCE_BUCKET_NAME}/#{year}-#{month}-#{day}/product/out/#{hour}/"
      output_file = "#{local_tmp_dir}/#{hour}.log"

      # Launch each hour's getmerge in the background
      full_command = "hadoop fs -getmerge #{source_file} #{output_file}"
      puts "Forking: #{full_command}"
      fork { system full_command }
    end

    # Wait for this batch of the germerge's to finish
    Process.waitall
  end

  # Delete the local temp files Hadoop created
  puts "Removing temp files"
  system "rm #{local_tmp_dir}/.*.crc"

  # Move local hourly logs to hdfs to free up local space
  puts "Moving local logs to HDFS"
  system "hadoop fs -put #{local_tmp_dir}/* #{today_hours_dir}"

  puts "Removing local logs"
  system "rm -rf #{local_tmp_dir}"

  # Merge the day's hourly logs into a single daily log file
  daily_log_file_name = "#{year}-#{month}-#{day}.log"
  daily_log_file_path = "#{local_tmp_dir}_day/#{daily_log_file_name}"
  puts "Merging hourly logs into daily log"
  system "hadoop fs -getmerge #{today_hours_dir}/ #{daily_log_file_path}"

  # Write the daily log file to another s3 bucket
  puts "Writing daily log to s3"
  system "hadoop fs -put #{daily_log_file_path} s3://#{DESTINATION_BUCKET_DIR}/daily-merged-logs/#{daily_log_file_name}"

  # Remove daily log locally
  puts "Removing local daily logs"
  system "rm -rf #{local_tmp_dir}_day"

  # Remove the hourly logs from HDFS
  puts "Removing HDFS hourly logs"
  system "hadoop fs -rmr #{today_hours_dir}"

  # Go back in time
  date -= 1
end
于 2013-05-10T20:12:55.173 に答える