8

Cassandraへの書き込みにDatastax Javaドライバーを使用する予定です。主にDatastax Javaドライバーの機能に興味がBatch WritesありましAsycnhronousたが、Datastaxを使用する以下のコードにこれらの機能を組み込む方法を説明できるチュートリアルを取得できませんJava ドライバー..

/**
 * Performs an upsert of the specified attributes for the specified id.
 */
public void upsertAttributes(final String userId, final Map<String, String> attributes, final String columnFamily) {

    try {

        // make a sql here using the above input parameters.

        String sql = sqlPart1.toString()+sqlPart2.toString();

        DatastaxConnection.getInstance();
        PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(sql);
        prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);        

        BoundStatement query = prepStatement.bind(userId, attributes.values().toArray(new Object[attributes.size()]));

        DatastaxConnection.getSession().execute(query);

    } catch (InvalidQueryException e) {
        LOG.error("Invalid Query Exception in DatastaxClient::upsertAttributes "+e);
    } catch (Exception e) {
        LOG.error("Exception in DatastaxClient::upsertAttributes "+e);
    }
}

以下のコードでは、Datastax Java ドライバーを使用して Cassandra ノードへの接続を作成しています。

/**
 * Creating Cassandra connection using Datastax Java driver
 *
 */
private DatastaxConnection() {

    try{
        builder = Cluster.builder();
        builder.addContactPoint("some_nodes");

        builder.poolingOptions().setCoreConnectionsPerHost(
                HostDistance.LOCAL,
                builder.poolingOptions().getMaxConnectionsPerHost(HostDistance.LOCAL));

        cluster = builder
                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
                .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
                .build();

        StringBuilder s = new StringBuilder();
        Set<Host> allHosts = cluster.getMetadata().getAllHosts();
        for (Host h : allHosts) {
            s.append("[");
            s.append(h.getDatacenter());
            s.append(h.getRack());
            s.append(h.getAddress());
            s.append("]");
        }
        System.out.println("Cassandra Cluster: " + s.toString());

        session = cluster.connect("testdatastaxks");

    } catch (NoHostAvailableException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    } catch (Exception e) {

    }
}

上記のコードにバッチ書き込みまたは非同期機能を追加する方法について誰か助けてくれませんか..助けてくれてありがとう..

Cassandra 1.2.9 を実行しています

4

2 に答える 2

9

非同期の場合、関数を使用するのと同じくらい簡単executeAsyncです:

...
DatastaxConnection.getSession().executeAsync(query);

バッチの場合、クエリを作成する必要があります (コンパイラは文字列の連結を最適化する方法をよく知っているため、文字列を使用します)。

String cql =  "BEGIN BATCH "
       cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
       cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
       cql += "APPLY BATCH; "

DatastaxConnection.getInstance();
PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(cql);
prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);        

// this is where you need to be careful
// bind expects a comma separated list of values for all the params (?) above
// so for the above batch we need to supply 4 params:                     
BoundStatement query = prepStatement.bind(userId, "col1_val", userId_2, "col1_val_2");

DatastaxConnection.getSession().execute(query);

余談ですが、属性をマップのリストに変更すると仮定すると、ステートメントのバインディングは次のようになると思います。各マップはバッチ内の更新/挿入を表します。

BoundStatement query = prepStatement.bind(userId,
                                          attributesList.get(0).values().toArray(new Object[attributes.size()]), 
                                          userId_2,
                                          attributesList.get(1).values().toArray(new Object[attributes.size()])); 
于 2013-10-06T07:58:44.373 に答える