基本的に、サーバー分散アルゴリズムを変更するには、_get_server() メソッドをオーバーライドする必要があります。
インターネットで検索したところ、Google で amix.dk/blog/post/19367 という記事を見つけました。これは Amir Salihefendic によって書かれた非常に優れた資料で、ketama の一貫したハッシュ (コンシステント ハッシュ) アルゴリズムがどのように機能するかを理解するのに大いに役立ちます。また、彼が作成した HashRing という Python クラスによる ketama 実装もあります。
そこで、基本的に彼のクラスを使用し、Memcached クライアントのニーズに合わせて少し変更しました。変更は、廃止された md5 モジュールの変更と、サーバーのキーを生成するために使用される文字列の変更でした。
# Original key format: the node's string form and the replica index.
key = self.gen_key('%s:%s' % (node, i))
に:
# New key format: host, port, replica index and node weight joined by ':'.
key = self.gen_key(
    '%s:%s:%s:%s' % (node.address[0],
                     node.address[1], i, node.weight)
)
また、アルゴリズムが最初のループでサーバーを見つけられなかったときに get_nodes() メソッドで無限ループを引き起こすバグを修正しました。
古い get_nodes() メソッド (利用可能なサーバーが見つからない場合、無限ループに入ります):
def get_nodes(self, string_key):
    """Given a string key it returns the nodes as a generator that can hold the key.

    The generator is never ending and iterates through the ring
    starting at the correct position.

    NOTE: this is the original version kept for comparison. Once the first
    pass is exhausted it re-walks the ring inside ``while True`` and never
    terminates on its own.
    """
    if not self.ring:
        # There is no ``return`` after this yield, so execution continues
        # below even when the ring is empty.
        yield None, None
    node, pos = self.get_node_pos(string_key)
    # First pass: from the key's position to the end of the sorted keys.
    for key in self._sorted_keys[pos:]:
        yield self.ring[key]
    # Endless passes over the whole ring; when ``_sorted_keys`` is empty the
    # inner loop yields nothing and this spins forever.
    while True:
        for key in self._sorted_keys:
            yield self.ring[key]
新しい get_nodes() メソッド:
def get_nodes(self, string_key):
    """Yield the nodes that can hold `string_key`, walking the ring once.

    Iteration starts at the ring position of `string_key`, runs to the end
    of the sorted keys and then wraps around to cover the keys before that
    position. Every ring entry is visited at most once, so the generator
    always terminates. A node that owns several ring entries is yielded
    once per entry.

    An empty ring yields nothing; callers looping over the result simply
    get no candidates instead of a ``(None, None)`` placeholder that is
    not a node.
    """
    if not self.ring:
        return
    _, pos = self.get_node_pos(string_key)
    ordered = self._sorted_keys[pos:] + self._sorted_keys[:pos]
    for key in ordered:
        # Skip entries removed from the ring while we were iterating.
        if key in self.ring:
            yield self.ring[key]
add_node() メソッドと remove_node() メソッドに新しい for ループを追加して、レプリカを追加する際にサーバーの重みを考慮するようにしました。
古い方法:
# Original add_node() loop: one ring entry per replica, keyed on the node's
# string form and the replica index.
for i in xrange(0, self.replicas):
    key = self.gen_key('%s:%s' % (node, i))
    self.ring[key] = node
    self._sorted_keys.append(key)
新しい方法:
for i in range(0, self.replicas):
    # One ring entry per unit of weight, so heavier servers own more of the
    # ring. The copy number (1..weight) must be part of the hashed string:
    # hashing only `node.weight` produced the same key on every pass of the
    # inner loop, which left the weight without any effect.
    for x in range(0, node.weight):
        key = self.gen_key(
            '%s:%s:%s:%s' % (node.address[0],
                             node.address[1], i, x + 1)
        )
        if key not in self.ring:
            self.ring[key] = node
            self._sorted_keys.append(key)
