私のプロジェクトでは、タスクを並行して実行するために multiprocessing クラスを使用しています。threading のほうがパフォーマンスが優れているので、代わりに threading を使用したいと思います(私のタスクは、CPU バウンドでも I/O バウンドでもなく、TCP/IP バウンドです)。
multiprocessing には、threading クラスには存在しない Pool.imap_unordered や Pool.map_async といった素晴らしい機能があります。
代わりに threading を使用するようにコードを変換する正しい方法は何ですか?ドキュメントでは、threading クラスのラッパーである multiprocessing.dummy クラスを紹介しています。ただし、これを使うと多くのエラーが発生します(少なくとも Python 2.7.3 では)。
pool = multiprocessing.Pool(processes)
File "C:\python27\lib\multiprocessing\dummy\__init__.py", line 150, in Pool
return ThreadPool(processes, initializer, initargs)
File "C:\python27\lib\multiprocessing\pool.py", line 685, in __init__
Pool.__init__(self, processes, initializer, initargs)
File "C:\python27\lib\multiprocessing\pool.py", line 136, in __init__
self._repopulate_pool()
File "C:\python27\lib\multiprocessing\pool.py", line 199, in _repopulate_pool
w.start()
File "C:\python27\lib\multiprocessing\dummy\__init__.py", line 73, in start
self._parent._children[self] = None
AttributeError: '_DummyThread' object has no attribute '_children'
編集:実際に起こっているのは、(GUI が固まるのを防ぐために)GUI が別のスレッドを実行しているということです。そのスレッドが、ThreadPool が失敗する特定の検索関数を実行します。
編集2:このバグは修正され、修正は将来のリリースに含まれる予定です。クラッシュを引き起こすバグが修正されたのを見るのは素晴らしいことです!
import urllib2, htmllib, formatter
import multiprocessing.dummy as multiprocessing
import xml.dom.minidom
import os
import string, random
from urlparse import parse_qs, urlparse
from useful_util import retry
import config
from logger import log
class LinksExtractor(htmllib.HTMLParser):
    "HTML parser that collects the href of every anchor ending in .mp3."

    def __init__(self, formatter):
        htmllib.HTMLParser.__init__(self, formatter)
        self.links = []
        # Substrings that disqualify a link when found anywhere in its href.
        self.ignoredSites = config.WebParser_ignoredSites

    def start_a(self, attrs):
        "Inspect the attributes of an <a> tag and keep acceptable .mp3 hrefs."
        for name, value in attrs:
            if name != "href" or not value.endswith(".mp3"):
                continue
            if any(site in value for site in self.ignoredSites):
                continue
            self.links.append(value)

    def get_links(self):
        "Return the list of links collected so far."
        return self.links
def GetLinks(url, returnMetaUrlObj=False):
    '''
    Function gathers the unique .mp3 links found in the page behind a url.
    @param url: Url Address.
    @param returnMetaUrlObj: If true, returns a MetaUrl Object list.
                             Else, returns a string list. Default is False.
    @return links: The de-duplicated links (order is not preserved).
                   An empty list is returned when the url cannot be opened.
    '''
    htmlparser = LinksExtractor(formatter.NullFormatter())
    try:
        response = urllib2.urlopen(url)
    except (urllib2.HTTPError, urllib2.URLError) as e:
        log.error(e)
        return []

    # Release the connection even when reading the body fails.
    try:
        html = response.read()
    finally:
        response.close()

    htmlparser.feed(html)
    htmlparser.close()

    # Going through a set drops links that appear more than once in the page.
    links = list(set(htmlparser.get_links()))
    if returnMetaUrlObj:
        links = [MetaUrl(link) for link in links]
    return links
def isAscii(s):
    "Function checks whether the string can be decoded as ascii."
    try:
        s.decode('ascii')
    except (UnicodeEncodeError, UnicodeDecodeError):
        is_ascii = False
    else:
        is_ascii = True
    return is_ascii
@retry(Exception, logger=log)
def parse(song, source):
    '''
    Function parses the source search page and returns the .mp3 links in it.
    @param song: Search string.
    @param source: Search website source. Value can be dilandau, mp3skull, youtube, seekasong.
                   The comparison is case-insensitive.
    @return links: .mp3 url links. An empty list is returned for an unknown source.
    '''
    # Keys are lower-case because the requested source is lower-cased before
    # the lookup; a mixed-case key such as "SeekASong" could never match.
    parsers = {
        "dilandau": parse_dilandau,
        "mp3skull": parse_Mp3skull,
        "seekasong": parse_SeekASong,
        "youtube": parse_Youtube,
    }
    handler = parsers.get(source.lower())
    if handler is None:
        log.error('no source "%s". (from parse function in WebParser)' % source)
        return []
    return handler(song)
