0

これはおそらく、ピカを使用するのと同じくらい Python コールバックに関する質問です。RabbitMQ のキューにサブスクライブし、配信されたメッセージのペイロードを処理し、そのペイロードを一連の (ディスク) ファイルに書き込むコードを開発しようとしています。そこで、 http://www.rabbitmq.com/tutorials/tutorial-one-python.htmlにある単純な「Hello World」の例を使用して、コールバック関数にロジックを追加しました (偶然にも「コールバック」と呼ばれます)。 ) を使用して、受信したメッセージ ペイロードをファイルに書き込みます。

主な問題は次のとおりです。特定の時間が経過した場合、たとえば 300 秒 (5 分) プロセスがファイルを閉じて新しいファイルを作成し、その後受信した新しいメッセージを書き込む追加のコードを書きたいと思います。それ。等々 ...

しかし、私が見ている問題は、メッセージがキューに到着したときにのみコールバック関数が呼び出されることです。そのコールバック関数の外に、経過時間を計測する何らかの処理が必要だと思います....

理論的根拠は、MQ キューで受信したメッセージを含む一連のディスク ファイル (タイムスタンプに基づいてすべてが一意の名前を持つ) を作成することです。メッセージの受信が遅い場合は、現在開いているファイルを閉じて (下流でさらに処理できるようにするため)、別のファイルを開きます。

また、消費開始呼び出し (channel.start_using) を発行した後、その下のコードに到達しないことにも気付きました-なぜですか?

私はpythonのマルチプロセッシングモジュールをいじってみましたが、今のところうまくいきません。

擬似コード コメント付きのスケルトン コードを次に示します:-

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

    # want to put code here to write message payloads to a file (unique name)
    # if n secs have elapsed then close the file and create a new file

channel.basic_consume(callback,queue='hello',no_ack=True)

channel.start_consuming()

ありがとう !

4

1 に答える 1

0

Pikaの代替実装を検討する価値があるかもしれません。ピカはもともとブロックしているので、このようなものを作成するのは困難です. 基本的に、IO を監視し、過去 5 分間に何かが書き込まれているかどうかを確認する別のスレッドが必要です。そうでない場合は閉じます。

タイムスタンプを保持することもできます。十分な時間が経過した場合に新しいコールバックを取得したら、ファイルを閉じて新しいファイルを作成できます。ただし、これにより、ファイルが長時間開いたままになりますが、データが 5 分を超えることはありません。

ただし、代わりにPukaをご覧になることをお勧めします。これは、問題の解決策をより簡単に実装できる Pika のノンブロッキングの代替手段です。

于 2013-06-22T13:24:13.960 に答える