I want to connect Kafka and Cassandra to Spark 1.5.1.
Library versions:
scalaVersion := "2.10.6"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-streaming_2.10" % "1.5.1",
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.5.1",
"com.datastax.spark" % "spark-cassandra-connector_2.10" % "1.5.0-M2"
)
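Initialization and usage in the app:
import org.apache.spark.SparkConf

// Local Spark configuration with the Cassandra contact point
val sparkConf = new SparkConf(true)
  .setMaster("local[2]")
  .setAppName("KafkaStreamToCassandraApp")
  .set("spark.executor.memory", "1g")
  .set("spark.cores.max", "1")
  .set("spark.cassandra.connection.host", "127.0.0.1")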
I create the schema in Cassandra like this:
import com.datastax.spark.connector.cql.CassandraConnector

// Recreate the keyspace and the counter table used for word counts
CassandraConnector(sparkConf).withSessionDo { session =>
  session.execute("DROP KEYSPACE IF EXISTS kafka_streaming")
  session.execute("CREATE KEYSPACE IF NOT EXISTS kafka_streaming WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 }")
  session.execute("CREATE TABLE IF NOT EXISTS kafka_streaming.wordcount (word TEXT PRIMARY KEY, count COUNTER)")
  session.execute("TRUNCATE kafka_streaming.wordcount")
}
Also, to build the assembly jar, I set up these merge strategies:
assemblyMergeStrategy in assembly := {
  case PathList("com", "esotericsoftware", xs@_*) => MergeStrategy.last
  case PathList("com", "google", xs@_*) => MergeStrategy.first
  case PathList("org", "apache", xs@_*) => MergeStrategy.last
  case PathList("io", "netty", xs@_*) => MergeStrategy.last
  case PathList("com", "codahale", xs@_*) => MergeStrategy.last
  case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.first
  case x =>
    // Fall back to sbt-assembly's default strategy for everything else
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
I think the problem is related to the Guava rule
case PathList("com", "google", xs@_*) => MergeStrategy.first
I also tried switching it to MergeStrategy.last.
Any ideas?
This is the exception I get:
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.reflect.TypeToken.isPrimitive()Z
at com.datastax.driver.core.TypeCodec.<init>(TypeCodec.java:142)
at com.datastax.driver.core.TypeCodec.<init>(TypeCodec.java:136)
at com.datastax.driver.core.TypeCodec$BlobCodec.<init>(TypeCodec.java:609)
at com.datastax.driver.core.TypeCodec$BlobCodec.<clinit>(TypeCodec.java:606)
at com.datastax.driver.core.CodecRegistry.<clinit>(CodecRegistry.java:147)
at com.datastax.driver.core.Configuration$Builder.build(Configuration.java:259)
at com.datastax.driver.core.Cluster$Builder.getConfiguration(Cluster.java:1135)
at com.datastax.driver.core.Cluster.<init>(Cluster.java:111)
at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:178)
at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1152)
at com.datastax.spark.connector.cql.DefaultConnectionFactory$.createCluster(CassandraConnectionFactory.scala:85)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:155)
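To check the suspicion that the wrong Guava copy ends up in the assembly jar, a small diagnostic like the one below could be called from the app's main before CassandraConnector is first used. This is only a sketch: GuavaProbe, locate and hasPublicMethod are hypothetical names, not part of Spark or the Cassandra connector, and the code relies only on standard JVM reflection.

```scala
import scala.util.Try

// Hypothetical diagnostic helper (not part of Spark or the connector):
// reports which jar a class was loaded from and whether a public method exists on it.
object GuavaProbe {

  // Location (jar or directory) the class was loaded from.
  // None if the class cannot be found, or if it has no CodeSource
  // (e.g. JDK classes loaded by the bootstrap class loader).
  def locate(className: String): Option[String] =
    Try(Class.forName(className)).toOption.flatMap { cls =>
      Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
    }

  // True if the class can be loaded and has a public method with this name
  // (declared or inherited; parameter types are ignored).
  def hasPublicMethod(className: String, methodName: String): Boolean =
    Try(Class.forName(className)).toOption.exists(_.getMethods.exists(_.getName == methodName))

  def main(args: Array[String]): Unit = {
    val typeToken = "com.google.common.reflect.TypeToken"
    println(s"$typeToken loaded from: ${locate(typeToken).getOrElse("<not found>")}")
    println(s"TypeToken.isPrimitive present: ${hasPublicMethod(typeToken, "isPrimitive")}")
  }
}
```

If the second line prints false when run from the assembled jar, the Guava version that won the merge does not have the TypeToken.isPrimitive() method the DataStax driver calls, which matches the NoSuchMethodError above; the first line shows which jar that copy came from.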