私はカサンドラを評価しています。私は datastax ドライバーと CQL を使用しています。
更新ごとに名前が異なる、次の内部構造を持つデータを保存したいと思います。
+-------+-------+-------+-------+-------+-------+
| | name1 | name2 | name3 | ... | nameN |
| time +-------+-------+-------+-------+-------+
| | val1 | val2 | val3 | ... | valN |
+-------+-------+-------+-------|-------+-------+
したがって、時間は列キーであり、名前は行キーである必要があります。このテーブルの作成に使用する CQL ステートメントは次のとおりです。
CREATE TABLE IF NOT EXISTS test.wide (
time varchar,
name varchar,
value varchar,
PRIMARY KEY (time,name))
WITH COMPACT STORAGE
クエリを簡単にするために、スキーマをこのようにしたいと思います。また、65000 行を超える更新を保存する必要がある場合もあります。そのため、cassandra の list/set/map データ型を使用することはできません。
名前/値のペアの数はさまざまですが、大量 (~1000) で、1 秒あたり少なくとも 1000 の幅の広い行の挿入を処理できる必要があります。
問題は次のとおりです。それぞれ 10000 個の名前と値のペアを 1000 行挿入する単純なベンチマークを作成しました。CQL と datastax ドライバーを使用するとパフォーマンスが非常に低下しますが、CQL を使用しないバージョン (astyanax を使用) は同じテスト クラスターで良好なパフォーマンスを示します。
この関連する質問を読みました。この質問の受け入れられた回答では、cassandra 2 で利用可能なバッチ準備ステートメントを使用して、新しいワイド行をアトミックかつ迅速に作成できる必要があることを示唆しています。
そのため、それらを使用してみましたが、それでもパフォーマンスが低下します (ローカルホストで実行されている小さな 3 ノード クラスターで 1 秒あたり 2 回の挿入)。明らかな何かが欠けていますか、それとも低レベルのthrift APIを使用する必要がありますか? astyanax で ColumnListMutation を使用して同じ挿入を実装しましたが、1 秒あたり約 30 回の挿入が行われました。
低レベルのthrift APIを使用する必要がある場合:
それは実際に非推奨ですか、それとも低レベルであるため使用するのが不便ですか?
Thrift API で作成されたテーブルを CQL でクエリできますか?
以下は、scala での自己完結型のコード例です。10000 列の幅の広い行を挿入するためのバッチ ステートメントを作成するだけで、挿入のパフォーマンスを繰り返し測定します。
BatchStatement のオプションと整合性レベルを試してみましたが、パフォーマンスが向上するものは何もありませんでした。
私が持っている唯一の説明は、準備されたステートメントで構成されるバッチにもかかわらず、エントリが行に 1 つずつ追加されるということです。
package cassandra
import com.datastax.driver.core._
object CassandraTestMinimized extends App {
val keyspace = "test"
val table = "wide"
val tableName = s"$keyspace.$table"
def createKeyspace = s"""
CREATE KEYSPACE IF NOT EXISTS ${keyspace}
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
"""
def createWideTable = s"""
CREATE TABLE IF NOT EXISTS ${tableName} (
time varchar,
name varchar,
value varchar,
PRIMARY KEY (time,name))
WITH COMPACT STORAGE
"""
def writeTimeNameValue(time: String) = s"""
INSERT INTO ${tableName} (time, name, value)
VALUES ('$time', ?, ?)
"""
val cluster = Cluster.builder.addContactPoints("127.0.0.1").build
val session = cluster.connect()
session.execute(createKeyspace)
session.execute(createWideTable)
for(i<-0 until 1000) {
val entries =
for {
i <- 0 until 10000
name = i.toString
value = name
} yield name -> value
val batchPreparedStatement = writeMap(i, entries)
val t0 = System.nanoTime()
session.execute(batchPreparedStatement)
val dt = System.nanoTime() - t0
println(i + " " + (dt/1.0e9))
}
def writeMap(time: Long, update: Seq[(String, String)]) : BatchStatement = {
val template = session
.prepare(writeTimeNameValue(time.toString))
.setConsistencyLevel(ConsistencyLevel.ONE)
val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
for ((k, v) <- update)
batch.add(template.bind(k, v))
batch
}
}
これは、本質的に同じことを 15 倍高速に実行する astyanax コード ( astyanax の例を変更したもの) です。これも非同期呼び出しを使用しないため、公正な比較であることに注意してください。これには、列ファミリーが既に存在している必要があります。なぜなら、私は astyanax を使用して列ファミリーを作成する方法をまだ理解しておらず、例には列ファミリーを作成するためのコードが含まれていなかったからです。
package cassandra;
import java.util.Iterator;
import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.serializers.AsciiSerializer;
import com.netflix.astyanax.serializers.LongSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;
public class AstClient {
private static final Logger logger = LoggerFactory.getLogger(AstClient.class);
private AstyanaxContext<Keyspace> context;
private Keyspace keyspace;
private ColumnFamily<Long, String> EMP_CF;
private static final String EMP_CF_NAME = "employees2";
public void init() {
logger.debug("init()");
context = new AstyanaxContext.Builder()
.forCluster("Test Cluster")
.forKeyspace("test1")
.withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
.setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
)
.withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("MyConnectionPool")
.setPort(9160)
.setMaxConnsPerHost(1)
.setSeeds("127.0.0.1:9160")
)
.withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
.setCqlVersion("3.0.0")
.setTargetCassandraVersion("2.0.5"))
.withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
.buildKeyspace(ThriftFamilyFactory.getInstance());
context.start();
keyspace = context.getClient();
EMP_CF = ColumnFamily.newColumnFamily(
EMP_CF_NAME,
LongSerializer.get(),
AsciiSerializer.get());
}
public void insert(long time) {
MutationBatch m = keyspace.prepareMutationBatch();
ColumnListMutation<String> x =
m.withRow(EMP_CF, time);
for(int i=0;i<10000;i++)
x.putColumn(Integer.toString(i), Integer.toString(i));
try {
@SuppressWarnings("unused")
Object result = m.execute();
} catch (ConnectionException e) {
logger.error("failed to write data to C*", e);
throw new RuntimeException("failed to write data to C*", e);
}
logger.debug("insert ok");
}
public void createCF() {
}
public void read(long time) {
OperationResult<ColumnList<String>> result;
try {
result = keyspace.prepareQuery(EMP_CF)
.getKey(time)
.execute();
ColumnList<String> cols = result.getResult();
// process data
// a) iterate over columsn
for (Iterator<Column<String>> i = cols.iterator(); i.hasNext(); ) {
Column<String> c = i.next();
String v = c.getStringValue();
System.out.println(c.getName() + " " + v);
}
} catch (ConnectionException e) {
logger.error("failed to read from C*", e);
throw new RuntimeException("failed to read from C*", e);
}
}
public static void main(String[] args) {
AstClient c = new AstClient();
c.init();
long t00 = System.nanoTime();
for(int i=0;i<1000;i++) {
long t0 = System.nanoTime();
c.insert(i);
long dt = System.nanoTime() - t0;
System.out.println((1.0e9/dt) + " " + i);
}
long dtt = System.nanoTime() - t00;
c.read(0);
System.out.println(dtt / 1e9);
}
}
更新: cassandra-userメーリング リストでこのスレッドを見つけました。大きな幅の行の挿入を行う場合、CQL にパフォーマンス上の問題があるようです。この問題を追跡するためのチケットCASSANDRA-6737があります。
Update2: CASSANDRA-6737 に添付されているパッチを試してみたところ、このパッチで問題が完全に修正されていることが確認できました。これを迅速に修正してくれた DataStax の Sylvain Lebresne に感謝します。