datastax が提供する spark-cassandra-connector 1.1.0 を使用しています。興味深い問題に気付きましたが、なぜこのようなことが起こっているのかわかりません: cassandra コネクタをブロードキャストしてエグゼキュータで使用しようとすると、構成が無効であることを示す例外が発生し、0.0.0 で Cassandra に接続できません。
スタックトレースの例:
java.io.IOException: Failed to open native connection to Cassandra at {0.0.0.0}:9042
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:174)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:71)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:97)
...
しかし、ブロードキャストせずに使用すると、すべて正常に動作します。
私にとって奇妙なことは、ドライバー側では値が適切な構成をブロードキャストしましたが、エグゼキューター側ではそうではありませんでした。
運転席側:
val dbConf = ssc.sparkContext.getConf
val connector = CassandraConnector(dbConf)
println(connector.hosts) //Set(10.20.1.5)
val broadcastedConnector = ssc.sparkContext.broadcast(connector)
println(broadcastedConnector.value.hosts) //Set(10.20.1.5)
エグゼキュータ側:
mapPartition{
...
println(broadcastedConnector.hosts) // Set(0.0.0.)
...
}
なぜそのような方法で動作しているのか、エグゼキューター側で使用できる方法で Cassandra コネクタをブロードキャストする方法を誰かが説明できますか?
更新1.2.3 バージョンのコネクタにも同じ問題があります。