1

次のようなメールの記録があります。

Name MailingID  Timestamp    Event
1 John         1 2014-04-18     Sent
2 John         2 2015-04-21     Sent
3 Mary         1 2015-04-22 Returned
4 Mary         2 2015-04-25     Sent
5 John         1 2015-05-01  Replied

次のように作成できますDataFrame

df <- createDataFrame(sqlContext, data.frame(Name = c('John','John','Mary','Mary','John'),
                                             MailingID = c(1,2,1,2,1),
                                             Timestamp=c('2014-04-18','2015-04-21','2015-04-22','2015-04-25','2015-05-01'),
                                             Event=c('Sent','Sent','Returned','Sent','Replied')))

私は、彼/彼女に送信された最新の 2 つのメールのいずれかに誰が返信したかを知りたいので、要約ヘルパー関数を使用して、次のdplyrことができます。

localDf <- collect(df)

library(lubridate)
library(magrittr)
library(dplyr)

hasRepliedLatest <- function(MailingID, Timestamp, Event, Latest_N) {
  length(intersect(MailingID[Event == 'Replied'], MailingID[Event == 'Sent'][1:Latest_N])) > 0
}

localDf %>%
  arrange(desc(Timestamp)) %>%
  group_by(Name) %>%
  summarize(RepliedLatest = hasRepliedLatest(MailingID, Timestamp, Event, 2))

detach(package:dplyr) # to avoid function confliction with SparkR

結果は次のとおりです。

  Name RepliedLatest
1 John          TRUE
2 Mary         FALSE

SparkRこれを on で、つまりDataFrameon local ではなくonで実行したいと考えていますdata.frame。だから私は試しました:

df %>%
  arrange(desc(df$Timestamp)) %>%
  group_by(df$Name) %>%
  summarize(RepliedLatest = hasRepliedLatest(df$MailingID, df$Timestamp, df$Event, 2))

次に、関数が S4 クラスでは機能しないというエラーが表示されましDataFrameた。でこれを正しく行う方法はSparkR? またはsqlContextによって作成されたSQL クエリを使用したソリューションも歓迎します。sparkRHive.initsparkRSQL.init

4

1 に答える 1

2

SparkSQL <= 1.4 はユーザー定義の集計関数をサポートしていません。私の知る限り、SparkR は UDF をまったくサポートしていないため、現在の開発ブランチを使用していないか、1.5 RC UDF はオプションではありません。

データモデルとロジックを理解しているかどうかはまだわかりませんが、次のようなことを試すことができます:

# Select last 2 sent events and all other which occurred in this window
tmp <- sql(sqlContext,    
   "SELECT *, SUM(CASE WHEN event = 'Sent' THEN 1 ELSE 0 END) OVER w AS ind
    FROM df WHERE Event IN ('Sent', 'Replied')
    HAVING ind <= 2
    WINDOW w AS (PARTITION BY name ORDER BY DATE(Timestamp) DESC)")


# Split sent and replied
sent <- tmp %>% filter(tmp$Event == "Sent")
replied <- tmp %>% filter(tmp$Event == "Replied")

registerTempTable(sent,  "sent")
registerTempTable(replied,  "replied")

# Join and count
sql(sqlContext,
    "SELECT
        sent.name,
        SUM(
            CASE WHEN replied.event IS NOT NULL THEN 1
            ELSE 0 END
        ) > 0 AS repliedlatest 
     FROM sent LEFT JOIN replied ON
        sent.name = replied.name AND
        sent.mailingid = replied.mailingid
     -- Not part of the original logic
     WHERE DATE(sent.timestamp) <= DATE(replied.timestamp) 
     GROUP BY sent.name") %>% head()
于 2015-09-08T06:03:20.840 に答える