4

からSCIOを使用 spotifyして のジョブを書き込みます。たとえば、 g1e.g2Dataflowの 2 つの例に従って にストリームを書き込みますが、以下のコードでは次のエラーが発生します。PubSubGCS

エラー

Exception in thread "main" java.lang.IllegalArgumentException: Write can only be applied to a Bounded PCollection 

コード

object StreamingPubSub {
  def main(cmdlineArgs: Array[String]): Unit = {
// set up example wiring
val (opts, args) = ScioContext.parseArguments[ExampleOptions](cmdlineArgs)
val dataflowUtils = new DataflowExampleUtils(opts)
dataflowUtils.setup()

val sc = ScioContext(opts)


sc.pubsubTopic(opts.getPubsubTopic)
.timestampBy {
    _ => new Instant(System.currentTimeMillis() - (scala.math.random * RAND_RANGE).toLong)
  }
.withFixedWindows((Duration.standardHours(1)))
.groupBy(_ => Unit)
.toWindowed
.toSCollection
.saveAsTextFile(args("output"))


val result = sc.close()

// CTRL-C to cancel the streaming pipeline
    dataflowUtils.waitToFinish(result.internal)
  }
}

ウィンドウの概念を Bounded PCollection と混同している可能性があります。これを達成する方法はありますか、それともこれを実現するために何らかの変換を適用する必要がありますか?

4

1 に答える 1

3

SCIO の基盤は、境界のある PCollection のみをサポートsaveAsTextFileする Dataflow の変換を使用していると思います。WriteDataflow は、無制限の PCollection を Google Cloud Storage に書き込むための直接的な API をまだ提供していませんが、これは調査中です。

無制限の PCollection をどこかに永続化するには、BigQuery、Datastore、Bigtable などを検討してください。SCIO の API では、たとえばsaveAsBigQuery.

于 2016-10-05T17:14:43.850 に答える