2

カフカからのイベントを継続的に消費しようとしています。同じアプリケーションも、この消費されたデータを使用して、分析を実行し、n 秒間隔でデータベースを更新します (n = 60 秒と仮定)。

同じアプリケーションで、process1 = Kafka Consumer , process2= Data Analysis and database update logic.

process1 is to be run continuously
process2 is to be executed once every n=60 seconds 

process2計算とデータベースの更新に関係しているため、実行に 5 ~ 10 秒かかります。process1実行中にストールしたくありませんprocess2。したがって、私はmultiprocessing module( Pythonでモジュールを使用していた場合になりprocess1,process2ますが、GILについて読んだことと、モジュールがマルチコアアーキテクチャを活用できないため、モジュールを使用することにしました。)を使用しています。この場合の同時性。(上記のモジュールの制限についての私の理解が間違っている場合は、お詫び申し上げます。お気軽に訂正してください)。thread1,thread2ThreadingThreadingmultiprocessingGILThreading

私が使用しているアプリケーションでは、2 つのプロセス間で非常に単純な対話が行われprocess1、60 秒で受信したすべてのメッセージでキューがいっぱいになり、60 秒の終わりにすべてのメッセージが に転送されますprocess2

この転送ロジックに問題があります。キューの内容を から に転送するにはどうすればよいprocess1ですprocess2か (それはメイン プロセスまたは別のプロセスであると思いますか?それは私が持っている別の質問です。メイン プロセスに加えて 2 つのプロセスをインスタンス化する必要がありますか?) 60 秒の終わりにその後、キューの内容をクリアして、別の反復で再び開始します。

これまでのところ、次のものがあります。

import sys
from kafka.client import KafkaClient
from kafka import SimpleConsumer
import time
from multiprocessing import Process,Queue

def kafka_init():
    client=KafkaClient('kafka1.wpit.nile.works')
    consumer=SimpleConsumer(client, "druidkafkaconsumer", "personalization.targeting.clickstream.prod")
    return consumer

def consumeMessages(q):
    print "thread started"
    while not q.empty():
        try:
            print q.get(True,1)
        Queue.Empty:
            break
    print "thread ended"
if __name__=="__main__":
    starttime=time.time()
    timeout=starttime+ 10 #timeout of read in seconds
    consumer=kafka_init()
    q=Queue()
    p=Process(target=consumeMessages,args=q)
    while(True):
        q.put(consumer.get_message())
        if time.time()>timeout:
            #transfer logic from process1 to main process here.
            print "Start time",starttime
            print "End time",time.time()
            p.start()
            p.join()
            break

どんな助けでも大歓迎です。

4

1 に答える 1

3

あなたが扱っている問題はカフカ固有のものではないので、単に int である一般的な「メッセージ」を使用します。

主な問題は、メッセージが生成されるとすぐにメッセージを処理したい一方で、データベースを 60 秒ごとに更新したいということです。

を使用する場合q.get()、デフォルトでは、このメソッド呼び出しは、キューに利用可能なメッセージが存在するまでブロックされます。これには 60 秒以上かかる場合があり、データベースの更新が大幅に遅れます。したがって、ブロッキングは使用できませんq.getq.get呼び出しが非ブロックになるように、タイムアウトを使用する必要があります。

import time
import multiprocessing as mp
import random
import Queue

def process_messages(q):
    messages = []
    start = time.time()
    while True:
        try:
            message = q.get(timeout=1)
        except Queue.Empty:
            pass
        else:
            messages.append(message)
            print('Doing data analysis on {}'.format(message))
        end = time.time()
        if end-start > 60:
            print('Updating database: {}'.format(messages))
            start = end
            messages = []

def get_messages(q):
    while True:
        time.sleep(random.uniform(0,5))
        message = random.randrange(100)
        q.put(message)

if __name__ == "__main__":
    q = mp.Queue()

    proc1 = mp.Process(target=get_messages, args=[q])
    proc1.start()

    proc2 = mp.Process(target=process_messages, args=[q])
    proc2.start()

    proc1.join()
    proc2.join()

次のような出力が生成されます。

Doing data analysis on 38
Doing data analysis on 8
Doing data analysis on 8
Doing data analysis on 66
Doing data analysis on 37
Updating database: [38, 8, 8, 66, 37]
Doing data analysis on 27
Doing data analysis on 47
Doing data analysis on 57
Updating database: [27, 47, 57]
Doing data analysis on 85
Doing data analysis on 90
Doing data analysis on 86
Doing data analysis on 22
Updating database: [85, 90, 86, 22]
Doing data analysis on 8
Doing data analysis on 92
Doing data analysis on 59
Doing data analysis on 40
Updating database: [8, 92, 59, 40]
于 2015-08-30T21:18:47.780 に答える