4

datastax が提供する spark-cassandra-connector 1.1.0 を使用しています。興味深い問題に気付きましたが、なぜこのようなことが起こっているのかわかりません: cassandra コネクタをブロードキャストしてエグゼキュータで使用しようとすると、構成が無効であることを示す例外が発生し、0.0.0 で Cassandra に接続できません。

スタックトレースの例:

java.io.IOException: Failed to open native connection to Cassandra at {0.0.0.0}:9042
        at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:174)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160)
        at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
        at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
        at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:71)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:97)
...

しかし、ブロードキャストせずに使用すると、すべて正常に動作します。

私にとって奇妙なことは、ドライバー側では値が適切な構成をブロードキャストしましたが、エグゼキューター側ではそうではありませんでした。

運転席側:

  val dbConf = ssc.sparkContext.getConf
  val connector = CassandraConnector(dbConf)
  println(connector.hosts) //Set(10.20.1.5) 
  val broadcastedConnector = ssc.sparkContext.broadcast(connector)
  println(broadcastedConnector.value.hosts) //Set(10.20.1.5) 

エグゼキュータ側:

mapPartition{
...
 println(broadcastedConnector.hosts) // Set(0.0.0.)
...
}

なぜそのような方法で動作しているのか、エグゼキューター側で使用できる方法で Cassandra コネクタをブロードキャストする方法を誰かが説明できますか?

更新1.2.3 バージョンのコネクタにも同じ問題があります。

4

1 に答える 1

3

Cassandra コネクタをブロードキャストする理由はありません。並列化されたクロージャー内で使用すると、構成がシリアル化され、エグゼキューターに新しい接続が作成されるか、既存のエグゼキューター接続が存在する場合はそれが使用されます。

于 2015-07-18T20:42:07.287 に答える