これまでのところ、mongo-hadoop-core 1.4.2 を使用して MongoDB からデータを取得できています。操作したいデータは、クエリしているコレクション内の各ドキュメント内の埋め込みドキュメント内の配列内の値であり、これらの値はDouble
. コレクションから取得したデータの型は RDD[(Object, org.bson.BSONObject)] です。これは、各ドキュメントが型 (Object, org.bson.BSONObject) のタプルであることを意味します。
埋め込みドキュメントを取得したいときはいつでも(spark-shell 1.5.1で作業しています):
import com.mongodb.{BasicDBObject, BasicDBList} // classes I am using here.
// 'documents' already taken from collection.
scala> documents
res4: org.apache.spark.rdd.RDD[(Object, org.bson.BSONObject)] = NewHadoopRDD[0] at newAPIHadoopRDD at <console>:32
// getting one document.
scala> val doc = documents.take(1)(0)
doc: (Object, org.bson.BSONObject) = ( ... _id fields ... , ... lots of fields ...)
// getting an embed document from tuple's second element.
scala> val samples = doc._2.get("samp") match {case x: BasicDBObject => x}
samples: com.mongodb.BasicDBObject = (... some fields ...)
// getting an embed document.
scala> val latency = samples.get("latency") match {case x: BasicDBObject => x}
latency: com.mongodb.BasicDBObject = { "raw" : [ 9.71 , 8.77 , 10.16 , 9.49 , 8.54 , 10.29 , 9.55 , 9.16 , 10.78 , 10.31 , 9.54 , 10.69 , 10.33 , 9.58 , 9.07 , 9.72 , 9.48 , 8.72 , 10.59 , 9.81 , 9.31 , 10.64 , 9.87 , 9.29 , 10.38 , 9.64 , 8.86 , 10.84 , 10.06 , 9.29 , 8.45 , 9.08 , 7.55 , 9.75 , 9.05 , 10.38 , 9.64 , 8.25 , 10.27 , 9.54 , 8.52 , 10.26 , 9.53 , 7.87 , 9.76 , 9.02 , 10.27 , 7.93 , 9.73 , 9 , 10.07 , 9.35 , 7.66 , 13.68 , 11.92 , 14.72 , 14 , 12.55 , 11.77 , 11.02 , 11.59 , 10.87 , 10.4 , 9.13 , 10.28 , 9.55 , 10.43 , 8.33 , 9.66 , 8.93 , 8.05 , 11.26 , 10.53 , 9.81 , 10.2 , 9.42 , 7.73 , 9.76 , 9.04 , 8.29 , 9.34 , 7.21 , 10.05 , 9.32 , 10.28 , 8.59 , 10.15 , 9.53 , 7.88 , 9.9 , 9.15 , 13.96 , 13.19 , 11 , 13.6 , 13.01 , 12.17 , 11.39 , 10.64 , 9.9] , "xtrf" : { "...
// getting a bson array.
scala> val array = latency.get("raw") match {case x: BasicDBList => x}
array: com.mongodb.BasicDBList = [ 9.71 , 8.77 , 10.16 , 9.49 , 8.54 , 10.29 , 9.55 , 9.16 , 10.78 , 10.31 , 9.54 , 10.69 , 10.33 , 9.58 , 9.07 , 9.72 , 9.48 , 8.72 , 10.59 , 9.81 , 9.31 , 10.64 , 9.87 , 9.29 , 10.38 , 9.64 , 8.86 , 10.84 , 10.06 , 9.29 , 8.45 , 9.08 , 7.55 , 9.75 , 9.05 , 10.38 , 9.64 , 8.25 , 10.27 , 9.54 , 8.52 , 10.26 , 9.53 , 7.87 , 9.76 , 9.02 , 10.27 , 7.93 , 9.73 , 9 , 10.07 , 9.35 , 7.66 , 13.68 , 11.92 , 14.72 , 14 , 12.55 , 11.77 , 11.02 , 11.59 , 10.87 , 10.4 , 9.13 , 10.28 , 9.55 , 10.43 , 8.33 , 9.66 , 8.93 , 8.05 , 11.26 , 10.53 , 9.81 , 10.2 , 9.42 , 7.73 , 9.76 , 9.04 , 8.29 , 9.34 , 7.21 , 10.05 , 9.32 , 10.28 , 8.59 , 10.15 , 9.53 , 7.88 , 9.9 , 9.15 , 13.96 , 13.19 , 11 , 13.6 , 13.01 , 12.17 , 11.39 , 10.64 , 9.9]
タイプ Object を BasicDBObject に変換するのは非常に不便ですが、使用するにはそれを行う必要がありますget(key: String)
。.asInstanceOf[BasicDBObject]
代わりに使用することもできます match {case x: BasicDBObject => x}
が、より良い方法はありますか??. Double
、Int
、String
などの特定の型を取得することは、クラスDate
から継承されたメソッドを使用して簡単に行うことができます。BasicBsonObject
に関してはBasicDBList
、から継承されたget(key: String)
メソッドがあり、キャストできるが呼び出しのみを使用して返すBasicBsonList
メソッドがあり、キャストできないの配列を返すから継承されたメソッドがあります。ここ:Object
Double
.asInstanceOf[Double]
toArray()
java.util.ArrayList
Object
Double
.map(_.asInstanceOf[Double])
scala> val arrayOfDoubles = array.toArray.map(_.asInstanceOf[Double])
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Double
at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:37)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:37)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
at $iwC$$iwC$$iwC.<init>(<console>:50)
at $iwC$$iwC.<init>(<console>:52)
at $iwC.<init>(<console>:54)
at <init>(<console>:56)
at .<init>(<console>:60)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
しかし、時にはそれが機能します。一部のドキュメントではこのキャストが機能しますが、他のドキュメントでは機能せず、上記のエラー メッセージが出力されます。これは、MongoDB が Spark に提供するデータ構造の問題でしょうか?これらのドキュメントだけでしょうか? 30 個の値を持つ小さな配列は、常に機能するようです。
これまでの私の解決策は、この非効率的な変換です:
scala> val arrayOfDoubles = array.toArray.map(_.toString.toDouble)
arrayOfDoubles: Array[Double] = Array(9.71, 8.77, 10.16, 9.49, 8.54, 10.29, 9.55, 9.16, 10.78, 10.31, 9.54, 10.69, 10.33, 9.58, 9.07, 9.72, 9.48, 8.72, 10.59, 9.81, 9.31, 10.64, 9.87, 9.29, 10.38, 9.64, 8.86, 10.84, 10.06, 9.29, 8.45, 9.08, 7.55, 9.75, 9.05, 10.38, 9.64, 8.25, 10.27, 9.54, 8.52, 10.26, 9.53, 7.87, 9.76, 9.02, 10.27, 7.93, 9.73, 9.0, 10.07, 9.35, 7.66, 13.68, 11.92, 14.72, 14.0, 12.55, 11.77, 11.02, 11.59, 10.87, 10.4, 9.13, 10.28, 9.55, 10.43, 8.33, 9.66, 8.93, 8.05, 11.26, 10.53, 9.81, 10.2, 9.42, 7.73, 9.76, 9.04, 8.29, 9.34, 7.21, 10.05, 9.32, 10.28, 8.59, 10.15, 9.53, 7.88, 9.9, 9.15, 13.96, 13.19, 11.0, 13.6, 13.01, 12.17, 11.39, 10.64, 9.9)
ここで何かが足りないのですか、それとも本当に不便ですか? Object
これらすべてのメソッドがorを返さなければならないのはなぜBSONObject
ですか? 私が見つけたこの問題を克服する方法はありますか?java.lang.Integer
double にキャストされる配列に整数がない場合、これはどこから来たのでしょうか?