0

私は Python を学んでおり、このサイトのオンライン リソースや人々の助けを借りて、Python のコツをつかんでいます。私のこの最初のスクリプトでは、Twitter RSS フィード エントリを解析し、その結果をデータベースに挿入していますが、解決できない問題が 1 つあります。つまり、テーブルの 1 つに重複したエントリが挿入されています。

ちょっとした背景として、私はもともと HalOtis.com で RSS フィードをダウンロードするためのベース スクリプトを見つけ、それをいくつかの方法で変更しました。1) Twitter RSS フィードの特異性を考慮して変更しました (コンテンツ、タイトル、URL、等。); 2) 「ハッシュタグ」と多対多の関係 (entry_tag テーブル) のテーブルを追加しました。3) テーブル設定を sqlalchemy に変更。4)発生していた奇妙なユニコードの問題を説明するために、いくつかのアドホックな変更を加えました。その結果、コードはところどころ見苦しくなりますが、良い学習経験であり、現在は機能していますが、「エントリ」テーブルに重複を挿入し続けています。

人々にとって何が最も役立つか分からないので、以下のコード全体を貼り付け、いくつかの場所にいくつかのコメントを付けて、最も重要だと思うことを指摘しました。

これについて何か助けていただければ幸いです。ありがとう!

編集:データベースのスキーマを提供することを誰かが提案しました。私はこれまでにこれをやったことがないので、正しくやっていない場合は、我慢してください。私は4つのテーブルを設定しています:

  1. Twitter RSS フィードのリストを含む RSSFeeds
  2. 各フィードから (解析後に) ダウンロードされた個々のエントリのリストを含む RSSEntries (コンテンツ、ハッシュタグ、日付、URL の列を含む)
  3. 個々のエントリ (ツイート) で見つかったすべてのハッシュタグのリストを含むタグ
  4. entry_tag には、タグをエントリにマップできる列が含まれています。

要するに、以下のスクリプトは、RSS フィード テーブルから 5 つのテスト RSS フィードを取得し、各フィードから 20 個の最新のエントリ/ツイートをダウンロードし、エントリを解析して、情報を RSS エントリ、タグ、および entry_tag テーブルに入れます。

#!/usr/local/bin/python

import sqlite3
import threading
import time
import Queue
from time import strftime
import re       
from string import split 
import feedparser 
from django.utils.encoding import smart_str, smart_unicode      
from sqlalchemy import schema, types, ForeignKey, select, orm
from sqlalchemy import create_engine

engine = create_engine('sqlite:///test98.sqlite', echo=True)
metadata = schema.MetaData(engine)   
metadata.bind = engine

def now():
    return datetime.datetime.now()


#set up four tables, with many-to-many relationship
RSSFeeds = schema.Table('feeds', metadata,
    schema.Column('id', types.Integer, 
        schema.Sequence('feeds_seq_id', optional=True), primary_key=True),
    schema.Column('url', types.VARCHAR(1000), default=u''),
)


RSSEntries = schema.Table('entries', metadata,
    schema.Column('id', types.Integer, 
        schema.Sequence('entries_seq_id', optional=True), primary_key=True),
    schema.Column('feed_id', types.Integer, schema.ForeignKey('feeds.id')),
    schema.Column('short_url', types.VARCHAR(1000), default=u''),
    schema.Column('content', types.Text(), nullable=False),
    schema.Column('hashtags', types.Unicode(255)),
    schema.Column('date', types.String()),  
)


tag_table = schema.Table('tag', metadata,
    schema.Column('id', types.Integer,
       schema.Sequence('tag_seq_id', optional=True), primary_key=True),
    schema.Column('tagname', types.Unicode(20), nullable=False, unique=True),
)


entrytag_table = schema.Table('entrytag', metadata,
    schema.Column('id', types.Integer,
        schema.Sequence('entrytag_seq_id', optional=True), primary_key=True),
    schema.Column('entryid', types.Integer, schema.ForeignKey('entries.id')),
    schema.Column('tagid', types.Integer, schema.ForeignKey('tag.id')),
)


metadata.create_all(bind=engine, checkfirst=True)


