Python:「含まれているバッテリー」
RabbitMQ、Redis、RDBMSなどのデータストアを探すのではなく、Pythonといくつかのライブラリがこの問題を解決するのに十分すぎると思います。この日曜大工のアプローチが車輪の再発明であると不満を言う人もいるかもしれませんが、私は別のデータストアを管理するよりも100行のPythonコードを実行することを好みます。
優先キューの実装
定義する操作(追加、取得、昇格、降格)は、優先キューを記述します。残念ながら、Pythonには組み込みの優先キューデータ型がありません。ただし、heapqと呼ばれるヒープライブラリがあり、優先キューはヒープとして実装されることがよくあります。要件を満たす優先キューの実装は次のとおりです。
class PQueue:
"""
Implements a priority queue with append, take, promote, and demote
operations.
"""
def __init__(self):
"""
Initialize empty priority queue.
self.toll is max(priority) and max(rowid) in the queue
self.heap is the heap maintained for take command
self.rows is a mapping from rowid to items
self.pris is a mapping from priority to items
"""
self.toll = 0
self.heap = list()
self.rows = dict()
self.pris = dict()
def append(self, value):
"""
Append value to our priority queue.
The new value is added with lowest priority as an item. Items are
threeple lists consisting of [priority, rowid, value]. The rowid
is used by the promote/demote commands.
Returns the new rowid corresponding to the new item.
"""
self.toll += 1
item = [self.toll, self.toll, value]
self.heap.append(item)
self.rows[self.toll] = item
self.pris[self.toll] = item
return self.toll
def take(self):
"""
Take the highest priority item out of the queue.
Returns the value of the item.
"""
item = heapq.heappop(self.heap)
del self.pris[item[0]]
del self.rows[item[1]]
return item[2]
def promote(self, rowid):
"""
Promote an item in the queue.
The promoted item swaps position with the next highest item.
Returns the number of affected rows.
"""
if rowid not in self.rows: return 0
item = self.rows[rowid]
item_pri, item_row, item_val = item
next = item_pri - 1
if next in self.pris:
iota = self.pris[next]
iota_pri, iota_row, iota_val = iota
iota[1], iota[2] = item_row, item_val
item[1], item[2] = iota_row, iota_val
self.rows[item_row] = iota
self.rows[iota_row] = item
return 2
return 0
demoteコマンドはpromoteコマンドとほぼ同じなので、簡潔にするために省略します。これは、Pythonのリスト、dict、およびheapqライブラリにのみ依存することに注意してください。
優先キューの提供
ここで、PQueueデータ型を使用して、インスタンスとの分散相互作用を許可したいと思います。このための優れたライブラリはgeventです。geventは比較的新しく、まだベータ版ですが、驚くほど高速で、十分にテストされています。geventを使用すると、localhost:4040をリッスンするソケットサーバーを非常に簡単にセットアップできます。これが私のサーバーコードです:
pqueue = PQueue()
def pqueue_server(sock, addr):
text = sock.recv(1024)
cmds = text.split(' ')
if cmds[0] == 'append':
result = pqueue.append(cmds[1])
elif cmds[0] == 'take':
result = pqueue.take()
elif cmds[0] == 'promote':
result = pqueue.promote(int(cmds[1]))
elif cmds[0] == 'demote':
result = pqueue.demote(int(cmds[1]))
else:
result = ''
sock.sendall(str(result))
print 'Request:', text, '; Response:', str(result)
if args.listen:
server = StreamServer(('127.0.0.1', 4040), pqueue_server)
print 'Starting pqueue server on port 4040...'
server.serve_forever()
もちろん、本番環境で実行する前に、エラー/バッファ処理を改善する必要があります。ただし、ラピッドプロトタイピングには問題なく機能します。これには、pqueueオブジェクトの周りのロックは必要ないことに注意してください。Geventは実際にはコードを並行して実行するのではなく、その印象を与えるだけです。欠点は、コアを増やしても役に立たないことですが、利点はロックフリーコードです。
誤解しないでください。geventSocketServerは複数のリクエストを同時に処理します。ただし、協調マルチタスクによる要求への応答を切り替えます。これは、コルーチンのタイムスライスを生成する必要があることを意味します。geventsソケットI/O関数は生成するように設計されていますが、pqueueの実装はそうではありません。幸い、pqueueはそのタスクを非常に迅速に完了します。
クライアントも
プロトタイピングをしていると、クライアントがいると便利だと思いました。クライアントを作成するのに少しグーグルがかかったので、そのコードも共有します。
if args.client:
while True:
msg = raw_input('> ')
sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
sock.connect(('127.0.0.1', 4040))
sock.sendall(msg)
text = sock.recv(1024)
sock.close()
print text
新しいデータストアを使用するには、最初にサーバーを起動してから、クライアントを起動します。クライアントプロンプトで、次のことができるようになります。
> append one
1
> append two
2
> append three
3
> promote 2
2
> promote 2
0
> take
two
非常にうまくスケーリング
データストアについて考えると、スループットと耐久性に本当に関心があるようです。しかし、「非常にうまくスケーリングする」ことはあなたのニーズを定量化するものではありません。そこで、テスト関数を使用して上記のベンチマークを行うことにしました。テスト関数は次のとおりです。
def test():
import time
import urllib2
import subprocess
import random
random = random.Random(0)
from progressbar import ProgressBar, Percentage, Bar, ETA
widgets = [Percentage(), Bar(), ETA()]
def make_name():
alphabet = 'abcdefghijklmnopqrstuvwxyz'
return ''.join(random.choice(alphabet)
for rpt in xrange(random.randrange(3, 20)))
def make_request(cmds):
sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
sock.connect(('127.0.0.1', 4040))
sock.sendall(cmds)
text = sock.recv(1024)
sock.close()
print 'Starting server and waiting 3 seconds.'
subprocess.call('start cmd.exe /c python.exe queue_thing_gevent.py -l',
shell=True)
time.sleep(3)
tests = []
def wrap_test(name, limit=10000):
def wrap(func):
def wrapped():
progress = ProgressBar(widgets=widgets)
for rpt in progress(xrange(limit)):
func()
secs = progress.seconds_elapsed
print '{0} {1} records in {2:.3f} s at {3:.3f} r/s'.format(
name, limit, secs, limit / secs)
tests.append(wrapped)
return wrapped
return wrap
def direct_append():
name = make_name()
pqueue.append(name)
count = 1000000
@wrap_test('Loaded', count)
def direct_append_test(): direct_append()
def append():
name = make_name()
make_request('append ' + name)
@wrap_test('Appended')
def append_test(): append()
...
print 'Running speed tests.'
for tst in tests: tst()
ベンチマーク結果
ラップトップで実行されているサーバーに対して6つのテストを実行しました。結果は非常にうまくスケールしていると思います。出力は次のとおりです。
Starting server and waiting 3 seconds.
Running speed tests.
100%|############################################################|Time: 0:00:21
Loaded 1000000 records in 21.770 s at 45934.773 r/s
100%|############################################################|Time: 0:00:06
Appended 10000 records in 6.825 s at 1465.201 r/s
100%|############################################################|Time: 0:00:06
Promoted 10000 records in 6.270 s at 1594.896 r/s
100%|############################################################|Time: 0:00:05
Demoted 10000 records in 5.686 s at 1758.706 r/s
100%|############################################################|Time: 0:00:05
Took 10000 records in 5.950 s at 1680.672 r/s
100%|############################################################|Time: 0:00:07
Mixed load processed 10000 records in 7.410 s at 1349.528 r/s
ファイナルフロンティア:耐久性
最後に、耐久性は私が完全にプロトタイプを作成しなかった唯一の問題です。でも、そんなに難しいことでもないと思います。優先キューでは、アイテムのヒープ(リスト)に、データ型をディスクに保持するために必要なすべての情報が含まれています。geventを使用すると、マルチプロセッシング方式で関数を生成することもできるため、次のような関数を使用することを想像しました。
def save_heap(heap, toll):
name = 'heap-{0}.txt'.format(toll)
with open(name, 'w') as temp:
for val in heap:
temp.write(str(val))
gevent.sleep(0)
優先キューに保存機能を追加します。
def save(self):
heap_copy = tuple(self.heap)
toll = self.toll
gevent.spawn(save_heap, heap_copy, toll)
これで、データストアをフォークしてディスクに書き込むRedisモデルを数分ごとにコピーできます。さらに高い耐久性が必要な場合は、コマンドをディスクに記録するシステムと上記を組み合わせてください。これらは一緒になって、Redisが使用するAFPとRDBの永続化メソッドです。