spark から cassandra にデータを書き込んでいる間、データが書き込まれません。
フラッシュ バックは次のとおりです。
私は kafka-sparkStreaming-cassandra の統合を行っています。
私はカフカのメッセージを読んでいて、それをカサンドラのテーブルに入れようとしていますCREATE TABLE TEST_TABLE(key INT PRIMARY KEY, value TEXT)
。
kafka から spark-streaming への移行は問題なく実行されていますが、spark から cassandra への移行には問題があります... データがテーブルに書き込まれません。
Cassandra との接続を作成できますが、データが Cassandra テーブルに挿入されません。出力は、接続され、次の秒で切断されることを示しています。
の文字列System.out.print()
はすべて出力にあります。
+++++++++++cassandra connector created++++++++++++++++++++++++++++
+++++++++++++streaming Connection done!+++++++++++++++++++++++++++
++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++
Cassandra シェルに 0 行が表示されます。
完全なコードとログと依存関係は以下のとおりです。
public class SparkStream {
static int key=0;
public static void main(String args[]) throws Exception
{
if(args.length != 3)
{
System.out.println("parameters not given properly");
System.exit(1);
}
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
Map<String,Integer> topicMap = new HashMap<String,Integer>();
String[] topic = args[2].split(",");
for(String t: topic)
{
topicMap.put(t, new Integer(3));
}
/* Connection to Spark */
SparkConf conf = new SparkConf();
conf.set("spark.cassandra.connection.host", "localhost");
JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000));
/* connection to cassandra */
CassandraConnector connector = CassandraConnector.apply(sc.getConf());
System.out.println("+++++++++++cassandra connector created++++++++++++++++++++++++++++");
/* Receive Kafka streaming inputs */
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );
System.out.println("+++++++++++++streaming Connection done!+++++++++++++++++++++++++++");
/* Create DStream */
JavaDStream<TestTable> data = messages.map(new Function< Tuple2<String,String>, TestTable >()
{
public TestTable call(Tuple2<String, String> message)
{
return new TestTable(new Integer(++key), message._2() );
}
}
);
System.out.println("++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++");
/* Write to cassandra */
javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();
jssc.start();
jssc.awaitTermination();
}
}
class TestTable implements Serializable
{
Integer key;
String value;
public TestTable() {}
public TestTable(Integer k, String v)
{
key=k;
value=v;
}
public Integer getKey(){
return key;
}
public void setKey(Integer k){
key=k;
}
public String getValue(){
return value;
}
public void setValue(String v){
value=v;
}
public String toString(){
return MessageFormat.format("TestTable'{'key={0}, value={1}'}'", key, value);
}
}
ログは次のとおりです。
+++++++++++cassandra connector created++++++++++++++++++++++++++++
+++++++++++++streaming Connection done!+++++++++++++++++++++++++++
++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++
14/12/09 12:07:33 INFO core.Cluster: New Cassandra host localhost/127.0.0.1:9042 added
14/12/09 12:07:33 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:34 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
14/12/09 12:07:45 INFO core.Cluster: New Cassandra host localhost/127.0.0.1:9042 added
14/12/09 12:07:45 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:46 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
POM.xml の依存関係は次のとおりです。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.10</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.10</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>com.msiops.footing</groupId>
<artifactId>footing-tuple</artifactId>
<version>0.2</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>2.1.3</version>
</dependency>
コードに何か問題がありますか?またはcassandra構成?