0

spark から cassandra にデータを書き込んでいる間、データが書き込まれません。
フラッシュ バックは次のとおりです。
私は kafka-sparkStreaming-cassandra の統合を行っています。
私はカフカのメッセージを読んでいて、それをカサンドラのテーブルに入れようとしていますCREATE TABLE TEST_TABLE(key INT PRIMARY KEY, value TEXT)
kafka から spark-streaming への移行は問題なく実行されていますが、spark から cassandra への移行には問題があります... データがテーブルに書き込まれません。
Cassandra との接続を作成できますが、データが Cassandra テーブルに挿入されません。出力は、接続され、次の秒で切断されることを示しています。
の文字列System.out.print()はすべて出力にあります。

+++++++++++cassandra connector created++++++++++++++++++++++++++++
+++++++++++++streaming Connection done!+++++++++++++++++++++++++++
++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++

Cassandra シェルに 0 行が表示されます。
完全なコードとログと依存関係は以下のとおりです。

public class SparkStream {
    static int key=0;
    public static void main(String args[]) throws Exception
    {

        if(args.length != 3)
        {
            System.out.println("parameters not given properly");
            System.exit(1);
        }

        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(3));
        }

        /* Connection to Spark */
        SparkConf conf = new SparkConf();
        conf.set("spark.cassandra.connection.host", "localhost");
        JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000));


        /* connection to cassandra */
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
        System.out.println("+++++++++++cassandra connector created++++++++++++++++++++++++++++");


        /* Receive Kafka streaming inputs */
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );
        System.out.println("+++++++++++++streaming Connection done!+++++++++++++++++++++++++++");


        /* Create DStream */                
        JavaDStream<TestTable> data = messages.map(new Function< Tuple2<String,String>, TestTable >() 
        {
            public TestTable call(Tuple2<String, String> message)
            {
                return new TestTable(new Integer(++key), message._2() );
            }
        }
        );
        System.out.println("++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++");


        /* Write to cassandra */
        javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();


        jssc.start();
        jssc.awaitTermination();

    }
}

class TestTable implements Serializable
{
    Integer key;
    String value;

    public TestTable() {}

    public TestTable(Integer k, String v)
    {
        key=k;
        value=v;
    }

    public Integer getKey(){
        return key;
    }

    public void setKey(Integer k){
        key=k;
    }

    public String getValue(){
        return value;
    }

    public void setValue(String v){
        value=v;
    }

    public String toString(){
        return MessageFormat.format("TestTable'{'key={0}, value={1}'}'", key, value);

    }
}

ログは次のとおりです。

+++++++++++cassandra connector created++++++++++++++++++++++++++++
+++++++++++++streaming Connection done!+++++++++++++++++++++++++++
++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++
14/12/09 12:07:33 INFO core.Cluster: New Cassandra host localhost/127.0.0.1:9042 added
14/12/09 12:07:33 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:34 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster

14/12/09 12:07:45 INFO core.Cluster: New Cassandra host localhost/127.0.0.1:9042 added
14/12/09 12:07:45 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:46 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster

POM.xml の依存関係は次のとおりです。

   <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.1.0</version>
    </dependency>

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.1.1</version>
</dependency>


    <dependency>
        <groupId>com.msiops.footing</groupId>
        <artifactId>footing-tuple</artifactId>
        <version>0.2</version>
    </dependency>   

<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>2.1.3</version>
</dependency>

コードに何か問題がありますか?またはcassandra構成?

4

1 に答える 1

1

問題を解決しました。columnMapper は、クラス TestTable のゲッターとセッターにアクセスできませんでした。そのため、アクセス修飾子を public に変更しました。しかし、今では1つのファイルに2つのパブリッククラスがありました. これはエラーです。そのため、次のクラスを持つ別のJavaファイルTestTable.javaを作成しました

public class TestTable implements Serializable { 
//code
}

現在、メッセージはカフカから読み取られ、cassandra テーブルに格納されています。

于 2014-12-11T18:29:02.360 に答える