13

Google Dataflow/Apache Beam を介した Web ユーザー セッションからの処理ログを調査しており、ユーザーのログ (ストリーミング) を先月のユーザー セッションの履歴と組み合わせる必要があります。

私は次のアプローチを見てきました:

  1. 30 日間の固定ウィンドウを使用します。ほとんどの場合、メモリに収まるウィンドウが大きくなります。ユーザーの履歴を更新する必要はありません。参照するだけです。
  2. CoGroupByKey を使用して 2 つのデータ セットを結合しますが、2 つのデータ セットのウィンドウ サイズは同じでなければなりません ( https://cloud.google.com/dataflow/model/group-by-key#join )。ケース (24 時間 vs 30 日)
  3. elementSide Input を使用して、特定の inのユーザーのセッション履歴を取得します。processElement(ProcessContext processContext)

私の理解では、経由でロードされたデータは.withSideInputs(pCollectionView)メモリに収まる必要があります。1 人のユーザーのすべてのセッション履歴をメモリに格納できることはわかっていますが、すべてのセッション履歴を格納できるわけではありませ

私の質問は、現在のユーザー セッションにのみ関連する副入力からデータをロード/ストリーミングする方法があるかどうかです。

ユーザーの ID を指定して、サイド入力からユーザーの履歴セッションをロードする parDo 関数を想像しています。ただし、現在のユーザーの履歴セッションのみがメモリに収まります。副入力を介してすべての履歴セッションをロードすると、大きすぎます。

説明するためのいくつかの擬似コード:

public static class MetricFn extends DoFn<LogLine, String> {

    final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;

    public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
        this.pHistoryView = historyView;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);

        final LogLine currentLogLine = processContext.element();
        final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
        final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
        processContext.output(outputMetric);
    }
}
4

1 に答える 1