5

Cassandra テーブルから数百万行を読み取るのは難しい作業です。実際、このテーブルには 4,000 ~ 5,000 万行の行が含まれています。データは実際にはシステムの内部 URL であり、それらすべてを起動する必要があります。それを起動するために Akka Streams を使用していますが、必要に応じてバック プレッシャーを行いながら、かなりうまく機能しています。しかし、すべてを効果的に読み取る方法はまだ見つかっていません。

これまでに試したこと:

  • Akka Stream を使用してデータを Stream として読み取ります。特定のテーブルのパブリッシャーを提供する phantom-dsl を使用しています。ただし、すべてを読み取るわけではなく、一部のみを読み取ります。実際には、最初の 100 万を超えると読み取りが停止します。

  • 特定の日付までに Spark を使用して読み取ります。私たちのテーブルは時系列テーブルのようにモデル化されており、年、月、日、分... の列があります。現在、日ごとに選択しているため、Spark は処理する多くのものを取得しませんが、すべての日を選択するのは面倒です。

コードは次のとおりです。

val cassandraRdd =
      sc
        .cassandraTable("keyspace", "my_table")
        .select("id", "url")
        .where("year = ? and month = ? and day = ?", date.getYear, date.getMonthOfYear, date.getDayOfMonth)

残念ながら、データを減らすためにパーティションを反復処理することはできません。アクターがシリアル化できないと不平を言うため、収集を使用する必要があります。

val httpPool: Flow[(HttpRequest, String), (Try[HttpResponse], String), HostConnectionPool] = Http().cachedHostConnectionPool[String](host, port).async

val source =
  Source
    .actorRef[CassandraRow](10000000, OverflowStrategy.fail)
    .map(row => makeUrl(row.getString("id"), row.getString("url")))
    .map(url => HttpRequest(uri = url) -> url)

val ref = Flow[(HttpRequest, String)]
  .via(httpPool.withAttributes(ActorAttributes.supervisionStrategy(decider)))
  .to(Sink.actorRef(httpHandlerActor, IsDone))
  .runWith(source)

cassandraRdd.collect().foreach { row =>
  ref ! row
}

集計などとは異なることを行うために何百万行も読み取った経験があるかどうかを知りたいです。

また、すべてを読み込んで、ストリーミング (spark または Akka) を使用して受信する Kafka トピックに送信することも考えましたが、問題は同じで、これらすべてのデータを効果的にロードする方法は?

編集

今のところ、100 GB の適切な量のメモリを備えたクラスターで実行し、収集と反復処理を行っています。

また、これは、spark でビッグデータを取得し、reduceByKey、aggregateByKey などを使用して分析することとは大きく異なります。

HTTP経由ですべてを取得して送信する必要があります=/

これまでのところ、私が行ったように機能していますが、このデータがどんどん大きくなって、すべてをメモリにフェッチしても意味がなくなるのではないかと心配しています。

このデータをストリーミングしてチャンクでフェッチするのが最善の解決策ですが、これにはまだ良いアプローチが見つかりません。

最後に、Spark を使用してそれらすべてのデータを取得し、CSV ファイルを生成し、Akka Stream IO を使用して処理することを考えています。 100万。

4

1 に答える 1

5

さて、読書をしたり、他の人と話したり、テストを行ったりした後、次のコードサンプルで結果を得ることができます:

val sc = new SparkContext(sparkConf)

val cassandraRdd = sc.cassandraTable(config.getString("myKeyspace"), "myTable")
  .select("key", "value")
  .as((key: String, value: String) => (key, value))
  .partitionBy(new HashPartitioner(2 * sc.defaultParallelism))
  .cache()

cassandraRdd
  .groupByKey()
  .foreachPartition { partition =>
    partition.foreach { row =>

      implicit val system = ActorSystem()
      implicit val materializer = ActorMaterializer()

      val myActor = system.actorOf(Props(new MyActor(system)), name = "my-actor")

      val source = Source.fromIterator { () => row._2.toIterator }
      source
        .map { str =>
          myActor ! Count
          str
        }
        .to(Sink.actorRef(myActor, Finish))
        .run()
    }
  }

sc.stop()


class MyActor(system: ActorSystem) extends Actor {

  var count = 0

  def receive = {

    case Count =>
      count = count + 1

    case Finish =>
      println(s"total: $count")
      system.shutdown()

  }
}

case object Count
case object Finish

私がやっていることは次のとおりです。

  • partitionBy および groupBy メソッドを使用して、十分な数のパーティションとパーティショナーを実現してみてください
  • キャッシュを使用してデータ シャッフルを防ぎ、Spark がノード間で大きなデータを移動するようにし、高い IO を使用するなど。
  • 依存関係を含むアクター システム全体と、 foreachPartition メソッド内のストリームを作成します。ここにトレードオフがあります。ActorSystem は 1 つしか持てませんが、質問で書いたように .collect を悪用する必要があります。ただし、内部ですべてを作成しても、クラスター全体に分散された Spark 内で実行することができます。
  • kill(Finish) というメッセージを含む Sink.actorRef を使用して、反復子の最後で各アクター システムを終了します。

おそらく、このコードはさらに改善される可能性がありますが、これまでのところ、.collect を使用せず、Spark 内でのみ動作することに満足しています。

于 2016-05-11T00:06:12.907 に答える