私は大量の json メッセージ (それぞれ約 2KB) を、カフカからスパーク ストリーミングに来るカサンドラにプッシュしようとしています。
シミュレータ---->カフカ---->SparkStreaming--->カサンドラ。
これらはそれぞれ、30 GB の RAM と 8 コア プロセッサを備えた個別の ec2 インスタンスで、スタンドアロンの単一ノード セットアップとして実行されています。
シミュレーターから約 500 万件のメッセージをプッシュしようとすると、約 10 万件のメッセージの後、cassandra はメッセージの挿入を停止し、spark ストリーミング ジョブはバッチを作成し続けます (spark ストリーミング Web UI に見られるように)。ログも確認しましたが、問題は見つかりませんでした。
また、cassandra に書き込むためのコードでスパーク コネクタを使用している方法がわかりません。
以下のコードを参照してください。
/**
* Spark Streaming to cassandra code
*/
package org.sparkexample;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import com.datastax.spark.connector.japi.CassandraJavaUtil;
import com.datastax.spark.connector.japi.CassandraStreamingJavaUtil;
import scala.Tuple2;
public class SparkStreamingKafkaTest {
private SparkStreamingKafkaTest() {
}
public static void main(String[] args) {
if (args.length < 6) {
System.err.println("Usage: SparkStreamingKafka <zkQuorum> <group> <topics> <numThreads> <conc write> <cassandra ip>");
System.exit(1);
}
SparkConf sparkConf = new SparkConf().setAppName("SparkStreamingKafka");
//specific to cassandra
sparkConf.set("spark.cassandra.output.concurrent.writes", args[4]);
sparkConf.set("spark.cassandra.connection.host",args[5]);
// Create the context with a 2 second batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
int numThreads = Integer.parseInt(args[3]);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
String[] topics = args[2].split(",");
for (String topic : topics) {
topicMap.put(topic, numThreads);
}
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1],
topicMap);
JavaDStream<WordCount> wc = messages.map(new Function<Tuple2<String, String>, WordCount>() {
@Override
public WordCount call(Tuple2<String, String> tuple2) {
String key = System.currentTimeMillis()+ "_"+ Math.random();
return new WordCount(key, tuple2._2());
}
});
Map <String, String> map = new HashMap<String, String>();
map.put("word", "word");
map.put("count", "count");
CassandraStreamingJavaUtil.javaFunctions(wc).writerBuilder("mykeyspace", "wordcount",CassandraJavaUtil.mapToRow(WordCount.class, map)).saveToCassandra();
jssc.start();
jssc.awaitTermination();
}
}
WordCount.java
package org.sparkexample;
import java.io.Serializable;
public class WordCount implements Serializable{
private String word;
private String count;
public WordCount(){
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public String getCount() {
return count;
}
public void setCount(String count) {
this.count = count;
}
public WordCount(String key, String count) {
this.word = key;
this.count = count;
}
}
以下の主要な依存関係を持つデフォルトの cassandra.yml を使用しています。
- スパーク カサンドラ コネクタ_2.10 - 1.4.0-M3
- スパーク-cassandra-connector-java_2.10 - 1.4.0-M3
- cassandra-driver-core - 2.1.7.1
- スパーク ストリーミング kafka_2.10 - 1.4.1
- スパークストリーミング_2.10 - 1.4.1
- スパークコア_2.10 - 1.4.1
問題になる可能性のあるものを提案してください。
nodetool info と nodetool tpstats の出力は次のとおりです。