
My project uses the multiprocessing class to run tasks in parallel. I would like to use threading instead, because it performs better here (my tasks are TCP/IP-bound, not CPU- or I/O-bound).

multiprocessing has nice features such as Pool.imap_unordered and Pool.map_async, which do not exist in the threading module.
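For context, here is a minimal sketch of those two methods against the standard multiprocessing API (work is just a placeholder, not my real TCP/IP task):

import multiprocessing

def work(item):
    return item, item * 2  # placeholder for the real TCP/IP-bound task

if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    # imap_unordered yields each result as soon as any worker finishes,
    # regardless of submission order.
    for item, doubled in pool.imap_unordered(work, [1, 2, 3]):
        print item, doubled
    # map_async returns immediately with an AsyncResult.
    result = pool.map_async(work, [4, 5, 6])
    print result.get(timeout=10)
    pool.close()
    pool.join()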

What is the correct way to convert my code to use threading instead? The documentation introduces multiprocessing.dummy, which is a wrapper around the threading module. However, it raises many errors (at least on Python 2.7.3):

    pool = multiprocessing.Pool(processes)
  File "C:\python27\lib\multiprocessing\dummy\__init__.py", line 150, in Pool
    return ThreadPool(processes, initializer, initargs)
  File "C:\python27\lib\multiprocessing\pool.py", line 685, in __init__
    Pool.__init__(self, processes, initializer, initargs)
  File "C:\python27\lib\multiprocessing\pool.py", line 136, in __init__
    self._repopulate_pool()
  File "C:\python27\lib\multiprocessing\pool.py", line 199, in _repopulate_pool
    w.start()
  File "C:\python27\lib\multiprocessing\dummy\__init__.py", line 73, in start
    self._parent._children[self] = None
AttributeError: '_DummyThread' object has no attribute '_children'

Edit: What actually happens is that I have a GUI that runs a separate thread (to keep the GUI from getting stuck). That thread runs a specific search function, which fails with ThreadPool.
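If it helps, here is a minimal sketch that reproduces the same failure mode for me, assuming the pool is created in a thread other than the one that imported multiprocessing.dummy (the attribute name in the error may read 'Thread' instead of '_DummyThread', depending on how the thread was created):

import threading
import multiprocessing.dummy

def run_search():
    # Raises AttributeError on Python 2.7.3: this thread has no _children
    pool = multiprocessing.dummy.Pool(5)
    print pool

t = threading.Thread(target=run_search)
t.start()
t.join()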

Edit 2: The bug fix has been committed and is scheduled for inclusion in a future release. It's great to see a crasher get fixed! For reference, here is the relevant code:

import urllib2, htmllib, formatter
import multiprocessing.dummy as multiprocessing
import xml.dom.minidom
import os
import string, random
from urlparse import parse_qs, urlparse

from useful_util import retry
import config
from logger import log

class LinksExtractor(htmllib.HTMLParser):
    def __init__(self, formatter):
        htmllib.HTMLParser.__init__(self, formatter)
        self.links = []
        self.ignoredSites = config.WebParser_ignoredSites

    def start_a(self, attrs):
        for attr in attrs:
            if attr[0] == "href" and attr[1].endswith(".mp3"):
                if not filter(lambda x: (x in attr[1]), self.ignoredSites):
                    self.links.append(attr[1])

    def get_links(self):
        return self.links


def GetLinks(url, returnMetaUrlObj=False):
    '''
    Function gathers links from a URL.
    @param url: URL address.
    @param returnMetaUrlObj: If True, returns a list of MetaUrl objects.
                             Else, returns a list of strings. Default is False.

    @return links: The collected links.
    '''
    htmlparser = LinksExtractor(formatter.NullFormatter())

    try:
        data = urllib2.urlopen(url)
    except (urllib2.HTTPError, urllib2.URLError) as e:
        log.error(e)
        return []
    htmlparser.feed(data.read())
    htmlparser.close()

    links = list(set(htmlparser.get_links()))

    if returnMetaUrlObj:
        links = map(MetaUrl, links)

    return links

def isAscii(s):
    "Function checks is the string is ascii."
    try:
        s.decode('ascii')
    except (UnicodeEncodeError, UnicodeDecodeError):
        return False
    return True

@retry(Exception, logger=log)
def parse(song, source):
    '''
    Function parses the source search page and returns the .mp3 links in it.
    @param song: Search string.
    @param source: Search website source. Value can be dilandau, mp3skull, youtube, seekasong.

    @return links: .mp3 url links.
    '''
    source = source.lower()
    if source == "dilandau":
        return parse_dilandau(song)
    elif source == "mp3skull":
        return parse_Mp3skull(song)
    elif source == "SeekASong":
        return parse_SeekASong(song)
    elif source == "youtube":
        return parse_Youtube(song)

    log.error('no source "%s". (from parse function in WebParser)' % source)
    return []

def parse_dilandau(song, pages=1):
    "Function connects to Dilandau.eu and returns the .mp3 links in it"
    if not isAscii(song): # Dilandau doesn't like unicode.
        log.warning("Song is not ASCII. Skipping on dilandau")
        return []

    links = []
    song = urllib2.quote(song.encode("utf8"))

    for i in range(pages):
        url = 'http://en.dilandau.eu/download_music/%s-%d.html' % (song.replace('-','').replace(' ','-').replace('--','-').lower(),i+1)
        log.debug("[Dilandau] Parsing %s... " % url)
        links.extend(GetLinks(url, returnMetaUrlObj=True))
    log.debug("[Dilandau] found %d links" % len(links))

    for metaUrl in links:
        metaUrl.source = "Dilandau"

    return links

def parse_Mp3skull(song, pages=1):
    "Function connects to mp3skull.com and returns the .mp3 links in it"
    links = []
    song = urllib2.quote(song.encode("utf8"))

    for i in range(pages):
        # http://mp3skull.com/mp3/how_i_met_your_mother.html
        url = 'http://mp3skull.com/mp3/%s.html' % (song.replace('-','').replace(' ','_').replace('__','_').lower())
        log.debug("[Mp3skull] Parsing %s... " % url)
        links.extend(GetLinks(url, returnMetaUrlObj=True))
    log.debug("[Mp3skull] found %d links" % len(links))

    for metaUrl in links:
        metaUrl.source = "Mp3skull"

    return links

def parse_SeekASong(song):
    "Function connects to seekasong.com and returns the .mp3 links in it"
    song = urllib2.quote(song.encode("utf8"))

    url = 'http://www.seekasong.com/mp3/%s.html' % (song.replace('-','').replace(' ','_').replace('__','_').lower())
    log.debug("[SeekASong] Parsing %s... " % url)
    links = GetLinks(url, returnMetaUrlObj=True)
    for metaUrl in links:
        metaUrl.source = "SeekASong"
    log.debug("[SeekASong] found %d links" % len(links))

    return links

def parse_Youtube(song, amount=10):
    '''
    Function searches a song in youtube.com and returns the clips in it using Youtube API.
    @param song: The search string.
    @param amount: Amount of clips to obtain.

    @return links: List of links.
    '''
    "Function connects to youtube.com and returns the .mp3 links in it"
    song = urllib2.quote(song.encode("utf8"))
    url = r"http://gdata.youtube.com/feeds/api/videos?q=%s&max-results=%d&v=2" % (song.replace(' ', '+'), amount)
    urlObj = urllib2.urlopen(url, timeout=4)
    data = urlObj.read()
    videos = xml.dom.minidom.parseString(data).getElementsByTagName('feed')[0].getElementsByTagName('entry')

    links = []
    for video in videos:
        youtube_watchurl = video.getElementsByTagName('link')[0].attributes.item(0).value
        links.append(get_youtube_hightest_quality_link(youtube_watchurl))

    return links

def get_youtube_hightest_quality_link(youtube_watchurl, priority=config.youtube_quality_priority):
    '''
    Function returns the highest quality link for a specific youtube clip.
    @param youtube_watchurl: The Youtube Watch Url.
    @param priority: A list representing the quality priority order.

    @return MetaUrlObj: MetaUrl Object.
    '''
    video_id = parse_qs(urlparse(youtube_watchurl).query)['v'][0]
    youtube_embedded_watchurl = "http://www.youtube.com/embed/%s?autoplay=1" % video_id

    d = get_youtube_dl_links(video_id)
    for x in priority:
        if x in d.keys():
            return MetaUrl(d[x][0], 'youtube', d['VideoName'], x, youtube_embedded_watchurl)
    log.error("No Youtube link has been found in get_youtube_hightest_quality_link.")
    return ""

@retry(Exception, logger=log)
def get_youtube_dl_links(video_id):
    '''
    Function gets the download links for a youtube clip.
    This function parses the get_video_info format of youtube.

    @param video_id: Youtube Video ID.
    @return d: A dictionary with qualities as keys and URL lists as values.
    '''
    d = {}

    url = r"http://www.youtube.com/get_video_info?video_id=%s&el=vevo" % video_id

    urlObj = urllib2.urlopen(url, timeout=12)
    data = urlObj.read()
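    # get_video_info comes back URL-encoded several times over; unquote
    # repeatedly so the url= and quality= fields below become readable.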
    data = urllib2.unquote(urllib2.unquote(urllib2.unquote(data)))
    data = data.replace(',url', '\nurl')
    data = data.split('\n')

    for line in data:
        if 'timedtext' in line or 'status=fail' in line or '<AdBreaks>' in line:
            continue

        try:
            url = line.split('&quality=')[0].split('url=')[1]
            quality = line.split('&quality=')[1].split('&')[0]
        except IndexError:  # line has no url=/quality= fields
            continue
        if quality in d:
            d[quality].append(url)
        else:
            d[quality] = [url]

    try:
        videoName = "|".join(data).split('&title=')[1].split('&')[0]
    except Exception, e:
        log.error("Could not parse VideoName out of get_video_info (%s)" % str(e))
        videoName = ""

    videoName = unicode(videoName, 'utf-8')
    d['VideoName'] = videoName.replace('+',' ').replace('--','-')
    return d


class NextList(object):
    "A list with a 'next' method."
    def __init__(self, l):
        self.l = l
        self.next_index = 0

    def next(self):
        if self.next_index < len(self.l):
            value = self.l[self.next_index]
            self.next_index += 1
            return value
        else:
            return None

    def isEOF(self):
        " Checks if the list has reached the end "
        return (self.next_index >= len(self.l))

class MetaUrl(object):
    "a url strecture data with many metadata"
    def __init__(self, url, source="", videoName="", quality="", youtube_watchurl=""):
        self.url = str(url)
        self.source = source
        self.videoName = videoName # Youtube Links Only
        self.quality = quality # Youtube Links Onlys
        self.youtube_watchurl = youtube_watchurl # Youtube Links Onlys

    def __repr__(self):
        return "<MetaUrl '%s' | %s>" % (self.url, self.source)


def search(song, n, processes=config.search_processes):
    '''
    Function searches song and returns n valid .mp3 links.
    @param song: Search string.
    @param n: Number of songs.
    @param processes: Number of workers to launch in the multiprocessing pool.
    '''
    linksFromSources = []
    pool = multiprocessing.Pool(processes)

    args = [(song, source) for source in config.search_sources]
    imapObj = pool.imap_unordered(_parse_star, args)
    # Collect one result list per source; next(15) waits up to 15 seconds
    # for each result before raising multiprocessing.TimeoutError.
    for i in range(len(args)):
        linksFromSources.append(NextList(imapObj.next(15)))
    pool.terminate()

    links = []
    next_source = 0
    while len(links) < n and not all(map(lambda x: x.isEOF(), linksFromSources)):
        nextItem = linksFromSources[next_source].next()
        if nextItem:
            log.debug("added song %.80s from source ID %d (%s)" % (nextItem.url.split('/')[-1], next_source, nextItem.source))
            links.append(nextItem)

        if len(linksFromSources) == next_source+1:
            next_source = 0
        else:
            next_source += 1

    return links

def _parse_star(args):
    # imap_unordered passes a single argument, so unpack the (song, source) tuple.
    return parse(*args)

1 Answer


I can't reproduce your problem on my machine. What's in your processes variable? Is it an int?

Python 2.7.3 (default, Apr 10 2012, 23:31:26) [MSC v.1500 32 bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import multiprocessing.dummy as multiprocessing
>>> pool = multiprocessing.Pool(5)
>>> pool
<multiprocessing.pool.ThreadPool object at 0x00C7DF90>
>>>

--- EDIT ---

You might also want to double-check that you haven't messed up your standard library somehow. Try a clean install of Python 2.7.3 in a separate folder.

--- EDIT 2 ---

You can patch it quickly like this:

import multiprocessing.dummy
import weakref
import threading

class Worker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        pool = multiprocessing.dummy.Pool(5)
        print str(pool)

w = Worker()
# multiprocessing.dummy registers each pool worker on the creating thread's
# _children attribute, which only the thread that imported the module gets
# by default -- give our thread one before it starts.
w._children = weakref.WeakKeyDictionary()
w.start()
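A variant of the same idea, in case you would rather not reach into the thread object from the outside: give the current thread the attribute right before building the pool (a sketch, same weakref workaround as above):

import threading
import weakref
import multiprocessing.dummy

def run_pool():
    # Give this (non-main) thread the _children mapping that
    # multiprocessing.dummy expects before creating the pool in it.
    threading.current_thread()._children = weakref.WeakKeyDictionary()
    pool = multiprocessing.dummy.Pool(5)
    print pool

t = threading.Thread(target=run_pool)
t.start()
t.join()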
answered 2012-05-21T16:29:27.173