0

次のコードは、毎秒 10k ~ 20k のレコードを取り込み、パフォーマンスを改善したいと考えています。json 形式を読み取り、Kafka を使用してデータベースに取り込みます。- Zookeeper と Kafka がインストールされた 5 つのノードのクラスターで実行しています。

改善するためのヒントを教えていただけますか?

import os
import json
from multiprocessing import Pool
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer


def process_line(line):
    producer = SimpleProducer(client)
    try:
       jrec = json.loads(line.strip())
       producer.send_messages('twitter2613',json.dumps(jrec))
    except ValueError, e:
                {}


if __name__ == "__main__":
    client = KafkaClient('10.62.84.35:9092')
    myloop=True
    pool = Pool(30)


    direcToData = os.listdir("/FullData/RowData")
    for loop in direcToData:
        mydir2=os.listdir("/FullData/RowData/"+loop)

        for i in mydir2:
            if  myloop:
                 with open("/FullData/RowData/"+loop+"/"+i) as source_file:
                     # chunk the work into batches of 4 lines at a time
                     results = pool.map(process_line, source_file, 30)
4

1 に答える 1