0

IIS からのログ ファイルが hdfs に保存されていますが、Web サーバーの構成が原因で、一部のログにはすべての列が含まれていないか、異なる順序で表示されます。Hive テーブルを定義できるように、共通のスキーマを持つファイルを生成したいと考えています。

良いログの例:

#Fields: date time s-ip cs-method cs-uri-stem useragent
2013-07-16 00:00:00 10.1.15.8 GET /common/viewFile/1232 Mozilla/5.0+Chrome/27.0.1453.116

列が欠落しているログの例 (cs-method と useragent が欠落している):

#Fields: date time s-ip cs-uri-stem 
2013-07-16 00:00:00 10.1.15.8 /common/viewFile/1232

列が欠落しているログは、次のように完全なスキーマにマップする必要があります。

#Fields: date time s-ip cs-method cs-uri-stem useragent
2013-07-16 00:00:00 10.1.15.8 null /common/viewFile/1232 null

不良ログでは、列を任意に組み合わせて、異なる順序で有効にすることができます。

ログ ファイル内のフィールド行に従って、使用可能な列を完全なスキーマにマップするにはどうすればよいですか?

編集:通常、列スキーマを列名をインデックスにマッピングする辞書として定義することで、これにアプローチします。つまり、col['date']=0 col['time']=1 などです。次に、ファイルから #Fields 行を読み取り、有効な列を解析して、ヘッダー名をファイル内の列インデックスにマッピングするヘッダー dict を生成します。 . 次に、インデックスでヘッダーを知っている残りのデータ行について、それをヘッダー=列名で列スキーマにマップし、欠落している列をnullデータで挿入して正しい順序で新しい行を生成します。私の問題は、各マップが単独で実行されるため、hadoop 内でこれを行う方法がわからないことです。したがって、#Fields 情報を各マップと共有するにはどうすればよいですか?

4

1 に答える 1

1

これを使用して、マップを作成する列にヘッダーを適用できます。そこから、次のような UDF を使用できます。

myudf.py

#!/usr/bin/python

@outputSchema('newM:map[]')
def completemap(M):
    if M is None:
        return None
    to_add = ['A', 'D', 'F']
    for item in to_add:
        if item not in M:
            M[item] = None
    return M

@outputSchema('A:chararray, B:chararray, C:chararray, D:chararray, E:chararray, F:chararray')
def completemap_v2(M):
    if M is None:
        return (None,
                None,
                None,
                None,
                None,
                None)
    return (M.get('A', None),
            M.get('B', None),
            M.get('C', None),
            M.get('D', None),
            M.get('E', None),
            M.get('F', None))

不足しているタプルをマップに追加します。

サンプル入力:

csv1.in             csv2.in
-------            ---------
A|B|C               D|E|F
Hello|This|is       PLEASE|WORK|FOO
FOO|BAR|BING        OR|EVERYTHING|WILL
BANG|BOSH           BE|FOR|NAUGHT

サンプル スクリプト:

A = LOAD 'tests/csv' USING myudfs.ExampleCSVLoader('\\|') AS (M:map[]); 
B = FOREACH A GENERATE FLATTEN(myudf.completemap_v2(M));

出力:

B: {null::A: chararray,null::B: chararray,null::C: chararray,null::D: chararray,null::E: chararray,null::F: chararray}
(,,,,,)
(,,,PLEASE,WORK,FOO)
(,,,OR,EVERYTHING,WILL)
(,,,BE,FOR,NAUGHT)
(,,,,,)
(Hello,This,is,,,)
(FOO,BAR,BING,,,)
(BANG,BOSH,,,,)
于 2013-08-20T22:38:02.187 に答える