# Insert test set of Twitter RSS feeds
stmt = RSSFeeds.insert()
stmt.execute(
    {'url': 'http://twitter.com/statuses/user_timeline/14908909.rss'},
    {'url': 'http://twitter.com/statuses/user_timeline/52903246.rss'},
    {'url': 'http://twitter.com/statuses/user_timeline/41902319.rss'},
    {'url': 'http://twitter.com/statuses/user_timeline/29950404.rss'},
    {'url': 'http://twitter.com/statuses/user_timeline/35699859.rss'},
)



#These 3 lines for threading process (see HalOtis.com for example) 
THREAD_LIMIT = 20
jobs = Queue.Queue(0)
rss_to_process = Queue.Queue(THREAD_LIMIT)


#connect to sqlite database and grab the 5 test RSS feeds
conn = engine.connect()
feeds = conn.execute('SELECT id, url FROM feeds').fetchall()

#This block contains all the parsing and DB insertion 
def store_feed_items(id, items):
    """ Takes a feed_id and a list of items and stores them in the DB """
    for entry in items:
        conn.execute('SELECT id from entries WHERE short_url=?', (entry.link,))
        #note: entry.summary contains entire feed entry for Twitter, 
                    #i.e., not separated into content, etc.
        s = unicode(entry.summary) 
        test = s.split()
        tinyurl2 = [i for i in test if i.startswith('http://')]
        hashtags2 = [i for i in s.split() if i.startswith('#')]
        content2 = ' '.join(i for i in s.split() if i not in tinyurl2+hashtags2)
        content = unicode(content2)
        tinyurl = unicode(tinyurl2)
        hashtags = unicode (hashtags2)
        print hashtags
        date = strftime("%Y-%m-%d %H:%M:%S",entry.updated_parsed)


        #Insert parsed feed data into entries table 
                    #THIS IS WHERE DUPLICATES OCCUR
        result = conn.execute(RSSEntries.insert(), {'feed_id': id, 'short_url': tinyurl,
            'content': content, 'hashtags': hashtags, 'date': date})
        entry_id = result.last_inserted_ids()[0]


        #Look up tag identifiers and create any that don't exist:
        tags = tag_table
        tag_id_query = select([tags.c.tagname, tags.c.id], tags.c.tagname.in_(hashtags2))
        tag_ids = dict(conn.execute(tag_id_query).fetchall())
        for tag in hashtags2:
            if tag not in tag_ids:
                result = conn.execute(tags.insert(), {'tagname': tag})
                tag_ids[tag] = result.last_inserted_ids()[0]

        #insert data into entrytag table 
        if hashtags2: conn.execute(entrytag_table.insert(),
            [{'entryid': entry_id, 'tagid': tag_ids[tag]} for tag in hashtags2])


#Rest of file completes the threading process     
def thread():
    while True:
        try:
            id, feed_url = jobs.get(False) # False = Don't wait
        except Queue.Empty:
            return

        entries = feedparser.parse(feed_url).entries
        rss_to_process.put((id, entries), True) # This will block if full

for info in feeds: # Queue them up
    jobs.put([info['id'], info['url']])

for n in xrange(THREAD_LIMIT):
    t = threading.Thread(target=thread)
    t.start()

while threading.activeCount() > 1 or not rss_to_process.empty():
    # That condition means we want to do this loop if there are threads
    # running OR there's stuff to process
    try:
        id, entries = rss_to_process.get(False, 1) # Wait for up to a second
    except Queue.Empty:
        continue

    store_feed_items(id, entries)
4

1 に答える 1

2

SQLAlchemy を使用していない既存のスクリプトに SQLAlchemy を含めたようです。ここにはあまりにも多くの可動部分があり、私たちの誰も十分に理解していないようです.

最初から始めることをお勧めします。スレッドを使用しないでください。sqlalchemy を使用しないでください。開始するには、SQL データベースを使用することさえないかもしれません。単純なループとおそらく time.sleep() を使用して、できるだけ単純な方法で必要な情報を単純なデータ構造に収集するスクリプトを作成します。次に、それが機能するときに、ストレージを SQL データベースに追加できます。SQL ステートメントを直接記述することは、ORM を使用するよりもはるかに難しく、IMHO をデバッグする方が簡単だとは思いません。スレッドを追加する必要がない可能性は十分にあります。

「自分はマルチスレッド プログラムを書けるほど頭がいいと思っているなら、そうではありません。」-- ジェームズ・アールストロム

于 2009-10-01T20:59:02.723 に答える