
I get a stream of fully crawled data. All of it is pushed into Kafka and then written to Cassandra. Right now the Kafka consumer is very slow, much slower than the producer, and I want the two to keep pace with each other. How can I achieve that, or what is wrong with my code?

Here is my Kafka consumer code in Python:

import logging
from cassandra.cluster import Cluster
from kafka.consumer.kafka import KafkaConsumer
from kafka.consumer.multiprocess import MultiProcessConsumer
from kafka.client import KafkaClient
from kafka.producer.simple import SimpleProducer
import json
from datetime import datetime, timedelta  
from cassandra import ConsistencyLevel
from dateutil.parser import parse
logging.basicConfig(filename='consumer.log', format='[%(asctime)-15s] %(name)s %(levelname)s %(message)s', level=logging.DEBUG)
class Whitelist(logging.Filter):
    def __init__(self, *whitelist):
        self.whitelist = [logging.Filter(name) for name in whitelist]
    def filter(self, record):
        return any(f.filter(record) for f in self.whitelist)
for handler in logging.root.handlers:
    handler.addFilter(Whitelist('consumer'))
log = logging.getLogger('consumer')
try:
    # keyspace and kafkatopic are defined elsewhere (config not shown)
    cluster = Cluster(['localhost'])
    session = cluster.connect(keyspace)
    kafka = KafkaClient('localhost')
    consumer = MultiProcessConsumer(kafka, b'default', kafkatopic,
                                    num_procs=16, max_buffer_size=None)
    article_lookup_stmt = session.prepare("SELECT * FROM articles WHERE id in ?")
    article_lookup_stmt.consistency_level = ConsistencyLevel.QUORUM
    article_insert_stmt = session.prepare("INSERT INTO articles(id, thumbnail, title, url, created_at, scheduled_for, source, category, channel,genre) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
    article_by_created_at_insert_stmt = session.prepare("INSERT INTO article_by_created_at(source, created_at, article) VALUES (?, ?, ?)")
    article_by_url_insert_stmt = session.prepare("INSERT INTO article_by_url(url, article) VALUES (?, ?)")
    schedules_insert_stmt = session.prepare("INSERT INTO schedules(source,type,scheduled_for,id) VALUES (?,?,?,?)")
    axes_insert_stmt = session.prepare("INSERT INTO axes(article,at,comments,likes,reads,shares) VALUES (?, ?, ?, ?, ?, ?)")
    while True:
        messages = consumer.get_messages(count=16)
        if len(messages) == 0:
            print 'IDLE'
            continue
        for message in messages:
            try:
                response = json.loads(message.value)
                data = json.loads(response['body'])
                print response['body']
                articles = data['articles']
                idlist = [r['id'] for r in articles]
                if len(idlist)>0:
                    article_rows = session.execute(article_lookup_stmt,[idlist])
                    rows = [r.id for r in article_rows]
                    for article in articles:
                        try:
                            if not article['id'] in rows:
                                article['created_at'] = parse(article['created_at'])
                                scheduled_for=(article['created_at'] + timedelta(minutes=60)).replace(second=0, microsecond=0)
                                session.execute(article_insert_stmt, (article['id'], article['thumbnail'], article['title'], article['url'], article['created_at'], scheduled_for, article['source'], article['category'], article['channel'],article['genre']))
                                session.execute(article_by_created_at_insert_stmt, (article['source'], article['created_at'], article['id']))
                                session.execute(article_by_url_insert_stmt, (article['url'], article['id']))
                                session.execute(schedules_insert_stmt,(article['source'],'article',scheduled_for,article['id']))
                                log.debug('%s %s' % (article['id'],article['created_at']))
                            session.execute(axes_insert_stmt,(article['id'],datetime.utcnow(),article['axes']['comments'],article['axes']['likes'],0,article['axes']['shares']))
                        except Exception as e:
                            print 'error==============:',e
                            continue
            except Exception as e:
                print 'error is:',e
                log.exception(e.message)
except Exception as e:
    log.exception(e.message)

Edit:

I have also added the profiling results; the slow line of code seems to be:

    article_rows = session.execute(article_lookup_stmt,[idlist])

Sun Feb 14 16:01:01 2016    consumer.out

         395793 function calls (394232 primitive calls) in 23.074 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
      141   10.695    0.076   10.695    0.076 {select.select}
     7564   10.144    0.001   10.144    0.001 {method 'acquire' of 'thread.lock' objects}
        1    0.542    0.542   23.097   23.097 consumer.py:5(<module>)
     1510    0.281    0.000    0.281    0.000 {method 'recv' of '_socket.socket' objects}
       38    0.195    0.005    0.195    0.005 /usr/local/lib/python2.7/json/decoder.py:371(raw_decode)
       13    0.078    0.006    0.078    0.006 {time.sleep}
     2423    0.073    0.000    0.137    0.000 /usr/local/lib/python2.7/logging/__init__.py:242(__init__)
    22112    0.063    0.000    0.095    0.000 /usr/local/lib/python2.7/site-packages/kafka/util.py:73(relative_unpack)
        3    0.052    0.017    0.162    0.054 /usr/local/lib/python2.7/site-packages/kafka/protocol.py:386(decode_metadata_response)
2006/2005    0.047    0.000    0.055    0.000 /usr/local/lib/python2.7/site-packages/cassandra/policies.py:350(make_query_plan)
     1270    0.032    0.000    0.034    0.000 /usr/local/lib/python2.7/threading.py:259(__init__)
        3    0.024    0.008    0.226    0.075 /usr/local/lib/python2.7/site-packages/kafka/client.py:456(load_metadata_for_topics)
       33    0.024    0.001    0.031    0.001 /usr/local/lib/python2.7/collections.py:288(namedtuple)
    15374    0.024    0.000    0.024    0.000 {built-in method new of type object at 0x788ee0}
      141    0.023    0.000   11.394    0.081 /usr/local/lib/python2.7/site-packages/kafka/client.py:153(_send_broker_aware_request)
      288    0.020    0.000    0.522    0.002 /usr/local/lib/python2.7/site-packages/kafka/conn.py:84(_read_bytes)
     2423    0.018    0.000    0.029    0.000 /usr/local/lib/python2.7/logging/__init__.py:1216(findCaller)
      115    0.018    0.000   11.372    0.099 /usr/local/lib/python2.7/site-packages/kafka/consumer/kafka.py:303(fetch_messages)
     2423    0.018    0.000    0.059    0.000 /usr/local/lib/python2.7/logging/__init__.py:1303(callHandlers)
    24548    0.017    0.000    0.017    0.000 {_struct.unpack}
44228/43959    0.016    0.000    0.016    0.000 {len}
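
For reference, a dump like the one above can be produced with Python's built-in cProfile/pstats; a minimal sketch (assuming the output file name consumer.out from the header above) is:

    # run the consumer under the profiler first:
    #   python -m cProfile -o consumer.out consumer.py
    import pstats

    stats = pstats.Stats('consumer.out')
    stats.sort_stats('tottime').print_stats(20)  # top 20 entries by internal time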

Thanks in advance for your replies.


1 Answer


Try running the consumer without saving to C* and see how much of a difference that makes.
If saving to C* turns out to be the choke point (which I suspect it is), you can have a pool of workers, larger than the 16 the consumer spawns, that is responsible only for the writes to C*.

That way you offload the slow part of the code and keep only the trivial part in the consumer.
You can use from multiprocessing import Pool; a rough sketch follows below.
More details here.
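
Below is a rough, untested sketch of that idea, reusing the session, prepared statements and consumer from the question. The names save_article and POOL_SIZE are made up for the example; since cassandra-driver sessions are thread-safe but connections do not survive a fork, the sketch uses multiprocessing.pool.ThreadPool rather than a process Pool (with a process Pool you would have to open a separate Cluster/Session in each worker).

    from multiprocessing.pool import ThreadPool

    POOL_SIZE = 32  # bigger than the 16 consumer processes; tune to taste

    def save_article(article):
        # move every session.execute(...) from the inner loop in here,
        # e.g. the axes insert that runs for every article:
        session.execute(axes_insert_stmt,
                        (article['id'], datetime.utcnow(),
                         article['axes']['comments'], article['axes']['likes'],
                         0, article['axes']['shares']))

    pool = ThreadPool(POOL_SIZE)

    while True:
        messages = consumer.get_messages(count=16)
        if not messages:
            continue
        for message in messages:
            data = json.loads(json.loads(message.value)['body'])
            # hand the Cassandra writes to the pool and keep consuming right away
            pool.map_async(save_article, data['articles'])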

answered 2016-02-14T16:48:28.787