def parse_dilandau(song, pages=1):
    "Function queries Dilandau.eu for a song and returns the .mp3 links found as MetaUrl objects"
    # Dilandau doesn't like unicode, so non-ASCII searches are skipped entirely.
    if not isAscii(song):
        log.warning("Song is not ASCII. Skipping on dilandau")
        return []

    quoted = urllib2.quote(song.encode("utf8"))
    slug = quoted.replace('-', '').replace(' ', '-').replace('--', '-').lower()

    links = []
    for page in range(1, pages + 1):
        url = 'http://en.dilandau.eu/download_music/%s-%d.html' % (slug, page)
        log.debug("[Dilandau] Parsing %s... " % url)
        links.extend(GetLinks(url, returnMetaUrlObj=True))
    log.debug("[Dilandau] found %d links" % len(links))

    # Tag every result with the site it came from.
    for metaUrl in links:
        metaUrl.source = "Dilandau"
    return links
def parse_Mp3skull(song, pages=1):
    "Function queries mp3skull.com for a song and returns the .mp3 links found as MetaUrl objects"
    quoted = urllib2.quote(song.encode("utf8"))
    # Example: http://mp3skull.com/mp3/how_i_met_your_mother.html
    slug = quoted.replace('-', '').replace(' ', '_').replace('__', '_').lower()
    url = 'http://mp3skull.com/mp3/%s.html' % slug

    links = []
    # NOTE(review): the url does not depend on the page number, so pages > 1
    # fetches the same page repeatedly -- confirm whether paging is intended.
    for _ in range(pages):
        log.debug("[Mp3skull] Parsing %s... " % url)
        links.extend(GetLinks(url, returnMetaUrlObj=True))
    log.debug("[Mp3skull] found %d links" % len(links))

    # Tag every result with the site it came from.
    for metaUrl in links:
        metaUrl.source = "Mp3skull"
    return links
def parse_SeekASong(song):
    "Function queries seekasong.com for a song and returns the .mp3 links found as MetaUrl objects"
    quoted = urllib2.quote(song.encode("utf8"))
    slug = quoted.replace('-', '').replace(' ', '_').replace('__', '_').lower()
    url = 'http://www.seekasong.com/mp3/%s.html' % slug
    log.debug("[SeekASong] Parsing %s... " % url)

    links = GetLinks(url, returnMetaUrlObj=True)
    # Tag every result with the site it came from.
    for metaUrl in links:
        metaUrl.source = "SeekASong"

    log.debug("[SeekASong] found %d links" % len(links))
    return links
def parse_Youtube(song, amount=10):
    '''
    Function searches a song in youtube.com and returns the clips in it using Youtube API.
    @param song: The search string.
    @param amount: Amount of clips to obtain.
    @return links: List of links, one entry per clip in the feed, as returned by
                   get_youtube_hightest_quality_link.
    '''
    song = urllib2.quote(song.encode("utf8"))
    url = r"http://gdata.youtube.com/feeds/api/videos?q=%s&max-results=%d&v=2" % (song.replace(' ', '+'), amount)

    urlObj = urllib2.urlopen(url, timeout=4)
    # Release the connection even when reading the feed fails.
    try:
        data = urlObj.read()
    finally:
        urlObj.close()

    feed = xml.dom.minidom.parseString(data).getElementsByTagName('feed')[0]
    links = []
    for video in feed.getElementsByTagName('entry'):
        # The value of the first attribute on the entry's first <link> element
        # is used as the watch url.
        youtube_watchurl = video.getElementsByTagName('link')[0].attributes.item(0).value
        links.append(get_youtube_hightest_quality_link(youtube_watchurl))
    return links
def get_youtube_hightest_quality_link(youtube_watchurl, priority=config.youtube_quality_priority):
    '''
    Function picks the best available download link for one youtube clip.
    @param youtube_watchurl: The Youtube Watch Url (its "v" query parameter is the video id).
    @param priority: Qualities to try, in order; the first one available wins.
    @return MetaUrlObj: MetaUrl Object, or an empty string when none of the
                        qualities in priority is available.
    '''
    query = parse_qs(urlparse(youtube_watchurl).query)
    video_id = query['v'][0]
    youtube_embedded_watchurl = "http://www.youtube.com/embed/%s?autoplay=1" % video_id

    d = get_youtube_dl_links(video_id)
    for quality in priority:
        if quality not in d:
            continue
        # Only the first url listed for the chosen quality is used.
        return MetaUrl(d[quality][0], 'youtube', d['VideoName'], quality, youtube_embedded_watchurl)

    log.error("No Youtube link has been found in get_youtube_hightest_quality_link.")
    return ""
