Cassandra テーブルから数百万行を読み取るのは難しい作業です。実際、このテーブルには 4,000 ~ 5,000 万行の行が含まれています。データは実際にはシステムの内部 URL であり、それらすべてを起動する必要があります。それを起動するために Akka Streams を使用していますが、必要に応じてバック プレッシャーを行いながら、かなりうまく機能しています。しかし、すべてを効果的に読み取る方法はまだ見つかっていません。
これまでに試したこと:
Akka Stream を使用してデータを Stream として読み取ります。特定のテーブルのパブリッシャーを提供する phantom-dsl を使用しています。ただし、すべてを読み取るわけではなく、一部のみを読み取ります。実際には、最初の 100 万を超えると読み取りが停止します。
特定の日付までに Spark を使用して読み取ります。私たちのテーブルは時系列テーブルのようにモデル化されており、年、月、日、分... の列があります。現在、日ごとに選択しているため、Spark は処理する多くのものを取得しませんが、すべての日を選択するのは面倒です。
コードは次のとおりです。
val cassandraRdd =
sc
.cassandraTable("keyspace", "my_table")
.select("id", "url")
.where("year = ? and month = ? and day = ?", date.getYear, date.getMonthOfYear, date.getDayOfMonth)
残念ながら、データを減らすためにパーティションを反復処理することはできません。アクターがシリアル化できないと不平を言うため、収集を使用する必要があります。
val httpPool: Flow[(HttpRequest, String), (Try[HttpResponse], String), HostConnectionPool] = Http().cachedHostConnectionPool[String](host, port).async
val source =
Source
.actorRef[CassandraRow](10000000, OverflowStrategy.fail)
.map(row => makeUrl(row.getString("id"), row.getString("url")))
.map(url => HttpRequest(uri = url) -> url)
val ref = Flow[(HttpRequest, String)]
.via(httpPool.withAttributes(ActorAttributes.supervisionStrategy(decider)))
.to(Sink.actorRef(httpHandlerActor, IsDone))
.runWith(source)
cassandraRdd.collect().foreach { row =>
ref ! row
}
集計などとは異なることを行うために何百万行も読み取った経験があるかどうかを知りたいです。
また、すべてを読み込んで、ストリーミング (spark または Akka) を使用して受信する Kafka トピックに送信することも考えましたが、問題は同じで、これらすべてのデータを効果的にロードする方法は?
編集
今のところ、100 GB の適切な量のメモリを備えたクラスターで実行し、収集と反復処理を行っています。
また、これは、spark でビッグデータを取得し、reduceByKey、aggregateByKey などを使用して分析することとは大きく異なります。
HTTP経由ですべてを取得して送信する必要があります=/
これまでのところ、私が行ったように機能していますが、このデータがどんどん大きくなって、すべてをメモリにフェッチしても意味がなくなるのではないかと心配しています。
このデータをストリーミングしてチャンクでフェッチするのが最善の解決策ですが、これにはまだ良いアプローチが見つかりません。
最後に、Spark を使用してそれらすべてのデータを取得し、CSV ファイルを生成し、Akka Stream IO を使用して処理することを考えています。 100万。