Reputation: 1311
I am learning Python and, through the help of online resources and people on this site, am getting the hang of it. In this first script of mine, in which I'm parsing Twitter RSS feed entries and inserting the results into a database, there is one remaining problem that I cannot fix. Namely, duplicate entries are being inserted into one of the tables.
As a bit of background, I originally found a base script for downloading RSS feeds and then modified it in several ways: 1) accounted for idiosyncrasies in Twitter RSS feeds (an entry is not separated into content, title, URL, etc.); 2) added tables for "hashtags" and for the many-to-many relationship (entry_tag table); 3) changed the table set-up to sqlalchemy; 4) made some ad hoc changes to account for weird unicode problems that were occurring. As a result, the code is ugly in places, but it has been a good learning experience and now works, except that it keeps inserting duplicates in the "entries" table.
Since I'm not sure what would be most helpful to people, I've pasted in the entire code below, with some comments in a few places to point out what I think is most important.
I would really appreciate any help with this. Thanks!
Edit: Somebody suggested I provide a schema for the database. I've never done this before, so if I'm not doing it right, bear with me. I am setting up four tables:
In short, the script below grabs the five test RSS feeds from the RSS Feeds table, downloads the 20 latest entries / tweets from each feed, parses the entries, and puts the information into the RSS Entries, Tags, and entry_tag tables.
import sqlite3
import threading
import time
import Queue
from time import strftime
import re
from string import split
import feedparser
from django.utils.encoding import smart_str, smart_unicode
from sqlalchemy import schema, types, ForeignKey, select, orm
from sqlalchemy import create_engine
engine = create_engine('sqlite:///test98.sqlite', echo=True)
metadata = schema.MetaData(engine)
metadata.bind = engine
def now():
#set up four tables, with many-to-many relationship
RSSFeeds = schema.Table('feeds', metadata,
    schema.Column('id', types.Integer,
        schema.Sequence('feeds_seq_id', optional=True), primary_key=True),
    schema.Column('url', types.VARCHAR(1000), default=u''),
)

RSSEntries = schema.Table('entries', metadata,
    schema.Column('id', types.Integer,
        schema.Sequence('entries_seq_id', optional=True), primary_key=True),
    schema.Column('feed_id', types.Integer, schema.ForeignKey('')),
    schema.Column('short_url', types.VARCHAR(1000), default=u''),
    schema.Column('content', types.Text(), nullable=False),
    schema.Column('hashtags', types.Unicode(255)),
    schema.Column('date', types.String()),
)

tag_table = schema.Table('tag', metadata,
    schema.Column('id', types.Integer,
        schema.Sequence('tag_seq_id', optional=True), primary_key=True),
    schema.Column('tagname', types.Unicode(20), nullable=False, unique=True),
)

entrytag_table = schema.Table('entrytag', metadata,
    schema.Column('id', types.Integer,
        schema.Sequence('entrytag_seq_id', optional=True), primary_key=True),
    schema.Column('entryid', types.Integer, schema.ForeignKey('')),
    schema.Column('tagid', types.Integer, schema.ForeignKey('')),
)

metadata.create_all(bind=engine, checkfirst=True)
# Insert test set of Twitter RSS feeds
stmt = RSSFeeds.insert()
stmt.execute(
    {'url': ''},
    {'url': ''},
    {'url': ''},
    {'url': ''},
    {'url': ''},
)
#These 3 lines for threading process (see for example)
jobs = Queue.Queue(0)
rss_to_process = Queue.Queue(THREAD_LIMIT)
#connect to sqlite database and grab the 5 test RSS feeds
conn = engine.connect()
feeds = conn.execute('SELECT id, url FROM feeds').fetchall()
#This block contains all the parsing and DB insertion
def store_feed_items(id, items):
    """ Takes a feed_id and a list of items and stores them in the DB """
    for entry in items:
        conn.execute('SELECT id from entries WHERE short_url=?', (,))
        #note: entry.summary contains entire feed entry for Twitter,
        #i.e., not separated into content, etc.
        s = unicode(entry.summary)
        test = s.split()
        tinyurl2 = [i for i in test if i.startswith('http://')]
        hashtags2 = [i for i in s.split() if i.startswith('#')]
        content2 = ' '.join(i for i in s.split() if i not in tinyurl2+hashtags2)
        content = unicode(content2)
        tinyurl = unicode(tinyurl2)
        hashtags = unicode(hashtags2)
        print hashtags
        date = strftime("%Y-%m-%d %H:%M:%S",entry.updated_parsed)

        #Insert parsed feed data into entries table
        result = conn.execute(RSSEntries.insert(), {'feed_id': id, 'short_url': tinyurl,
            'content': content, 'hashtags': hashtags, 'date': date})
        entry_id = result.last_inserted_ids()[0]

        #Look up tag identifiers and create any that don't exist:
        tags = tag_table
        tag_id_query = select([tags.c.tagname,], tags.c.tagname.in_(hashtags2))
        tag_ids = dict(conn.execute(tag_id_query).fetchall())
        for tag in hashtags2:
            if tag not in tag_ids:
                result = conn.execute(tags.insert(), {'tagname': tag})
                tag_ids[tag] = result.last_inserted_ids()[0]

        #insert data into entrytag table
        if hashtags2: conn.execute(entrytag_table.insert(),
            [{'entryid': entry_id, 'tagid': tag_ids[tag]} for tag in hashtags2])
#Rest of file completes the threading process
def thread():
    while True:
        try:
            id, feed_url = jobs.get(False) # False = Don't wait
        except Queue.Empty:
            return
        entries = feedparser.parse(feed_url).entries
        rss_to_process.put((id, entries), True) # This will block if full

for info in feeds: # Queue them up
    jobs.put([info['id'], info['url']])

for n in xrange(THREAD_LIMIT):
    t = threading.Thread(target=thread)
    t.start()

while threading.activeCount() > 1 or not rss_to_process.empty():
    # That condition means we want to do this loop if there are threads
    # running OR there's stuff to process
    try:
        id, entries = rss_to_process.get(False, 1) # Wait for up to a second
    except Queue.Empty:
        continue
    store_feed_items(id, entries)
Upvotes: 0
Views: 1351
Reputation: 2846
It looks like you added SQLAlchemy to a previously existing script that didn't use SQLAlchemy. There are too many moving parts here that none of us apparently understand well enough.
I would recommend starting from scratch. Don't use threading. Don't use sqlalchemy. To start, maybe don't even use an SQL database. Write a script that collects the information you want in the simplest possible way into a simple data structure, using simple loops and maybe a time.sleep(). Once that works, you can add storage to an SQL database. I really don't think writing SQL statements directly is much harder than using an ORM, and it's easier to debug IMHO. There is a good chance you will never need to add threading.
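To make that concrete, here is a rough sketch of what I mean, not a drop-in replacement for your script. It assumes Python 3 and only the standard-library sqlite3 module. The names parse_tweet and store_entries, the summary column, and the UNIQUE (feed_id, summary) constraint are all my own inventions for illustration; you would pick whatever really identifies a tweet in your feeds. The idea is to parse each tweet into a plain dict first, then let the database itself refuse duplicates via INSERT OR IGNORE.

```python
import sqlite3


def parse_tweet(text):
    """Split raw tweet text into content, links, and hashtags (plain dict)."""
    words = text.split()
    links = [w for w in words if w.startswith("http://")]
    hashtags = [w for w in words if w.startswith("#")]
    content = " ".join(w for w in words if w not in links and w not in hashtags)
    return {"content": content, "links": links, "hashtags": hashtags}


def store_entries(conn, feed_id, tweets):
    """Store tweets for one feed; rows that already exist are skipped."""
    # Hypothetical schema: the UNIQUE constraint is an assumption about
    # what makes an entry a duplicate (same feed, same raw text).
    conn.execute(
        "CREATE TABLE IF NOT EXISTS entries ("
        " id INTEGER PRIMARY KEY,"
        " feed_id INTEGER NOT NULL,"
        " summary TEXT NOT NULL,"
        " content TEXT NOT NULL,"
        " short_url TEXT,"
        " hashtags TEXT,"
        " UNIQUE (feed_id, summary))"
    )
    for text in tweets:
        item = parse_tweet(text)
        # OR IGNORE makes SQLite silently skip rows that violate UNIQUE.
        conn.execute(
            "INSERT OR IGNORE INTO entries"
            " (feed_id, summary, content, short_url, hashtags)"
            " VALUES (?, ?, ?, ?, ?)",
            (
                feed_id,
                text,
                item["content"],
                " ".join(item["links"]),
                " ".join(item["hashtags"]),
            ),
        )
    conn.commit()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    sample = ["Learning #python http://example.com/a", "Second tweet #sqlite"]
    store_entries(conn, 1, sample)
    store_entries(conn, 1, sample)  # second run inserts nothing new
    print(conn.execute("SELECT COUNT(*) FROM entries").fetchone()[0])
```

Running the same batch twice leaves the row count unchanged, which is the behaviour you want when you re-download the latest 20 tweets from a feed. With the check living in the table definition, there is no separate SELECT that you can forget to act on.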
"If you think you are smart enough to write multi-threaded programs, you're not." -- James Ahlstrom.
Upvotes: 2