@retry(Exception, logger=log)
def get_youtube_dl_links(video_id):
    '''
    Function gets the download links for a youtube clip.
    This function parses the get_video_info format of youtube.
    @param video_id: Youtube Video ID.
    @return d: A dictonary of qualities as keys and lists of urls as values.
               The extra key 'VideoName' holds the clip title (empty when it
               could not be parsed).
    '''
    info_url = r"http://www.youtube.com/get_video_info?video_id=%s&el=vevo" % video_id
    urlObj = urllib2.urlopen(info_url, timeout=12)
    # Release the connection even when reading the response fails.
    try:
        data = urlObj.read()
    finally:
        urlObj.close()

    # Undo three levels of url-quoting, then put every ",url" entry on its own line.
    data = urllib2.unquote(urllib2.unquote(urllib2.unquote(data)))
    data = data.replace(',url', '\nurl')
    data = data.split('\n')

    d = {}
    for line in data:
        if 'timedtext' in line or 'status=fail' in line or '<AdBreaks>' in line:
            continue
        parts = line.split('&quality=')
        try:
            stream_url = parts[0].split('url=')[1]
            quality = parts[1].split('&')[0]
        except IndexError:
            # The line has no "url=" or no "&quality=" marker; skip it.
            continue
        d.setdefault(quality, []).append(stream_url)

    try:
        videoName = "|".join(data).split('&title=')[1].split('&')[0]
    except IndexError as e:
        log.error("Could not parse VideoName out of get_video_info (%s)" % str(e))
        videoName = ""

    videoName = unicode(videoName, 'utf-8')
    d['VideoName'] = videoName.replace('+',' ').replace('--','-')
    return d
class NextList(object):
    "Wraps a list and hands out its items one at a time through 'next'."

    def __init__(self, l):
        self.l = l
        self.next_index = 0

    def next(self):
        "Return the item under the cursor and advance it; None once the end is reached."
        if self.next_index >= len(self.l):
            return None
        item = self.l[self.next_index]
        self.next_index = self.next_index + 1
        return item

    def isEOF(self):
        " Tells whether every item has already been handed out "
        return not (self.next_index < len(self.l))
class MetaUrl(object):
    "A url bundled with metadata describing where it came from."

    def __init__(self, url, source="", videoName="", quality="", youtube_watchurl=""):
        # The url is always stored as a str, whatever type was passed in.
        self.url = str(url)
        self.source = source
        # The remaining fields are for Youtube links only.
        self.videoName = videoName
        self.quality = quality
        self.youtube_watchurl = youtube_watchurl

    def __repr__(self):
        shown = (self.url, self.source)
        return "<MetaUrl '%s' | %s>" % shown
def search(song, n, processes=config.search_processes):
    '''
    Function searches song and returns n valid .mp3 links.
    Every source in config.search_sources is parsed through a worker pool and
    the results are then interleaved, one link per source in turn.
    @param song: Search string.
    @param n: Number of songs.
    @param processes: Number of processes to launch in the subprocessing pool.
    @return links: At most n links; fewer when the sources run out.
    '''
    import threading
    import weakref

    # Pool creation registers each worker in the creating thread's _children
    # mapping. A thread that shows up as a _DummyThread has no such attribute
    # on Python 2.7.3, and Pool() then fails with AttributeError. Give the
    # current thread the mapping when it is missing.
    current = threading.current_thread()
    if not hasattr(current, '_children'):
        current._children = weakref.WeakKeyDictionary()

    args = [(song, source) for source in config.search_sources]
    linksFromSources = []
    pool = multiprocessing.Pool(processes)
    try:
        imapObj = pool.imap_unordered(_parse_star, args)
        for _ in args:
            # Wait at most 15 seconds for the next finished source.
            linksFromSources.append(NextList(imapObj.next(15)))
    finally:
        # Shut the workers down even when a source times out or fails.
        pool.terminate()

    links = []
    next_source = 0
    while len(links) < n and not all(src.isEOF() for src in linksFromSources):
        nextItem = linksFromSources[next_source].next()
        # Exhausted sources yield None and failed lookups may yield an empty
        # value; both are skipped.
        if nextItem:
            log.debug("added song %.80s from source ID %d (%s)" % (nextItem.url.split('/')[-1], next_source, nextItem.source))
            links.append(nextItem)
        # Move on to the following source, wrapping around after the last one.
        next_source = (next_source + 1) % len(linksFromSources)
    return links
def _parse_star(args):
    "Adapter for pool mapping: unpacks one argument tuple into a parse() call."
    return parse(*args)