0

現在、Apache Beam の scala ラッパー ライブラリである scio を使用しています。やりたいことは、ID に基づいて CloudPubSub から送信されたさまざまな種類のメッセージを結合することです。

メッセージ A は 1 秒ごとに送信され、メッセージ B は 3 秒ごとに 1 回送信されます。メッセージBを受け取ったら、受け取ったメッセージAと同じIDのメッセージを結合したい。

メッセージ例)

A=37
A=38
A=39
B=39
A=40
A=41
A=42
B=42
A=43
A=44

現在のコード

val AInput = sc.pubsubSubscription[String]("projects/hoge/subscriptions/A")`
  .withFixedWindows(Duration.standardSeconds(10))
  .keyBy(a => {
    a.split("=")(1).toInt
  })

val BInput = sc.pubsubSubscription[String]("projects/hoge/subscriptions/B")
  .withFixedWindows(Duration.standardSeconds(10))
  .keyBy(a => {
    println(a.split("=")(1))
    a.split("=")(1).toInt
  })
  .toWindowed
  .map(s => {
    println(s.value.toString)
    println(s.window.maxTimestamp().toDateTime.toString("yyyy/MM/dd HH:mm:ss ZZ"))
    s
  })
  .toSCollection
  .join(AInput)
  .map(a  => {
    println("---------------")
    println(a._1)
    println(a._2._1)
    println(a._2._2)
  })

どちらの行もkeyByの行まで実行します。ただし、参加後に印刷しても何も印刷されません。エラー等はありません...

トラブルにあっている。私は答えを待っています...

(コンソールログ)</p>

9
3
12
(3,B=3)
(9,B=9)
2017/07/17 16:30:09 +09:00
2017/07/17 16:28:39 +09:00
(12,B=12)
2017/07/17 16:29:09 +09:00
6
9
15
12
(6,B=6)
(9,B=9)
2017/07/17 16:30:19 +09:00
2017/07/17 16:30:09 +09:00
(12,B=12)
2017/07/17 16:30:19 +09:00
(15,B=15)
2017/07/17 16:30:19 +09:00
21
24
27
18
30
(21,B=21)
2017/07/17 16:30:29 +09:00
(24,B=24)
2017/07/17 16:30:39 +09:00
(27,B=27)
2017/07/17 16:30:39 +09:00
(18,B=18)
2017/07/17 16:30:29 +09:00
(30,B=30)
2017/07/17 16:30:39 +09:00
33
36
42
(33,B=33)
2017/07/17 16:30:49 +09:00
39
(42,B=42)
2017/07/17 16:30:59 +09:00
(36,B=36)
2017/07/17 16:30:49 +09:00
(39,B=39)
2017/07/17 16:30:59 +09:00
45

ウィンドウ処理は10秒ごとに行われているようですが、処理される時間はバラバラです。さらに、DirectRunner の代わりに DataflowRunner を使用して起動すると、成功することがわかりました。

4

0 に答える 0