3

列に TTL (time-to-live) プロパティがある場合、最終的にそれを含む空の行と一緒に cassandra から完全に削除されることをテストしたいと思います。

私が理解したように、この振る舞いをテストするためのアルゴリズムは

  • オブジェクトを保存するときは、列の TTL を設定します
  • TTL時間が経過するのを待ち、戻り値がnullであることを確認する
  • GC_GRACE_SECONDS ペリオンが通過するまで待つ
  • 行も削除されることを確認します

そして、最後の項目を確認できませんでした。

私が発見したように(たとえば、ここまたはここおよび他の場所で)、圧縮を実行する必要があります。同様の質問が提起されています (例: Hector (Cassandra) Delete Anomaly ) が、役立つものは何も見つかりませんでした。

問題は、統合テストから (hector を使用して) 圧縮を強制して、期待どおりに動作することを確認するにはどうすればよいかということです。または、これを行う他の方法はありますか?

PS列ファミリーの切り捨てはオプションではありません。


詳細はこちら。

私のテスト:

private static final String KEYSPACE = "KEYSPACE";
private static final String COLUMN_FAMILY = "COLUMN_FAMILY";

private static final int GC_CRACE_SECONDS = 5;

// sut
private CassandraService cassandraService;

// dependencies
private Cluster cluster = HFactory.getOrCreateCluster("tstCltr", 
    "localhost:9160");

private Keyspace keyspace;

@BeforeClass
public static void setupBeforeClass() {
    EmbeddedCassandraDaemon.getEmbeddedCassandraDaemon();
}

@Before
public void setUp() throws Exception {
    keyspace = createKeyspace(KEYSPACE, cluster, 
        new QuorumAllConsistencyLevelPolicy());
    cassandraService = new CassandraService(cluster, KEYSPACE, 
        COLUMN_FAMILY, GC_CRACE_SECONDS);
}

@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
    Object obj = "OBJECT";
    String rowKey = "key";
    String columnName = "columnName";
    logger.info("before persisting rows count is {}" + countRows());

    cassandraService.persistObjectWithTtl(rowKey, columnName, obj, 5);

    logger.info("after persisting rows count is {}" + countRows());

    Object value = retrieve(rowKey, columnName);
    assertNotNull(value);

    logger.info("before TTL passes rows count is {}" + countRows());

    TimeUnit.SECONDS.sleep(6);

    Object nullValue = retrieve(rowKey, columnName);
    assertNull(nullValue);

    logger.info("after TTL passes rows count is {}" + countRows());

    TimeUnit.SECONDS.sleep(10);

    logger.info("wait 10 more seconds... rows count is {}" + countRows());
    System.out.println("================================" + countRows());

    TimeUnit.SECONDS.sleep(120);

    int countRows = countRows();
    logger.info("wait 2 more minutes... rows count is {}" + countRows);
    assertEquals(0, countRows);
}

永続化するためのコード:

public void persistObjectWithTtl(Object rowKey, Object columnName, 
        Object obj, int ttl) {
    LOGGER.debug("Persist {} / {}", rowKey, columnName);
    HColumn<Object, Object> column = createColumn(columnName, obj, 
            SERIALIZER, SERIALIZER);
    column.setTtl(ttl);
    executeInsertion(rowKey, column);
}

private void executeInsertion(Object rowKey, HColumn<Object, Object> column) {
    Mutator<Object> mutator = createMutator(keyspace, SERIALIZER);
    mutator.addInsertion(rowKey, this.columnFamilyName, column);
    mutator.execute();
}

列ファミリーの GcGraceSeconds の設定:

private void addColumnFamily(String keySpaceName, String columnFamilyName, 
            int gcGraceSeconds) {
    ColumnFamilyDefinition columnFamilyDefinition = 
        createColumnFamilyDefinition(keySpaceName, columnFamilyName);

    ThriftCfDef columnFamilyWithGCGraceSeconds = 
        new ThriftCfDef(columnFamilyDefinition);
    columnFamilyWithGCGraceSeconds.setGcGraceSeconds(gcGraceSeconds);

    cluster.addColumnFamily(columnFamilyWithGCGraceSeconds);
}

そして、SO にある行をカウントするためのコード:

public int countRows() {
    int rowCount = 100;

    ObjectSerializer serializer = ObjectSerializer.get();
    RangeSlicesQuery<Object, Object, Object> rangeSlicesQuery =
            HFactory.createRangeSlicesQuery(keyspace, serializer, 
                serializer, serializer)
                    .setColumnFamily(COLUMN_FAMILY)
                    .setRange(null, null, false, 10)
                    .setRowCount(rowCount);

    Object lastKey = null;

    int i = 0;
    while (true) {
        rangeSlicesQuery.setKeys(lastKey, null);

        QueryResult<OrderedRows<Object, Object, Object>> result = 
            rangeSlicesQuery.execute();
        OrderedRows<Object, Object, Object> rows = result.get();
        Iterator<Row<Object, Object, Object>> rowsIterator = rows.iterator();

        if (lastKey != null && rowsIterator != null) {
            rowsIterator.next();
        }

        while (rowsIterator.hasNext()) {
            Row<Object, Object, Object> row = rowsIterator.next();
            lastKey = row.getKey();
            i++;

            if (row.getColumnSlice().getColumns().isEmpty()) {
                continue;
            }
        }

        if (rows.getCount() < rowCount) {
            break;
        }

    }

    return i;
}

ありがとう。


アップデート:

その理由は、圧縮を実行するのに十分なデータ量がなかったため、より多くのデータを配置し、より頻繁にテーブルをディスクにフラッシュする必要があったためです。したがって、次のテストケースになりました。

@Test
public void rowGetsRemovedAfterGCGraceSeconds() throws Exception {
    final int expectedAmount = 50000;

    logger.info("before persisting rows count is {}", countRows());

    for (int i = 0; i < expectedAmount; i++) {
        String rowKey = RandomStringUtils.randomAlphanumeric(128);
        Object obj = RandomStringUtils.randomAlphanumeric(1000);
        cassandraService.persistObjectWithTtl(rowKey, COLUMN_NAME, obj, 20);

        if (i % 100 == 0) {
            StorageService.instance.forceTableFlush(KEYSPACE, COLUMN_FAMILY);
        }
    }

    logger.info("causing major compaction...");
    StorageService.instance.forceTableCompaction(KEYSPACE, COLUMN_FAMILY);
    logger.info("after major compaction rows count is {}", countRows());

    waitAtMost(Duration.TWO_MINUTES)
        .pollDelay(Duration.TWO_SECONDS)
        .pollInterval(Duration.ONE_HUNDRED_MILLISECONDS)
        .until(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                int countRows = countRows();
                logger.info("the rows count is {}", countRows);
                return countRows < expectedAmount;
            }
        });
}

完全なコード:クラスと sut のテスト

4

1 に答える 1