I have a huge .csv file with several columns, but the columns that matter to me are USER_ID (user identifier), DURATION (duration of call), TYPE (incoming or outgoing), DATE, and NUMBER (mobile no.).

What I am trying to do is replace every null value in the DURATION column with the average duration of all calls of the same type by the same user (i.e. the same USER_ID).
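To make the goal concrete, here is a minimal plain-Java sketch (deliberately not Spark) of the result I am after. The class `NullDurationFill`, the `Call` type, and the method `fillWithGroupMean` are hypothetical names made up for this illustration. It also assumes that a null in a group with no known durations simply stays null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NullDurationFill {

    /** One call-log row (hypothetical type); duration is null when missing. */
    public static final class Call {
        public final String userId;
        public final String type;
        public final Long duration;

        public Call(String userId, String type, Long duration) {
            this.userId = userId;
            this.type = type;
            this.duration = duration;
        }
    }

    /**
     * Returns the durations in input order, with each null replaced by the
     * mean of the non-null durations that share the same (userId, type).
     */
    public static List<Double> fillWithGroupMean(List<Call> calls) {
        // Pass 1: accumulate {sum, count} of known durations per (userId, type).
        Map<String, double[]> stats = new HashMap<>();
        for (Call c : calls) {
            if (c.duration != null) {
                double[] s = stats.computeIfAbsent(c.userId + "|" + c.type, k -> new double[2]);
                s[0] += c.duration;
                s[1] += 1;
            }
        }

        // Pass 2: keep known durations, fill missing ones with the group mean.
        List<Double> filled = new ArrayList<>();
        for (Call c : calls) {
            if (c.duration != null) {
                filled.add(c.duration.doubleValue());
                continue;
            }
            double[] s = stats.get(c.userId + "|" + c.type);
            if (s == null) {
                filled.add(null); // assumption: nothing to average, so leave it null
            } else {
                filled.add(s[0] / s[1]);
            }
        }
        return filled;
    }
}
```

For example, if one user has INCOMING calls with durations 10, null, and 30, the null should become 20.0, while that user's OUTGOING calls and every other user's calls are averaged separately.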

I computed the average as follows.

The query below finds the average duration of all calls of the same type by the same user:

    Dataset<Row> filteredData = callLogsDataSet.selectExpr(USER_ID, DURATION, TYPE, DATE, NORMALIZE_NUMBER)
        /*1*/ .filter(col(USER_ID).isNotNull().and(col(TYPE).isNotNull()).and(col(NORMALIZE_NUMBER).isNotNull())
                    .and(col(DATE).gt(0)).and(col(TYPE).isin("OUTGOING","INCOMING")))
        /*2*/ .groupBy(col(USER_ID), col(TYPE), col(NORMALIZE_NUMBER))
        /*3*/ .agg(sum(DURATION).alias(DURATION_IN_MIN).divide(count(col(USER_ID))));

filteredData.show() gives:

|USER_ID                         |type    |normalized_number|(sum(duration) AS `durationInMin` / count(USER_ID))|
+--------------------------------+--------+-----------------+---------------------------------------------------+
|8a8a8a8a592b4ace01595e65901b0013|OUTGOING|+435657456354    |0.0                                                |
|8a8a8a8a592b4ace01595e70dcbd0016|OUTGOING|+876454354353    |48.6                                               |
|8a8a8a8a592b4ace01595e099764000c|INCOMING|+132445686765    |15.0                                               |
|8a8a8a8a592b4ace01592b4ff4b90000|INCOMING|+097645634324    |74.16666666666667                                  |
|8a8a8a8a592b4ace0159366a56290005|INCOMING|+134435657656    |15.0                                               |
|8a8a8a8a592b4ace01595e70dcbd0016|OUTGOING|+135879878543    |31.0                                               |
|8a8a8a8a592b4ace0159366a56290005|INCOMING|+768435245243    |11.0                                               |
|8a8a8a8a592b4ace01592cd8fd160003|INCOMING|+787685534523    |0.0                                                |
|8a8a8a8a592b4ace01595e65901b0013|OUTGOING|+098976865745    |61.5                                               |
|8a8a8a8a592b4ace01592b4ff4b90000|OUTGOING|+123456787644    |43.333333333333336                                 |

In the query below, I filter the data and, in step 2, replace every null occurrence with 0.

    Dataset<Row> filteredData2 = callLogsDataSet.selectExpr(USER_ID, DURATION, TYPE, DATE, NORMALIZE_NUMBER)
        /*1*/ .filter(col(USER_ID).isNotNull().and(col(TYPE).isNotNull()).and(col(NORMALIZE_NUMBER).isNotNull())
                    .and(col(DATE).gt(0)).and(col(DURATION).gt(0)).and(col(TYPE).isin("OUTGOING","INCOMING")))
        /*2*/ .withColumn(DURATION, when(col(DURATION).isNull(), 0).otherwise(col(DURATION).cast(LONG)))
        /*3*/ .withColumn(DATE, col(DATE).cast(LONG).minus(col(DATE).cast(LONG).mod(ROUND_ONE_MIN)).cast(LONG))
        /*4*/ .groupBy(col(USER_ID), col(DURATION), col(TYPE), col(DATE), col(NORMALIZE_NUMBER))
        /*5*/ .agg(sum(DURATION).alias(DURATION_IN_MIN))
        /*6*/ .withColumn(DAY_TIME, lit(""))
        /*7*/ .withColumn(WEEK_DAY, lit(""))
        /*8*/ .withColumn(HOUR_OF_DAY, lit(0));

filteredData2.show() gives:

|USER_ID                         |duration|type    |date         |normalized_number|durationInMin|DAY_TIME|WEEK_DAY|HourOfDay|
+--------------------------------+--------+--------+-------------+-----------------+-------------+--------+--------+---------+
|8a8a8a8a592b4ace01595e70dcbd0016|25      |INCOMING|1479017220000|+465435534353    |25           |        |        |0        |
|8a8a8a8a592b4ace01595e099764000c|29      |INCOMING|1482562560000|+545765765775    |29           |        |        |0        |
|8a8a8a8a592b4ace01595e099764000c|75      |OUTGOING|1483363980000|+124435665755    |75           |        |        |0        |
|8a8a8a8a592b4ace01595e70dcbd0016|34      |OUTGOING|1483261920000|+098865563645    |34           |        |        |0        |
|8a8a8a8a592b4ace01595e70dcbd0016|22      |OUTGOING|1481712180000|+232434656765    |22           |        |        |0        |
|8a8a8a8a592b4ace0159366a56290005|64      |OUTGOING|1482984060000|+875634521325    |64           |        |        |0        |
|8a8a8a8a592b4ace0159366a56290005|179     |OUTGOING|1482825060000|+876542543554    |179          |        |        |0        |
|8a8a8a8a592b4ace01595e65901b0013|12      |OUTGOING|1482393360000|+098634563456    |12           |        |        |0        |
|8a8a8a8a592b4ace01595e70dcbd0016|14      |OUTGOING|1482820860000|+1344365i8787    |14           |        |        |0        |
|8a8a8a8a592b4ace01592b4ff4b90000|105     |INCOMING|1478772240000|+234326886784    |105          |        |        |0        |
|8a8a8a8a592b4ace01592b4ff4b90000|453     |OUTGOING|1480944480000|+134435676578    |453          |        |        |0        |
|8a8a8a8a592b4ace01595e099764000c|42      |OUTGOING|1483193100000|+413247687686    |42           |        |        |0        |
|8a8a8a8a592b4ace01595e099764000c|41      |OUTGOING|1481696820000|+134345435645    |41           |        |        |0        |

Please help me combine these two queries, or use them to get the required result. I am new to Spark and SparkSQL.

Thanks.