上記のコードは add_node() メソッドに関するものですが、同じ考え方が remove_node() にも適用されます。
他にもいくつか変更を加えたのかもしれませんが、今のところ思い浮かびません。これは、適切な HashRing クラスです。
import bisect
from hashlib import md5
class HashRing(object):
    """Consistent-hash (ketama-style) ring mapping string keys to nodes.

    Every node is placed on the ring at several hashed positions. A key is
    served by the node owning the first position greater than or equal to
    the key's hash, wrapping to the first position when there is none.

    Nodes must expose ``address`` (indexable; items 0 and 1 are used as host
    and port) and an integer ``weight``.
    """

    def __init__(self, nodes=None, replicas=3):
        """Create the ring and register every node in `nodes`.

        `nodes` is an optional iterable of node objects.
        `replicas` is how many virtual points are placed on the ring per
        unit of node weight; more points give a smoother distribution.
        """
        self.replicas = replicas
        self.ring = {}
        self._sorted_keys = []
        for node in nodes or ():
            self.add_node(node)

    def _node_keys(self, node):
        """Yield every ring position owned by `node`.

        A node gets ``replicas * node.weight`` positions. The copy number
        (1..weight) is part of the hashed string so that each copy lands on
        a distinct position; hashing only ``node.weight`` produced the same
        key for every copy, which made the weight ineffective. Numbering the
        copies from 1 keeps the positions of weight-1 nodes unchanged.
        """
        host, port = node.address[0], node.address[1]
        for replica in range(self.replicas):
            for copy in range(node.weight):
                yield self.gen_key(
                    '%s:%s:%s:%s' % (host, port, replica, copy + 1)
                )

    def add_node(self, node):
        """Add `node` to the ring at all of its positions.

        Positions already present in the ring are left untouched.
        """
        for key in self._node_keys(node):
            if key not in self.ring:
                self.ring[key] = node
                self._sorted_keys.append(key)
        self._sorted_keys.sort()

    def remove_node(self, node):
        """Remove every ring position computed for `node`.

        Positions that are not in the ring are ignored, so removing a node
        that was never added is a no-op.
        """
        for key in self._node_keys(node):
            if key in self.ring:
                del self.ring[key]
                self._sorted_keys.remove(key)

    def get_node(self, string_key):
        """Return the node responsible for `string_key`.

        Returns `None` when the ring is empty.
        """
        return self.get_node_pos(string_key)[0]

    def get_node_pos(self, string_key):
        """Return ``(node, index)`` for `string_key`.

        `index` is the position in the sorted key list of the first ring
        entry whose hash is >= the hash of `string_key`; when no such entry
        exists the lookup wraps around to index 0.
        Returns ``(None, None)`` when the ring is empty.
        """
        if not self.ring:
            return None, None
        keys = self._sorted_keys
        # The key list is kept sorted, so a binary search finds the slot.
        pos = bisect.bisect_left(keys, self.gen_key(string_key))
        if pos == len(keys):
            pos = 0
        return self.ring[keys[pos]], pos

    def get_nodes(self, string_key):
        """Yield the nodes that can hold `string_key`, walking the ring once.

        Iteration starts at the position of `string_key` and wraps around,
        visiting each ring entry at most once, so the generator always
        terminates. A node is yielded once per ring entry it owns.
        An empty ring yields nothing.
        """
        if not self.ring:
            return
        _, pos = self.get_node_pos(string_key)
        keys = self._sorted_keys
        for key in keys[pos:] + keys[:pos]:
            # Skip entries removed from the ring during iteration.
            if key in self.ring:
                yield self.ring[key]

    @staticmethod
    def gen_key(key):
        """Return the ring position of `key` as an integer.

        The position is the md5 digest of `key` read as a base-16 number;
        md5 is used because it mixes well, not for security. Text is encoded
        as UTF-8 before hashing; byte strings are hashed as they are.
        """
        if not isinstance(key, bytes):
            key = key.encode('utf-8')
        return int(md5(key).hexdigest(), 16)
