4

Cassandra が推奨する RandomPartitioner (または Murmur3Partitioner)を使用する場合、キーの md5 ハッシュを使用して行がクラスター全体に分散されるため、キーに対して意味のある範囲クエリを実行することはできません。これらのハッシュは「トークン」と呼ばれます。

それにもかかわらず、それぞれにトークンの範囲を割り当てることによって、多くのコンピューティング ワーカー間で大きなテーブルを分割することは非常に便利です。CQL3 を使用すると、トークンに対して直接クエリを発行できるように見えますが、次の pythonは機能しません... 編集: cassandra データベースの最新バージョンに対するテストに切り替えた後に機能し (doh!)、構文も更新します以下のメモ:

## use python cql module
import cql

## If running against an old version of Cassandra, this raises: 
## TApplicationException: Invalid method name: 'set_cql_version'
conn = cql.connect('localhost', cql_version='3.0.2')

cursor = conn.cursor()

try:
    ## remove the previous attempt to make this work
    cursor.execute('DROP KEYSPACE test;')
except Exception, exc:
    print exc

## make a keyspace and a simple table
cursor.execute("CREATE KEYSPACE test WITH strategy_class = 'SimpleStrategy' AND strategy_options:replication_factor = 1;")
cursor.execute("USE test;")
cursor.execute('CREATE TABLE data (k int PRIMARY KEY, v varchar);')

## put some data in the table -- must use single quotes around literals, not double quotes                                                                                                                                   
cursor.execute("INSERT INTO data (k, v) VALUES (0, 'a');")
cursor.execute("INSERT INTO data (k, v) VALUES (1, 'b');")
cursor.execute("INSERT INTO data (k, v) VALUES (2, 'c');")
cursor.execute("INSERT INTO data (k, v) VALUES (3, 'd');")

## split up the full range of tokens.
## Suppose there are 2**k workers:
k = 3 # --> eight workers
token_sub_range = 2**(127 - k)
worker_num = 2 # for example
start_token =    worker_num  * token_sub_range
end_token = (1 + worker_num) * token_sub_range

## put single quotes around the token strings
cql3_command = "SELECT k, v FROM data WHERE token(k) >= '%d' AND token(k) < '%d';" % (start_token, end_token)
print cql3_command

## this fails with "ProgrammingError: Bad Request: line 1:28 no viable alternative at input 'token'"
cursor.execute(cql3_command)

for row in cursor:
    print row

cursor.close()
conn.close()

私はよりpythonicなインターフェースを好むので、理想的にはこれをpycassaで動作させたいと思っています。

これを行うより良い方法はありますか?

4

2 に答える 2

1

回答を含むように質問を更新しました。

于 2013-04-22T22:42:56.427 に答える
0

これは CQL3 ではありませんが、Thrift インターフェイスを直接使用して、localhost が所有するすべての (pickle された) データを読み取る単純なプログラムを次に示します。これを使用して、Cassandra をバックエンドとして単純なマップ/リデュース エンジンを構築できます。すべてのノードは、このようなものを実行して、それ自体に属するデータに対して map() を実行するため、データ取得のためのネットワーク オーバーヘッドは発生しません。結果は、別のノードの reduce() フェーズに戻されます。

明らかに、これは Cassandra1.2+ の vnode ではうまく機能しません。現在、ローカル データの小さなサブセットに対する map() と vnode のサポートを可能にするインデックス作成アプローチを使用しています。

#!/usr/bin/env python2.7

import sys
import socket
import cPickle as pickle
from thrift import Thrift
from thrift.transport import TTransport
from thrift.transport import TSocket
from pycassa.cassandra import Cassandra
from pycassa.cassandra.ttypes import *
import time
import pprint

def main():
    jobname = sys.argv[1]
    pp = pprint.PrettyPrinter(indent=2)

    (client, transport) = connect("localhost")

    # Determine local IP address
    ip = socket.gethostbyname(socket.gethostname())

    # Set up query
    keyspace = "data"
    column_parent = ColumnParent(column_family=foo)

    try:
        # Find range of tokens for which this node is first replica
        for tokenrange in client.describe_ring(keyspace):
            if tokenrange.endpoints[0] == ip:
                start_token=tokenrange.start_token
                end_token=tokenrange.end_token
                break

        # Set kesypace
        client.set_keyspace(keyspace)

        # Query for all data owned by this node
        slice_range = SliceRange(start="", finish="")
        predicate = SlicePredicate(slice_range=slice_range)
        keyrange = KeyRange(start_token=start_token, end_token=end_token, count=10000)
        t0 = time.time()
        ptime = 0
        keycount = 0
        start=""
        for keyslice in client.get_range_slices(column_parent, predicate, keyrange, ConsistencyLevel.ONE):
            keycount += 1
            for col in keyslice.columns:
                pt0 = time.time()
                data = pickle.loads(col.column.value)
                ptime += time.time() - pt0
    except Thrift.TException, tx:
        print 'Thrift: %s' % tx.message
    finally:
        disconnect(transport)

    t1 = time.time() - t0
    print "Read data for %d tasks in: %.2gs" %(keycount, t1)
    print "Job unpickling time: %.2gs" %ptime
    print "Unpickling percentage: %.2f%%" %(ptime/t1*100)

def connect(host):
    """ 
    Connect to cassandra instance on given host.
    Returns: Cassandra.Client object
    """
    socket = TSocket.TSocket(host, 9160)
    transport = TTransport.TFramedTransport(socket)
    protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
    transport.open()
    client = Cassandra.Client(protocol) 
    return (client, transport)

def disconnect(transport):
    """ 
    Disconnect from cassandra instance
    """
    transport.close()

if __name__ == '__main__':
    main()
于 2013-04-22T23:24:51.110 に答える