7

かっこいいとは言えないカテゴリの質問から...

「キューのようなもの」とは、次の操作をサポートすることを意味します。

  • append(entry:Entry)-キューの末尾にエントリを追加します
  • take():エントリ-キューの先頭からエントリを削除して返します
  • Promote(entry_id)-エントリを頭に1つ近づけます。現在その位置を占めているエントリは、古い位置に移動されます
  • demote(entry_id)-promote(entry_id)の反対

オプションの操作は次のようになります。

  • Promote(entry_id、amount)-位置の数を指定することを除いて、promote(entry_id)と同様です
  • demote(entry_id、amount)-promote(entry_id、amount)の反対
  • もちろん、金額を正または負にできる場合は、promote / demoteメソッドを単一のmove(entry_id、amount)メソッドに統合できます。

次の操作をキューで分散して実行できると理想的です(複数のクライアントがキューと対話します)。

queue = ...

queue.append( a )
queue.append( b )
queue.append( c )

print queue
"a b c"

queue.promote( b.id )
print queue
"b a c"

queue.demote( a.id )
"b c a"

x = queue.take()
print x
"b"
print queue
"c a"

このユースケースに特に適したデータストアはありますか?複数のユーザーが同時にキューを変更している場合でも、キューは常に一貫した状態である必要があります。

プロモート/降格/移動の要件がなければ、それほど問題はありません。

編集:上記のタスクを実行するためのJavaおよび/またはPythonライブラリがある場合のボーナスポイント。

ソリューションは非常にうまくスケーリングする必要があります。

4

6 に答える 6

9

Redisはリストと順序集合をサポートしています:http://redis.io/topics/data-types#lists

また、トランザクションとパブリッシュ/サブスクライブメッセージングもサポートしています。ですから、そうです、これはredisで簡単に実行できると思います。

更新:実際、その約80%は何度も実行されています:http ://www.google.co.uk/search?q = python + redis + queue

それらのヒットのいくつかは、あなたが望むものを追加するためにアップグレードすることができます。プロモート/デモート操作を実装するには、トランザクションを使用する必要があります。

クライアントコードで使用するのではなく、サーバー側でluaを使用してその機能を作成できる場合があります。または、サーバー上のredisの周りに薄いラッパーを作成して、必要な操作だけを実装することもできます。

于 2012-05-04T01:24:45.360 に答える
4

Python:「含まれているバッテリー」

RabbitMQ、Redis、RDBMSなどのデータストアを探すのではなく、Pythonといくつかのライブラリがこの問題を解決するのに十分すぎると思います。この日曜大工のアプローチが車輪の再発明であると不満を言う人もいるかもしれませんが、私は別のデータストアを管理するよりも100行のPythonコードを実行することを好みます。

優先キューの実装

定義する操作(追加、取得、昇格、降格)は、優先キューを記述します。残念ながら、Pythonには組み込みの優先キューデータ型がありません。ただし、heapqと呼ばれるヒープライブラリがあり、優先キューはヒープとして実装されることがよくあります。要件を満たす優先キューの実装は次のとおりです。

class PQueue:
    """
    Implements a priority queue with append, take, promote, and demote
    operations.
    """
    def __init__(self):
        """
        Initialize empty priority queue.
        self.toll is max(priority) and max(rowid) in the queue
        self.heap is the heap maintained for take command
        self.rows is a mapping from rowid to items
        self.pris is a mapping from priority to items
        """
        self.toll = 0
        self.heap = list()
        self.rows = dict()
        self.pris = dict()

    def append(self, value):
        """
        Append value to our priority queue.
        The new value is added with lowest priority as an item. Items are
        threeple lists consisting of [priority, rowid, value]. The rowid
        is used by the promote/demote commands.
        Returns the new rowid corresponding to the new item.
        """
        self.toll += 1
        item = [self.toll, self.toll, value]
        self.heap.append(item)
        self.rows[self.toll] = item
        self.pris[self.toll] = item
        return self.toll

    def take(self):
        """
        Take the highest priority item out of the queue.
        Returns the value of the item.
        """
        item = heapq.heappop(self.heap)
        del self.pris[item[0]]
        del self.rows[item[1]]
        return item[2]

    def promote(self, rowid):
        """
        Promote an item in the queue.
        The promoted item swaps position with the next highest item.
        Returns the number of affected rows.
        """
        if rowid not in self.rows: return 0
        item = self.rows[rowid]
        item_pri, item_row, item_val = item
        next = item_pri - 1
        if next in self.pris:
            iota = self.pris[next]
            iota_pri, iota_row, iota_val = iota
            iota[1], iota[2] = item_row, item_val
            item[1], item[2] = iota_row, iota_val
            self.rows[item_row] = iota
            self.rows[iota_row] = item
            return 2
        return 0

