1

現在、いくつかの機器のさまざまなセンサー読み取り値の時系列データで構成されるデータセットの機能を作成しています。これは、おそらく同じ機器の障害イベントに関連しています。このデータの基本構造は、機器 ID、タイムスタンプ、およびセンサーの読み取り値を組み合わせた一種のテーブルを持っていることです。

| ID | Cycle_ID | Timestamp  | sensor_1 | sensor_2 |
|----|----------|------------|----------|----------|
| 1  | 1        | 1547142555 | 123      | 641      |
| 1  | 1        | 1547142556 | 123      | 644      |
| 1  | 2        | 1547142557 | 124      | 643      |

アイデアは、サイクルに基づいてデータを集約して、それらに対応するシーケンス (および対応する機能) を作成することです。生データの量は膨大で、spark を使用する必要がありますが、集計後の結果のデータセットは、Pandas DF に保存して keras でモデルを構築するために使用するのに十分小さいです。とりわけ、1 つのアイデアは、いくつかのセンサーの主要な DCT コンポーネントを収集して、それらを機能として使用することです。これを行うために、(特に) 次の集計を行います。


from pyspark.sql import Row, window
import pyspark.sql.functions as func

W = window.Window.partitionBy('ID', 'Cycle_ID').orderBy('Timestamp')

df_collect = pfr_flight_match.withColumn('sensor_1_coll', 
                 func.collect_list('sensor_1').over(W)) \
                 .groupBy('ID', 'Cycle_ID') \ 
                 .agg(func.max("sensor_1_coll").alias('sensor_1_coll'))

これにより、各機器のサイクルごとに、センサーの時系列が配列として個別に得られます。ここでのアイデアは、DCT を実行し、主要なn係数のみを保持し、これらを新しい機能列として個別に追加することです。これを行う方法を思いつきましたが、パフォーマンスがひどいようです。そのため、助けを求めています。

残念ながら、配列で Pyspark の DCT を使用することはできないため (ドキュメントによると、機能は DenseVector タイプでなければなりません)、収集した配列を DenseVector に変換する必要があります。効率的な方法はないように思われるので、UDF を使用してこれを行います。

import pyspark.ml
to_vec = func.udf(lambda x: pyspark.ml.linalg.DenseVector(x),
                  pyspark.ml.linalg.VectorUDT())

次のステップは、次のようなものを使用して、DCT 自体を実行することです。

# Determine which column is the target of DCT
col_to_transform = 'sensor_1_coll'
df = df_collect.withColumn('vec', to_vec(col_to_transform))

# After switching the column type to DenseVector, we can apply DCT
dct = pyspark.ml.feature.DCT(inverse=False, inputCol='vec', outputCol='vec_dct')
df_dct = dct.transform(df)

# Drop intermediate columns
df_dct = df_dct.drop('vec', col_to_transform)

ここで、私が落とし穴を恐れるポイントに到達します。DCT ベクトルをある程度の係数に切り詰める必要があります。これらの係数は、後で Pandas DF/Numpy 配列に渡すために個別の列に分解されます。

UDF を使用することは、パフォーマンスの面で良くないのではないかと心配しています。とにかく、DenseVector は配列型として表されません。したがって、これは機能しません:

import pyspark.ml
trunc_vec = func.udf(lambda x: x[0:n],
                  pyspark.ml.linalg.VectorUDT())

そこで私が最後にしたことは、適切な関数を上記の DF の RDD バージョンにマップし、それをデータフレームとして返すことでした。これは私が今使っているものです:

# State columns used for grouping
idx = ['ID', 'Cycle_ID']
keep_coeffs = 30 # How many of the leading coefficients shall be kept?

from functools import partial

# To be mapped onto rdd: Return auxillary columns plus the DCT coeffs as 
# individual columns, which are named serially
 def truncate_dct_vec(vec, coeffs):
    return tuple(vec[i] for i in idx) + tuple(vec.vec_dct.toArray()[0:coeffs+1].tolist())
truncate_dct_vec = partial(truncate_dct_vec, coeffs=keep_coeffs)

# Perform the mapping to get the truncated DCT coefficients, each in an individual column
df_dct = df_dct.rdd.map(truncate_dct_vec).toDF(idx)

問題は、これの実行が非常に遅いように見えることです (おそらく、これらすべての手順を実行する JVM と python の間のシリアライゼーションと変換が原因です)。これはほとんど法外です。私は主に、より高速な代替手段を探しています。これに関する任意の助けをいただければ幸いです。

4

1 に答える 1