7

私はカサンドラを評価しています。私は datastax ドライバーと CQL を使用しています。

更新ごとに名前が異なる、次の内部構造を持つデータを保存したいと思います。

+-------+-------+-------+-------+-------+-------+
|       | name1 | name2 | name3 | ...   | nameN |
| time  +-------+-------+-------+-------+-------+
|       | val1  | val2  | val3  | ...   | valN  |
+-------+-------+-------+-------|-------+-------+

したがって、時間は列キーであり、名前は行キーである必要があります。このテーブルの作成に使用する CQL ステートメントは次のとおりです。

CREATE TABLE IF NOT EXISTS test.wide (
  time varchar,
  name varchar,
  value varchar,
  PRIMARY KEY (time,name))
  WITH COMPACT STORAGE

クエリを簡単にするために、スキーマをこのようにしたいと思います。また、65000 行を超える更新を保存する必要がある場合もあります。そのため、cassandra の list/set/map データ型を使用することはできません。

名前/値のペアの数はさまざまですが、大量 (~1000) で、1 秒あたり少なくとも 1000 の幅の広い行の挿入を処理できる必要があります。

問題は次のとおりです。それぞれ 10000 個の名前と値のペアを 1000 行挿入する単純なベンチマークを作成しました。CQL と datastax ドライバーを使用するとパフォーマンスが非常に低下しますが、CQL を使用しないバージョン (astyanax を使用) は同じテスト クラスターで良好なパフォーマンスを示します。

この関連する質問を読みました。この質問の受け入れられた回答では、cassandra 2 で利用可能なバッチ準備ステートメントを使用して、新しいワイド行をアトミックかつ迅速に作成できる必要があることを示唆しています。

そのため、それらを使用してみましたが、それでもパフォーマンスが低下します (ローカルホストで実行されている小さな 3 ノード クラスターで 1 秒あたり 2 回の挿入)。明らかな何かが欠けていますか、それとも低レベルのthrift APIを使用する必要がありますか? astyanax で ColumnListMutation を使用して同じ挿入を実装しましたが、1 秒あたり約 30 回の挿入が行われました。

低レベルのthrift APIを使用する必要がある場合:

  • それは実際に非推奨ですか、それとも低レベルであるため使用するのが不便ですか?

  • Thrift API で作成されたテーブルを CQL でクエリできますか?

以下は、scala での自己完結型のコード例です。10000 列の幅の広い行を挿入するためのバッチ ステートメントを作成するだけで、挿入のパフォーマンスを繰り返し測定します。

BatchStatement のオプションと整合性レベルを試してみましたが、パフォーマンスが向上するものは何もありませんでした。

私が持っている唯一の説明は、準備されたステートメントで構成されるバッチにもかかわらず、エントリが行に 1 つずつ追加されるということです。


package cassandra

import com.datastax.driver.core._

object CassandraTestMinimized extends App {

  val keyspace = "test"
  val table = "wide"
  val tableName = s"$keyspace.$table"

  def createKeyspace = s"""
CREATE KEYSPACE IF NOT EXISTS ${keyspace}
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
"""

  def createWideTable = s"""
CREATE TABLE IF NOT EXISTS ${tableName} (
time varchar,
name varchar,
value varchar,
PRIMARY KEY (time,name))
WITH COMPACT STORAGE
"""

  def writeTimeNameValue(time: String) = s"""
INSERT INTO ${tableName} (time, name, value)
VALUES ('$time', ?, ?)
"""

  val cluster = Cluster.builder.addContactPoints("127.0.0.1").build
  val session = cluster.connect()

  session.execute(createKeyspace)
  session.execute(createWideTable)

  for(i<-0 until 1000) {
    val entries =
      for {
        i <- 0 until 10000
        name = i.toString
        value = name
      } yield name -> value
    val batchPreparedStatement = writeMap(i, entries)
    val t0 = System.nanoTime()
    session.execute(batchPreparedStatement)
    val dt = System.nanoTime() - t0
    println(i + " " + (dt/1.0e9))
  }

  def writeMap(time: Long, update: Seq[(String, String)]) : BatchStatement = {
    val template = session
      .prepare(writeTimeNameValue(time.toString))
      .setConsistencyLevel(ConsistencyLevel.ONE)
    val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
    for ((k, v) <- update)
      batch.add(template.bind(k, v))
    batch
  }
}

これは、本質的に同じことを 15 倍高速に実行する astyanax コード ( astyanax の例を変更したもの) です。これも非同期呼び出しを使用しないため、公正な比較であることに注意してください。これには、列ファミリーが既に存在している必要があります。なぜなら、私は astyanax を使用して列ファミリーを作成する方法をまだ理解しておらず、例には列ファミリーを作成するためのコードが含まれていなかったからです。

package cassandra;

import java.util.Iterator;

import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.serializers.AsciiSerializer;
import com.netflix.astyanax.serializers.LongSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;

public class AstClient {
    private static final Logger logger = LoggerFactory.getLogger(AstClient.class);

    private AstyanaxContext<Keyspace> context;
    private Keyspace keyspace;
    private ColumnFamily<Long, String> EMP_CF;
    private static final String EMP_CF_NAME = "employees2";