demoteコマンドはpromoteコマンドとほぼ同じなので、簡潔にするために省略します。これは、Pythonのリスト、dict、およびheapqライブラリにのみ依存することに注意してください。

優先キューの提供

ここで、PQueueデータ型を使用して、インスタンスとの分散相互作用を許可したいと思います。このための優れたライブラリはgeventです。geventは比較的新しく、まだベータ版ですが、驚くほど高速で、十分にテストされています。geventを使用すると、localhost:4040をリッスンするソケットサーバーを非常に簡単にセットアップできます。これが私のサーバーコードです:

pqueue = PQueue()

def pqueue_server(sock, addr):
    text = sock.recv(1024)
    cmds = text.split(' ')
    if cmds[0] == 'append':
        result = pqueue.append(cmds[1])
    elif cmds[0] == 'take':
        result = pqueue.take()
    elif cmds[0] == 'promote':
        result = pqueue.promote(int(cmds[1]))
    elif cmds[0] == 'demote':
        result = pqueue.demote(int(cmds[1]))
    else:
        result = ''
    sock.sendall(str(result))
    print 'Request:', text, '; Response:', str(result)

if args.listen:
    server = StreamServer(('127.0.0.1', 4040), pqueue_server)
    print 'Starting pqueue server on port 4040...'
    server.serve_forever()

もちろん、本番環境で実行する前に、エラー/バッファ処理を改善する必要があります。ただし、ラピッドプロトタイピングには問題なく機能します。これには、pqueueオブジェクトの周りのロックは必要ないことに注意してください。Geventは実際にはコードを並行して実行するのではなく、その印象を与えるだけです。欠点は、コアを増やしても役に立たないことですが、利点はロックフリーコードです。

誤解しないでください。geventSocketServerは複数のリクエストを同時に処理します。ただし、協調マルチタスクによる要求への応答を切り替えます。これは、コルーチンのタイムスライスを生成する必要があることを意味します。geventsソケットI/O関数は生成するように設計されていますが、pqueueの実装はそうではありません。幸い、pqueueはそのタスクを非常に迅速に完了します。

クライアントも

プロトタイピングをしていると、クライアントがいると便利だと思いました。クライアントを作成するのに少しグーグルがかかったので、そのコードも共有します。

if args.client:
    while True:
        msg = raw_input('> ')
        sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
        sock.connect(('127.0.0.1', 4040))
        sock.sendall(msg)
        text = sock.recv(1024)
        sock.close()
        print text

新しいデータストアを使用するには、最初にサーバーを起動してから、クライアントを起動します。クライアントプロンプトで、次のことができるようになります。

> append one
1
> append two
2
> append three
3
> promote 2
2
> promote 2
0
> take
two

非常にうまくスケーリング

データストアについて考えると、スループットと耐久性に本当に関心があるようです。しかし、「非常にうまくスケーリングする」ことはあなたのニーズを定量化するものではありません。そこで、テスト関数を使用して上記のベンチマークを行うことにしました。テスト関数は次のとおりです。

def test():
    import time
    import urllib2
    import subprocess

    import random
    random = random.Random(0)

    from progressbar import ProgressBar, Percentage, Bar, ETA
    widgets = [Percentage(), Bar(), ETA()]

    def make_name():
        alphabet = 'abcdefghijklmnopqrstuvwxyz'
        return ''.join(random.choice(alphabet)
                       for rpt in xrange(random.randrange(3, 20)))

    def make_request(cmds):
        sock = gsocket.socket(gsocket.AF_INET, gsocket.SOCK_STREAM)
        sock.connect(('127.0.0.1', 4040))
        sock.sendall(cmds)
        text = sock.recv(1024)
        sock.close()

    print 'Starting server and waiting 3 seconds.'
    subprocess.call('start cmd.exe /c python.exe queue_thing_gevent.py -l',
                    shell=True)
    time.sleep(3)

    tests = []
    def wrap_test(name, limit=10000):
        def wrap(func):
            def wrapped():
                progress = ProgressBar(widgets=widgets)
                for rpt in progress(xrange(limit)):
                    func()
                secs = progress.seconds_elapsed
                print '{0} {1} records in {2:.3f} s at {3:.3f} r/s'.format(
                    name, limit, secs, limit / secs)
            tests.append(wrapped)
            return wrapped
        return wrap

    def direct_append():
        name = make_name()
        pqueue.append(name)

    count = 1000000
    @wrap_test('Loaded', count)
    def direct_append_test(): direct_append()

    def append():
        name = make_name()
        make_request('append ' + name)

    @wrap_test('Appended')
    def append_test(): append()

    ...

    print 'Running speed tests.'
    for tst in tests: tst()