ketama アルゴリズムまたはデフォルトの modulo をいつ使用するかをより柔軟に決定するために、クラスを少し変更しました。
add_server() メソッドを書いているときに、サーバーをバケット リストに追加するときにサーバーの重みを考慮するのを忘れていることに気付きました。
新しい MemcacheClient は次のようになります。
from consistent_hash import HashRing
class MemcacheClient(memcache.Client):
    """memcache.Client subclass with a selectable key-distribution algorithm.

    With ``'ketama'`` keys are mapped to servers through a `HashRing`; with
    ``'modulo'`` the base class distribution is used. Servers can also be
    added at run time through `add_server`.
    """

    available_algorithms = ['ketama', 'modulo']
    hash_algorithm_index = 0

    def __init__(self, hash_algorithm='ketama', *args, **kwargs):
        """Initialise the client.

        `hash_algorithm` must be one of `available_algorithms`; remaining
        arguments are forwarded to the base class constructor.
        Raises ValueError (an Exception subclass) for an unknown algorithm.
        """
        # Validate before the base class is initialised so that an invalid
        # name does not leave a half-built client behind.
        if hash_algorithm not in self.available_algorithms:
            raise ValueError(
                "The algorithm \"%s\" is not implemented for this client. The "
                "options are \"%s\""
                "" % (hash_algorithm, " or ".join(self.available_algorithms))
            )
        # `_get_server` and external code read this attribute; it was never
        # assigned before, which raised AttributeError on first use.
        self.hash_algorithm = hash_algorithm
        self.hash_algorithm_index = self.available_algorithms.index(
            hash_algorithm)
        super(MemcacheClient, self).__init__(*args, **kwargs)
        if hash_algorithm == 'ketama':
            self.consistent_hash_manager = HashRing(nodes=self.servers)
        else:
            self.consistent_hash_manager = None

    def _get_server(self, key):
        """Return ``(server, key)`` for the server most likely to hold `key`.

        Returns ``(None, None)`` when no server on the ring accepts a
        connection. For non-ketama algorithms the base class is used.

        Basic concept of the ketama lookup,
        e.g. ring = {100: server1, 110: server2, 120: server3, 140: server4}:
        if the hash of the key is 105, its server is the next bigger
        position on the ring, 110 (server2). If a server is added at
        position 108 the key now belongs to it instead of server 110; if the
        server at 110 is removed the key belongs to server 120. When no
        position is bigger than the hash of the key, the first server of
        the ring is taken.
        """
        if self.hash_algorithm != 'ketama':
            return super(MemcacheClient, self)._get_server(key)
        ring = self.consistent_hash_manager
        # Nothing to try on an empty ring.
        if not ring.ring:
            return None, None
        # Candidates come in ring order, starting at the first position
        # >= the hash of the key; take the first one that connects.
        for server in ring.get_nodes(key):
            if server.connect():
                return server, key
        return None, None

    def add_server(self, server):
        """Add a host (given as a server specification) to the client at run time.

        The new host is appended to `servers`, added to `buckets` once per
        unit of weight, and registered on the hash ring when one is in use.
        """
        # Optional guard, disabled on purpose: enable it to refuse adding a
        # server when no consistent hash is in use (e.g. with MODULO).
        #
        # if not self.consistent_hash_manager:
        #     raise Exception("The current consistent hash algorithm (\"%s\") is"
        #                     " not reliable for adding a new server"
        #                     "" % self.hash_algorithm)

        # Create a new host entry with the same settings as the client.
        host = memcache._Host(
            server, self.debug, dead_retry=self.dead_retry,
            socket_timeout=self.socket_timeout,
            flush_on_reconnect=self.flush_on_reconnect
        )
        # Add this to our server choices.
        self.servers.append(host)
        # A server with a bigger weight gets more copies in the buckets,
        # increasing its probability of being picked.
        self.buckets.extend([host] * host.weight)
        # Add this node to the circle.
        if self.consistent_hash_manager:
            self.consistent_hash_manager.add_node(host)
def random_key(size):
    """Return a random string of `size` ASCII letters.

    `string.ascii_letters` is used instead of `string.letters`: the latter
    depends on the current locale and does not exist on Python 3.
    A `size` of 0 (or less) gives an empty string.
    """
    return ''.join(random.choice(string.ascii_letters) for _ in range(size))
def run_consistent_hash_test(client_obj):
    """Report how many keys stay reachable after servers are added.

    Stores 500 random keys through `client_obj`, reads them back, adds five
    extra servers (127.0.0.1, ports 11219-11223) and reads the keys again,
    printing the percentage of keys found each time.
    """
    # 500 random 100-character keys to spread over the servers.
    keys = [random_key(100) for _ in range(500)]
    algorithm_name = client_obj.hash_algorithm.upper()
    print(
        "\n/////////// CONSISTENT HASH ALGORITHM \"%s\" //////////////"
        "" % algorithm_name
    )
    print("\n->These are the %s servers:" % len(client_obj.servers))
    str_servers = "".join(
        "%s:%s, " % (host.address[0], host.address[1])
        for host in client_obj.servers
    )
    print("******************************************************************")
    print(str_servers)
    print("******************************************************************")

    # Start from a clean cache, then store every key.
    client_obj.flush_all()
    for name in keys:
        client_obj.set(name, 1)
    key_count = len(keys)
    print(
        "\n%d keys distributed for %d server(s)\n"
        "" % (key_count, len(client_obj.servers))
    )

    # How many keys can be read back with the initial servers?
    valid_keys = client_obj.get_multi(keys)
    matched_before = (len(valid_keys) / float(len(keys))) * 100
    print(
        "%s percent of keys matched, before adding extra servers.\n"
        "" % matched_before
    )

    # Register five more servers at run time.
    interval_extra_servers = range(19, 24)
    for suffix in interval_extra_servers:
        client_obj.add_server('127.0.0.1:112%d' % suffix)

    # How many keys can still be read back afterwards?
    valid_keys = client_obj.get_multi(keys)
    matched_after = (len(valid_keys) / float(len(keys))) * 100
    print(
        "Added %d new server(s).\n%s percent of keys still matched"
        "" % (len(interval_extra_servers), matched_after)
    )
    print("\n***************************************************************"
          "****\n")
if __name__ == '__main__':
    # Eight memcached servers are expected on 127.0.0.1, ports 11211-11218.
    interval_servers = range(11, 19)
    servers = ['127.0.0.1:112%d' % suffix for suffix in interval_servers]
    # Init our subclass. The hash_algorithm parameter can be "ketama"
    # (the default of MemcacheClient.__init__) or "modulo".
    client = MemcacheClient(servers=servers, hash_algorithm='ketama')
    run_consistent_hash_test(client)
このクラスを端末で直接実行すると、適切な出力が表示されます。