    public void init() {
        logger.debug("init()");

        context = new AstyanaxContext.Builder()
                .forCluster("Test Cluster")
                .forKeyspace("test1")
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                        .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
                )
                .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("MyConnectionPool")
                        .setPort(9160)
                        .setMaxConnsPerHost(1)
                        .setSeeds("127.0.0.1:9160")
                )
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                        .setCqlVersion("3.0.0")
                        .setTargetCassandraVersion("2.0.5"))
                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
                .buildKeyspace(ThriftFamilyFactory.getInstance());

        context.start();
        keyspace = context.getClient();

        EMP_CF = ColumnFamily.newColumnFamily(
                EMP_CF_NAME,
                LongSerializer.get(),
                AsciiSerializer.get());
    }

    public void insert(long time) {
        MutationBatch m = keyspace.prepareMutationBatch();

        ColumnListMutation<String> x =
                m.withRow(EMP_CF, time);
        for(int i=0;i<10000;i++)
            x.putColumn(Integer.toString(i), Integer.toString(i));

        try {
            @SuppressWarnings("unused")
            Object result = m.execute();
        } catch (ConnectionException e) {
            logger.error("failed to write data to C*", e);
            throw new RuntimeException("failed to write data to C*", e);
        }
        logger.debug("insert ok");
    }

    public void createCF() {
    }

    public void read(long time) {
        OperationResult<ColumnList<String>> result;
        try {
            result = keyspace.prepareQuery(EMP_CF)
                    .getKey(time)
                    .execute();

            ColumnList<String> cols = result.getResult();
            // process data

            // a) iterate over columsn
            for (Iterator<Column<String>> i = cols.iterator(); i.hasNext(); ) {
                Column<String> c = i.next();
                String v = c.getStringValue();
                System.out.println(c.getName() + " " + v);
            }

        } catch (ConnectionException e) {
            logger.error("failed to read from C*", e);
            throw new RuntimeException("failed to read from C*", e);
        }
    }

    public static void main(String[] args) {
        AstClient c = new AstClient();
        c.init();
        long t00 = System.nanoTime();
        for(int i=0;i<1000;i++) {
            long t0 = System.nanoTime();
            c.insert(i);
            long dt = System.nanoTime() - t0;
            System.out.println((1.0e9/dt) + " " + i);
        }
        long dtt = System.nanoTime() - t00;

        c.read(0);
        System.out.println(dtt / 1e9);
    }

}

更新: cassandra-userメーリング リストでこのスレッドを見つけました。大きな幅の行の挿入を行う場合、CQL にパフォーマンス上の問題があるようです。この問題を追跡するためのチケットCASSANDRA-6737があります。

Update2: CASSANDRA-6737 に添付されているパッチを試してみたところ、このパッチで問題が完全に修正されていることが確認できました。これを迅速に修正してくれた DataStax の Sylvain Lebresne に感謝します。

4

3 に答える 3

5

これを経験するのはあなただけではありません。私は少し前に CQL と倹約の間の変換に重点を置いたブログ投稿を書きましたが、同じことを見ている人々のメーリング リストの問題へのリンクがあります (幅の広い行の挿入のパフォーマンスの問題は、調査の最初の動機でした): http: //thelastpickle.com/blog/2013/09/13/CQL3-to-Astyanax-Compatibility.html

要するに、CQL は、Cassandra を初めて使用する人にとって、タイピングの負担を軽減し、データ モデルを理解するのに最適です。DataStax ドライバーは適切に作成されており、多くの便利な機能が含まれています。

ただし、幅の広い行の挿入では、Thrift API の方がわずかに高速です。Netflix のブログでは、この特定のユース ケースにはあまり触れていません。さらに、Thrift API は、人々が使用している限りレガシーではありません (多くの人が使用しています)。これは ASF プロジェクトであり、単一のベンダーによって運営されているわけではありません。

一般に、どの Cassandra ベースのアプリケーションでも、ワークロードのパフォーマンス要件を満たす (またはしばしば超える) 何かを行う方法を見つけた場合は、それを使い続けてください。

于 2014-02-19T15:25:36.163 に答える
2

あなたが試すことができるいくつかのこと...あなたのcassandra.yaml(これはCassandra 1.2.xです。2.xではparamsの呼び出しが多少異なる可能性があります):

  • 行キャッシュを無効にする ( row_cache_size_in_mb: 0)
  • メモリー内の行がディスクに溢れ出る前にメモリー制限を増やします ( min_memory_compaction_limit_in_mb)。こぼれが発生したことを示すログ出力が表示された場合にのみ、これを行ってください。
  • 行がノード全体に分散されるようにnum_tokens/値が適切に構成されていることを確認してくださいinitial_token

あなたが試すことができる他のこと:

  • 1 つだけでなく、クラスター内のすべてのノード IP をクライアントに提供する
  • 各 Cassandra ノードにより多くの RAM を提供する
  • テストをマルチスレッドで実行してみてください
  • Linux で Cassandra を実行する場合は、JNA がインストールされ、使用されていることを確認してください。

明確にすること:

  • nodetool3つのノードがお互いを見つけたことを確認しましたか?
  • nodetool3 つのノードの負荷分散について教えてください。
  • 仮想クラスターの物理ホストは、CPU と I/O の使用率について何と言っていますか? 多分それは単に限界に達したのでしょうか?
于 2014-02-14T15:48:29.107 に答える