8

背景:ストリーミング API から JSON オブジェクトを取得し、それらを pymongo を使用して MongoDB に格納する (一度に 25 個の一括挿入) ようにpythonモジュールをセットアップしました。curl比較のために、同じストリーミング API からへの bash コマンドも用意していpipeますmongoimport。これらのアプローチはどちらも、データを別々のコレクションに保存します。

定期的にcount()、コレクションを監視して、それらがどのように機能するかを確認します。

これまでのところ、pythonモジュールはアプローチよりも約 1000 個の JSON オブジェクトに遅れをとっていますcurl | mongoimport

問題:pythonモジュールを ~ と同期するよう に最適化するにはどうすればよい curl | mongoimportですか?

tweetstreamTwitter APIではなくサードパーティのストリーミングサービスを利用しているため利用できません。

誰かがここで私を助けてくれませんか?

Pythonモジュール:


class StreamReader:
    def __init__(self):
        try:
            self.buff = ""
            self.tweet = ""
            self.chunk_count = 0
            self.tweet_list = []
            self.string_buffer = cStringIO.StringIO()
            self.mongo = pymongo.Connection(DB_HOST)
            self.db = self.mongo[DB_NAME]
            self.raw_tweets = self.db["raw_tweets_gnip"]
            self.conn = pycurl.Curl()
            self.conn.setopt(pycurl.ENCODING, 'gzip')
            self.conn.setopt(pycurl.URL, STREAM_URL)
            self.conn.setopt(pycurl.USERPWD, AUTH)
            self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
            self.conn.perform()
        except Exception as ex:
            print "error ocurred : %s" % str(ex)

    def handle_data(self, data):
        try:
            self.string_buffer = cStringIO.StringIO(data)
            for line in self.string_buffer:
                try:
                    self.tweet = json.loads(line)
                except Exception as json_ex:
                    print "JSON Exception occurred: %s" % str(json_ex)
                    continue

                if self.tweet:
                    try:
                        self.tweet_list.append(self.tweet)
                        self.chunk_count += 1
                        if self.chunk_count % 1000 == 0
                            self.raw_tweets.insert(self.tweet_list)
                            self.chunk_count = 0
                            self.tweet_list = []

                    except Exception as insert_ex:
                        print "Error inserting tweet: %s" % str(insert_ex)
                        continue
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)
            print repr(self.buff)

    def __del__(self):
        self.string_buffer.close()

読んでくれてありがとう。

4

2 に答える 2

3

元々、コードにバグがありました。

                if self.chunk_count % 50 == 0
                    self.raw_tweets.insert(self.tweet_list)
                    self.chunk_count = 0

チャンクカウントをリセットしますが、tweet_listはリセットしません。したがって、2回目は、100個のアイテムを挿入しようとします(50個の新しいアイテムと以前にDBに送信された50個のアイテム)。これは修正されましたが、パフォーマンスに違いが見られます。

バッチサイズ全体が赤いニシンであることがわかります。jsonの大きなファイルを使用して、Python経由でロードするのとmongoimport経由でロードするのではなく、Pythonの方が常に高速でした(セーフモードでも-以下を参照)。

コードを詳しく見てみると、問題はストリーミングAPIが実際にデータをチャンクで処理しているという事実にあることがわかりました。これらのチャンクを取得してデータベースに入れることが期待されています(これがmongoimportが行っていることです)。Pythonがストリームを分割し、リストに追加してから定期的にバッチをMongoに送信するために行っている追加の作業は、おそらく私が見ているものとあなたが見ているものの違いです。

handle_data()にこのスニペットを試してください

def handle_data(self, data):
    try:
        string_buffer = StringIO(data)
        tweets = json.load(string_buffer)
    except Exception as ex:
        print "Exception occurred: %s" % str(ex)
    try:
        self.raw_tweets.insert(tweets)
    except Exception as ex:
        print "Exception occurred: %s" % str(ex)

注意すべき点の1つは、Pythonの挿入が「セーフモード」で実行されていないsafe=Trueことです。挿入ステートメントに引数を追加して、これを変更する必要があります。その後、失敗した挿入で例外が発生し、try/catchは問題を明らかにするエラーを出力します。

パフォーマンスにもそれほどコストはかかりません。現在テストを実行しており、約5分後、2つのコレクションのサイズは1412014113になります。

于 2012-06-02T20:45:17.727 に答える
1

StringIO ライブラリを取り除きました。この場合、WRITEFUNCTIONコールバックhandle_dataはすべての行で呼び出されるため、JSON直接ロードするだけです。ただし、JSONデータに 2 つのオブジェクトが含まれている場合もあります。申し訳ありませんcurlが、資格情報が含まれているため、使用するコマンドを投稿できません。しかし、前述したように、これはすべてのストリーミング API に当てはまる一般的な問題です。


def handle_data(self, buf): 
    try:
        self.tweet = json.loads(buf)
    except Exception as json_ex:
        self.data_list = buf.split('\r\n')
        for data in self.data_list:
            self.tweet_list.append(json.loads(data))    
于 2012-06-10T15:50:49.027 に答える