0

私は、Tweepy と Twitter のストリーミング API を使用して、PostreSQL データベースにテーブルを作成しようとしてきました。私は非常に近いです。私はそれを手に入れるのに1行しか離れていないと信じています。私は次のような多くの例を見てきました: http://andrewbrobinson.com/2011/07/15/using-tweepy-to-access-the-twitter-stream/ http://blog.creapptives.com/post/14062057061 /the-key-value-store-everyone-ignored-postgresql Python tweepy sqlite3 db への書き込み tweepy stream to sqlite database - 無効な synatx tweepy を使用した Twitter のストリーミング API へのアクセス など

Tweepy を使用して非常に簡単にツイートをストリーミングできるようになったので、コンシューマ キー、コンシューマ シークレット、アクセス キー、およびアクセス シークレットが正しいことがわかります。また、Postgres をセットアップし、作成したデータベースに正常に接続しています。.py ファイルから psycopg2 を使用して、データベースのテーブルにハードコードされた値をテストしましたが、これも機能しています。選択したキーワードに基づいてツイートがストリーミングされ、データベース内のテーブルに正常に接続されています。これで、つぶやきを postgres データベースのテーブルにストリームするだけで済みます。私が言ったように、私はとても近くにいるので、どんな助けでも大歓迎です。

この簡素化されたスクリプトは、目的のテーブルにデータを挿入します。

import psycopg2

try:
    conn = psycopg2.connect("dbname=teststreamtweets user=postgres password=x host=localhost")
    print "connected"
except:
    print "unable to connect"

namedict = (
    {"first_name":"Joshua", "last_name":"Drake"},
    {"first_name":"Steven", "last_name":"Foo"},
    {"first_name":"David", "last_name":"Bar"}
    )

cur = conn.cursor()

cur.executemany("""INSERT INTO testdata(first_name, last_name) VALUES (%(first_name)s, %(last_name)s)""", namedict);

conn.commit()

以下は、私がしばらくの間編集していたスクリプトです。

import psycopg2
import time
import json
from getpass import getpass
import tweepy

consumer_key = 'x'
consumer_secret = 'x'
access_key = 'x'
access_secret = 'x'

connection = psycopg2.connect("dbname=teststreamtweets user=postgres password=x host=localhost")
cursor = connection.cursor()

#always use this step to begin clean
def reset_cursor():
    cursor = connection.cursor()

class StreamWatcherListener(tweepy.StreamListener):
    def on_data(self, data):
        try:
            print 'before cursor' + data
            connection = psycopg2.connect("dbname=teststreamtweets user=postgres password=x host=localhost")
            cur = connection.cursor()
            print 'status is: ' + str(connection.status)
            #cur.execute("INSERT INTO tweet_list VALUES (%s)" % (data.text))
            cur.executemany("""INSERT INTO tweets(tweet) VALUES (%(text)s)""", data);
            connection.commit()
            print '---------'
            print type(data)
            #print data
        except Exception as e:
            connection.rollback()
            reset_cursor()
            print "not saving"
            return 
        if cursor.lastrowid == None:
            print "Unable to save"

    def on_error(self, status_code):
        print 'Error code = %s' % status_code
        return True

    def on_timeout(self):
        print 'timed out.....'

print 'welcome'
auth1 = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth1.set_access_token(access_key, access_secret)
api = tweepy.API(auth1)

l = StreamWatcherListener()
print 'about to stream'
stream = tweepy.Stream(auth = auth1, listener = l)

setTerms = ['microsoft']
#stream.sample()
stream.filter(track = setTerms)

Sorry if it's a bit messy of code, but have been trying many options. Like I said any suggestions, links to helpful examples, etc would be greatly appreciated as I've tried everything I can think of and am now resorting to a long walk. Thanks a ton.

4

1 に答える 1

1

__init__なぜこれにクラスを使用しているのか、なぜクラスで定義していないのかわかりません。複雑そうです。

これは、私がこの作業を行うために使用する関数の基本的なバージョンです。sqlite しか使ったことがありませんが、構文は基本的に同じように見えます。たぶん、あなたはこれから何かを得ることができます。

def retrieve_tweets(numtweets=10, *args):
    """
    This function optionally takes one or more arguments as keywords to filter tweets.
    It iterates through tweets from the stream that meet the given criteria and sends them 
    to the database population function on a per-instance basis, so as to avoid disaster 
    if the stream is disconnected.

    Both SampleStream and FilterStream methods access Twitter's stream of status elements.
    """   
    filters = []
    for key in args:
        filters.append(str(key))
    if len(filters) == 0:
        stream = tweetstream.SampleStream(username, password)  
    else:
        stream = tweetstream.FilterStream(username, password, track=filters)
    try:
        count = 0
        while count < numtweets:       
            for tweet in stream:
                # a check is needed on text as some "tweets" are actually just API operations
                # the language selection doesn't really work but it's better than nothing(?)
                if tweet.get('text') and tweet['user']['lang'] == 'en':   
                    if tweet['retweet_count'] == 0:
                        # bundle up the features I want and send them to the db population function
                        bundle = (tweet['id'], tweet['user']['screen_name'], tweet['retweet_count'], tweet['text'])
                        db_initpop(bundle)
                        break
                    else:
                        # a RT has a different structure.  This bundles the original tweet.  Getting  the
                        # retweets comes later, after the stream is de-accessed.
                        bundle = (tweet['retweeted_status']['id'], tweet['retweeted_status']['user']['screen_name'], \
                                  tweet['retweet_count'], tweet['retweeted_status']['text'])
                        db_initpop(bundle)
                        break
            count += 1
    except tweetstream.ConnectionError, e:
        print 'Disconnected from Twitter at '+time.strftime("%d %b %Y %H:%M:%S", time.localtime()) \
        +'.  Reason: ', e.reason

def db_initpop(bundle):
    """
    This function places basic tweet features in the database.  Note the placeholder values:
    these can act as a check to verify that no further expansion was available for that method.
    """
    #unpack the bundle 
    tweet_id, user_sn, retweet_count, tweet_text = bundle
    curs.execute("""INSERT INTO tblTweets VALUES (null,?,?,?,?,?,?)""", \
        (tweet_id, user_sn, retweet_count, tweet_text, 'cleaned text', 'cleaned retweet text'))
    conn.commit()
    print 'Database populated with tweet '+str(tweet_id)+' at '+time.strftime("%d %b %Y %H:%M:%S", time.localtime())

幸運を!

于 2012-11-15T20:42:23.463 に答える