1

この投稿に似た問題を解決しようとしています。元のデータは、いくつかのセンサーの値 (観測値) を含むテキスト ファイルです。各観測にはタイムスタンプが付けられますが、センサー名は各行ではなく 1 回だけ指定されます。しかし、1 つのファイルに複数のセンサーがあります。

Time    MHist::852-YF-007   
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0
Time    MHist::852-YF-008   
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0

したがって、センサー情報が指定されている行でファイルを分割するように Hadoop を構成したいと考えています。次に、これらの行からセンサー名 (852-YF-007 および 852-YF-008 など) を読み取り、それに応じて各センサーの値を読み取るために MapReduce を使用します。

私はPython(Jupyter Notebook)でこれを行いました:

sheet = sc.newAPIHadoopFile(
    '/user/me/sample.txt',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': 'Time\tMHist'}
)

sf = sheet.filter(lambda (k, v): v)
sf.map(lambda (k, v): v).splitlines())

sf.take(50)

出力は次のようになります。

[[u'::852-YF-007\t',
  u'2016-05-10 00:00:00\t0',
  u'2016-05-09 23:59:00\t0',
  u'2016-05-09 23:58:00\t0',
  u'2016-05-09 23:57:00\t0',
  u'2016-05-09 23:56:00\t0',
  u'2016-05-09 23:55:00\t0',
  u'2016-05-09 23:54:00\t0',
  u'2016-05-09 23:53:00\t0',
  u'2016-05-09 23:52:00\t0',
  u'2016-05-09 23:51:00\t0',
  u'2016-05-09 23:50:00\t0',
  u'2016-05-09 23:49:00\t0',
  u'2016-05-09 23:48:00\t0',
  u'2016-05-09 23:47:00\t0',
  u'2016-05-09 23:46:00\t0',
  u'2016-05-09 23:45:00\t0',
  u'2016-05-09 23:44:00\t0',
  u'2016-05-09 23:43:00\t0',
  u'2016-05-09 23:42:00\t0'],
 [u'::852-YF-008\t',
  u'2016-05-10 00:00:00\t0',
  u'2016-05-09 23:59:00\t0',
  u'2016-05-09 23:58:00\t0',
  u'2016-05-09 23:57:00\t0',
  u'2016-05-09 23:56:00\t0',
  u'2016-05-09 23:55:00\t0',
  u'2016-05-09 23:54:00\t0',
  u'2016-05-09 23:53:00\t0',
  u'2016-05-09 23:52:00\t0',
  u'2016-05-09 23:51:00\t0',
  u'2016-05-09 23:50:00\t0',
  u'2016-05-09 23:49:00\t0',
  u'2016-05-09 23:48:00\t0',
  u'2016-05-09 23:47:00\t0',
  u'2016-05-09 23:46:00\t0',
  u'2016-05-09 23:45:00\t0',
  u'2016-05-09 23:44:00\t0',
  u'2016-05-09 23:43:00\t0',
  u'2016-05-09 23:42:00\t0']]

私の質問は、これをさらに処理してセンサー名を抽出し、そのセンサーの値の行を取得する方法です。ややこれが好き

852-YF-007 --> array of sensor_lines
852-YF-008 --> array of sensor_lines

行自体は、後でタイムスタンプと値に分割されます。しかし、センサー名を行から分割することにもっと興味があります。

4

1 に答える 1

2

個人的には:

  • デリミタを拡張::

    sheet = sc.newAPIHadoopFile(
        path,
        'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
        'org.apache.hadoop.io.LongWritable',
        'org.apache.hadoop.io.Text',
        conf={'textinputformat.record.delimiter': 'Time\tMHist::'}
    )
    
  • ドロップ キー:

    values = sheet.values()
    
  • 空のエントリを除外する

    non_empty = values.filter(lambda x:  x)
    
  • スプリット:

    grouped_lines = non_empty.map(str.splitlines)
    
  • 個別のキーと値:

    from operator import itemgetter
    
    pairs = grouped_lines.map(itemgetter(0, slice(1, None)))
    
  • そして最後に値を分割します:

    pairs.flatMapValues(lambda xs: [x.split("\t") for x in xs])
    

もちろん、これらはすべて単一の関数で実行できます。

import dateutil.parser

def process(pair):
    _, content = pair
    clean = [x.strip() for x in content.strip().splitlines()]
    if not clean:
        return []
    k, vs = clean[0], clean[1:]
    for v in vs:
        try:
            ds, x = v.split("\t")
            yield k, (dateutil.parser.parse(ds), float(x))  # or int(x)
        except ValueError:
            pass

sheet.flatMap(process)
于 2016-06-30T10:37:06.403 に答える