ベンチマーク結果

ラップトップで実行されているサーバーに対して6つのテストを実行しました。結果は非常にうまくスケールしていると思います。出力は次のとおりです。

Starting server and waiting 3 seconds.
Running speed tests.
100%|############################################################|Time: 0:00:21
Loaded 1000000 records in 21.770 s at 45934.773 r/s
100%|############################################################|Time: 0:00:06
Appended 10000 records in 6.825 s at 1465.201 r/s
100%|############################################################|Time: 0:00:06
Promoted 10000 records in 6.270 s at 1594.896 r/s
100%|############################################################|Time: 0:00:05
Demoted 10000 records in 5.686 s at 1758.706 r/s
100%|############################################################|Time: 0:00:05
Took 10000 records in 5.950 s at 1680.672 r/s
100%|############################################################|Time: 0:00:07
Mixed load processed 10000 records in 7.410 s at 1349.528 r/s

ファイナルフロンティア:耐久性

最後に、耐久性は私が完全にプロトタイプを作成しなかった唯一の問題です。でも、そんなに難しいことでもないと思います。優先キューでは、アイテムのヒープ(リスト)に、データ型をディスクに保持するために必要なすべての情報が含まれています。geventを使用すると、マルチプロセッシング方式で関数を生成することもできるため、次のような関数を使用することを想像しました。

def save_heap(heap, toll):
    name = 'heap-{0}.txt'.format(toll)
    with open(name, 'w') as temp:
        for val in heap:
            temp.write(str(val))
            gevent.sleep(0)

優先キューに保存機能を追加します。

def save(self):
    heap_copy = tuple(self.heap)
    toll = self.toll
    gevent.spawn(save_heap, heap_copy, toll)

これで、データストアをフォークしてディスクに書き込むRedisモデルを数分ごとにコピーできます。さらに高い耐久性が必要な場合は、コマンドをディスクに記録するシステムと上記を組み合わせてください。これらは一緒になって、Redisが使用するAFPとRDBの永続化メソッドです。

于 2012-05-12T03:00:36.820 に答える
2

Websphere MQは、これらのほとんどすべてを実行できます。

プロモート/降格は、キューからメッセージを削除して、より高い/より低い優先度でメッセージを元に戻すか、シーケンス番号として「CORRELID」を使用することによって、ほぼ可能です。

于 2012-05-04T01:43:13.747 に答える
2

RabbitMQの何が問題になっていますか?それはあなたが必要としているものとまったく同じように聞こえます。

本番環境でもRedisを幅広く使用していますが、タスクを完了として設定したり、TTLで完了していない場合にタスクを再送信したりするなど、キューが通常持つ機能の一部がありません。一方、一般的なストレージのように、キューにはない他の機能があり、非常に高速です。

于 2012-05-07T08:16:08.917 に答える
1

Redissonを使用すると、Redisが提供する分散アプローチで使い慣れた、、、javaインターフェイスがList実装Queueされます。:の例BlockingQueueDequeDeque

Redisson redisson = Redisson.create();

RDeque<SomeObject> queue = redisson.getDeque("anyDeque");
queue.addFirst(new SomeObject());
queue.addLast(new SomeObject());
SomeObject obj = queue.removeFirst();
SomeObject someObj = queue.removeLast();

redisson.shutdown();

その他のサンプル:

https://github.com/mrniko/redisson/wiki/7.-distributed-collections/#77-list
https://github.com/mrniko/redisson/wiki/7.-distributed-collections/#78-queue https://github.com/mrniko/redisson/wiki/7.-distributed-collections/#710-blocking-queue

于 2015-08-05T14:11:52.257 に答える
0

何らかの理由でSQLデータベースをバックエンドとして使用することにした場合、ポーリングが必要なため、MySQLは使用しません(他の多くの理由で使用しません)が、PostgreSQLは他のクライアントに信号を送るためにLISTEN/NOTIFYをサポートしています。変更をポーリングする必要はありません。ただし、すべてのリスニングクライアントに一度に信号を送るため、勝者のリスナーを選択するためのメカニズムが必要になります。

補足として、昇格/降格メカニズムが役立つかどうかはわかりません。挿入中にジョブを適切にスケジュールすることをお勧めします...

于 2012-05-10T07:21:29.537 に答える