1

keyspace1.tableA で更新または削除が行われると、トリガーが keyspace1.tableB に行を追加するように、Cassandra トリガーを実装しようとしています。

tableB の列の名前は、tableA の列とはまったく異なります。

私は Cassandra 2.1 を使用しています。より新しいバージョンに移行するオプションはありません。https://github.com/apache/cassandra/blob/cassandra-2.1/examples/triggers/src/org/apache/cassandra/triggers/InvertedIndex.javaで InvertedIndex トリガーの例を見ると、突然変異:

InvertedIndex の例から:

    for (Cell cell : update)
    {
        // Skip the row marker and other empty values, since they lead to an empty key.
        if (cell.value().remaining() > 0)
        {
            Mutation mutation = new Mutation(properties.getProperty("keyspace"), cell.value());
            mutation.add(properties.getProperty("columnfamily"), cell.name(), key, System.currentTimeMillis());
            mutations.add(mutation);
        }
    }

問題は、この例では、mutation.add に渡されるセル名が cell.name() であることです。これは、その関数を使用して名前を取得できる既存のオブジェクトです。

今のところ、tableA に変更が加えられた時刻を保存しようとしているだけなので、tableB には 2 つの列があります。

  • changetime timeuuid
  • 操作テキスト

changetime と操作を実行して tableB に行を追加するミューテーションを追加する必要があります。Cassandra 2.1.12 でこのような行の変更を追加するにはどうすればよいですか?

私はこれを試しましたが、トリガーでヌルポインター例外が発生しています:

...
String keycol = "changetime";
ByteBuffer uuidKey = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()); 
ColumnIdentifier ci = new ColumnIdentifier(keycol, false);
CellName cn = CellNames.simpleSparse(ci);
mutation = new Mutation(keyspace, uuidKey);
mutation.add(tableName,cn, uuidKey, System.currentTimeMillis());
...

どんな支援も大歓迎です-私はCassandraの内部についての知識がないので、詳細は多すぎる情報ではありません.

4

1 に答える 1

1

答えは、CFMetaData コンパレータを使用して Mutation.add(...) が必要とする CellName を作成することです。特定の例を提供するために、 https: //github.com/apache/cassandra/tree/cassandra-3.0/examples/triggers で入手可能な Cassandra 3.0 AuditTrigger のスキーマと例を使用します。

この場合、書き込み先のテーブルは、次のように定義された test.audit テーブルです。

CREATE TABLE test.audit (key timeuuid, keyspace_name text,
    table_name text, primary_key text, PRIMARY KEY(key));

このテーブルには、"key" という名前のパーティション キーがあり、クラスタリング列はありません。定義については、 https://cassandra.apache.org/doc/cql3/CQL.html#createTableStmtのセクション「パーティション キーとクラスタリング列」を参照してください。

makeCellName の呼び出し (次のサンプル コードで確認します) は引数の変数リストを使用するため、これは重要な注意事項です。各引数は、影響を受ける行に対して対応するクラスタリング列に取得させたい値です。 、最後の引数はテキスト形式の列の名前です。

クラスタリング列がない場合 (このスキーマの場合のように)、makeCellName の呼び出しは単一の引数 (列の名前) を取ります。

これらすべてをまとめると、3.0 の例と同じことを行う Cassandra 2.1 の AuditTrigger 関数は次のようになります。

public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
{
    CFMetaData cfm = update.metadata();

    List<Mutation> mutations = new ArrayList<>(update.getColumnCount());

    String keyspaceName = "";
    String tableName    = "";
    String keyStr       = "";

    keyspaceName = cfm.ksName;
    tableName = cfm.cfName;

    try {
        keyStr = ByteBufferUtil.string(key);
    } catch (CharacterCodingException e) {
        StringWriter errors = new StringWriter();
        e.printStackTrace(new PrintWriter(errors));
        logger.error(errors.toString());
    }

    for (Cell cell : update)
    {
        // Skip the row marker and other empty values, since they lead to an empty key.
        if (cell.value().remaining() > 0)
        {
            CFMetaData other = Schema.instance.getCFMetaData("test","audit");
            CellNameType cnt = other.comparator;

            ByteBuffer auditkey = UUIDType.instance.decompose(UUIDGen.getTimeUUID());

            // create CellName objects for each of the columns in the audit table row we are inserting
            CellName primaryKeyCellName = cnt.makeCellName("primary_key");
            CellName keyspaceCellName = cnt.makeCellName("keyspace_name");
            CellName tableCellName = cnt.makeCellName("table_name");

            try {
                // put the values we want to write to the audit table into ByteBuffer objects
                ByteBuffer ksvalbb,tablevalbb,keyvalbb;
                ksvalbb=ByteBuffer.wrap(keyspaceName.getBytes("UTF8"));
                tablevalbb=ByteBuffer.wrap(tableName.getBytes("UTF8"));
                keyvalbb=ByteBuffer.wrap(keyStr.getBytes("UTF8"));

                // create the mutation object
                Mutation mutation = new Mutation(keyspaceName, auditkey);

                // get the time which will be needed for the call to mutation.add
                long mutationTime=System.currentTimeMillis();

                // add each of the column values to the mutation
                mutation.add("audit", primaryKeyCellName, keyvalbb,  mutationTime);
                mutation.add("audit", keyspaceCellName,  ksvalbb,  mutationTime);
                mutation.add("audit", tableCellName, tablevalbb,  mutationTime);

                mutations.add(mutation);
            } catch (UnsupportedEncodingException e) {
                StringWriter errors = new StringWriter();
                e.printStackTrace(new PrintWriter(errors));
                logger.error(errors.toString());
            }
        }
    }
    return mutations;
} 
于 2016-02-19T18:41:24.480 に答える