カフカからのイベントを継続的に消費しようとしています。同じアプリケーションも、この消費されたデータを使用して、分析を実行し、n 秒間隔でデータベースを更新します (n = 60 秒と仮定)。
同じアプリケーションで、process1 = Kafka Consumer , process2= Data Analysis and database update logic.
process1 is to be run continuously
process2 is to be executed once every n=60 seconds
process2
計算とデータベースの更新に関係しているため、実行に 5 ~ 10 秒かかります。process1
実行中にストールしたくありませんprocess2
。したがって、私はmultiprocessing module
( Pythonでモジュールを使用していた場合になりprocess1,process2
ますが、GILについて読んだことと、モジュールがマルチコアアーキテクチャを活用できないため、モジュールを使用することにしました。)を使用しています。この場合の同時性。(上記のモジュールの制限についての私の理解が間違っている場合は、お詫び申し上げます。お気軽に訂正してください)。thread1,thread2
Threading
Threading
multiprocessing
GIL
Threading
私が使用しているアプリケーションでは、2 つのプロセス間で非常に単純な対話が行われprocess1
、60 秒で受信したすべてのメッセージでキューがいっぱいになり、60 秒の終わりにすべてのメッセージが に転送されますprocess2
。
この転送ロジックに問題があります。キューの内容を から に転送するにはどうすればよいprocess1
ですprocess2
か (それはメイン プロセスまたは別のプロセスであると思いますか?それは私が持っている別の質問です。メイン プロセスに加えて 2 つのプロセスをインスタンス化する必要がありますか?) 60 秒の終わりにその後、キューの内容をクリアして、別の反復で再び開始します。
これまでのところ、次のものがあります。
import sys
from kafka.client import KafkaClient
from kafka import SimpleConsumer
import time
from multiprocessing import Process,Queue
def kafka_init():
client=KafkaClient('kafka1.wpit.nile.works')
consumer=SimpleConsumer(client, "druidkafkaconsumer", "personalization.targeting.clickstream.prod")
return consumer
def consumeMessages(q):
print "thread started"
while not q.empty():
try:
print q.get(True,1)
Queue.Empty:
break
print "thread ended"
if __name__=="__main__":
starttime=time.time()
timeout=starttime+ 10 #timeout of read in seconds
consumer=kafka_init()
q=Queue()
p=Process(target=consumeMessages,args=q)
while(True):
q.put(consumer.get_message())
if time.time()>timeout:
#transfer logic from process1 to main process here.
print "Start time",starttime
print "End time",time.time()
p.start()
p.join()
break
どんな助けでも大歓迎です。