9

マルチプロセッシングに頭を悩ませようと多くの時間を費やした後、ベンチマーク テストである次のコードを思いつきました。

例 1:

from multiprocessing  import Process

class Alter(Process):
    def __init__(self, word):
        Process.__init__(self)
        self.word = word
        self.word2 = ''

    def run(self):
        # Alter string + test processing speed
        for i in range(80000):
            self.word2 = self.word2 + self.word

if __name__=='__main__':
    # Send a string to be altered
    thread1 = Alter('foo')
    thread2 = Alter('bar')
    thread1.start()
    thread2.start()

    # wait for both to finish

    thread1.join()
    thread2.join()

    print(thread1.word2)
    print(thread2.word2)

これは 2 秒で完了します (マルチスレッドの半分の時間)。好奇心から、次にこれを実行することにしました。

例 2:

word2 = 'foo'
word3 = 'bar'

word = 'foo'
for i in range(80000):
    word2 = word2 + word

word  = 'bar'
for i in range(80000):
    word3 = word3 + word

print(word2)
print(word3)

恐ろしいことに、これは 0.5 秒もかからずに実行されました。

ここで何が起こっているのですか?例 1 が例 2 の 2 つのプロセスに分割されていることを考えると、例 2 の半分の時間で完了するはずではありませんか?

アップデート:

Chris のフィードバックを検討した後、最も多くの処理時間を消費する「実際の」コードを含め、マルチプロセッシングを検討するように導きました。

self.ListVar = [[13379+ strings],[13379+ strings],
                [13379+ strings],[13379+ strings]]

for b in range(len(self.ListVar)):
    self.list1 = []
    self.temp = []
    for n in range(len(self.ListVar[b])):
        if not self.ListVar[b][n] in self.temp:
            self.list1.insert(n, self.ListVar[b][n] + '(' + 
                              str(self.ListVar[b].count(self.ListVar[b][n])) +
                              ')')
           self.temp.insert(0, self.ListVar[b][n])

   self.ListVar[b] = list(self.list1)
4

4 に答える 4

13

マルチプロセッシングは、あなたがしていることには役立つかもしれませんが、あなたがそれを使用しようと考えている方法ではありません. 基本的にリストのすべてのメンバーに対して何らかの計算を行っているため、multiprocessing.Pool.mapメソッドを使用して、リストのメンバーに対して並列に計算を行うことができます。

以下は、単一のプロセスと を使用したコードのパフォーマンスを示す例ですmultiprocessing.Pool.map

from multiprocessing import Pool
from random import choice
from string import printable
from time import time

def build_test_list():
    # Builds a test list consisting of 5 sublists of 10000 strings each.
    # each string is 20 characters long
    testlist = [[], [], [], [], []]
    for sublist in testlist:
        for _ in xrange(10000):
            sublist.append(''.join(choice(printable) for _ in xrange(20)))
    return testlist

def process_list(l):
    # the time-consuming code
    result = []
    tmp = []
    for n in range(len(l)):
        if l[n] not in tmp:
            result.insert(n, l[n]+' ('+str(l.count(l[n]))+')')
            tmp.insert(0, l[n])
    return result

def single(l):
    # process the test list elements using a single process
    results = []
    for sublist in l:
        results.append(process_list(sublist))
    return results

def multi(l):
    # process the test list elements in parallel
    pool = Pool()
    results = pool.map(process_list, l)
    return results

print "Building the test list..."
testlist = build_test_list()

print "Processing the test list using a single process..."
starttime = time()
singleresults = single(testlist)
singletime = time() - starttime

print "Processing the test list using multiple processes..."
starttime = time()
multiresults = multi(testlist)
multitime = time() - starttime

# make sure they both return the same thing
assert singleresults == multiresults

print "Single process: {0:.2f}sec".format(singletime)
print "Multiple processes: {0:.2f}sec".format(multitime)

出力:

Building the test list...
Processing the test list using a single process...
Processing the test list using multiple processes...
Single process: 34.73sec
Multiple processes: 24.97sec
于 2012-01-08T09:04:00.320 に答える
13

この例は小さすぎてマルチプロセッシングの恩恵を受けられません。

新しいプロセスを開始すると、多くのオーバーヘッドが発生します。重い処理が含まれている場合、それは無視できます。しかし、あなたの例はそれほど集中的ではないので、オーバーヘッドに気付くはずです。

おそらく、実際のスレッドとの大きな違いに気付くでしょう。あまりにも悪い python (まあ、CPython) には、CPU バウンドのスレッドに関する問題があります。

于 2012-01-08T04:56:41.537 に答える
12

ETA: コードを投稿したので、あなたがしていることをもっと速く (>100 倍速く) 実行する簡単な方法があることをお伝えできます。

あなたがしていることは、文字列のリスト内の各項目に括弧内の頻度を追加していることがわかります。毎回すべての要素をカウントする代わりに (cProfile を使用して確認できるように、これはコードの最大のボトルネックです)、各要素からその頻度にマップするディクショナリを作成するだけです。そうすれば、リストを 2 回確認するだけで済みます。1 回目は頻度辞書を作成し、1 回目はそれを使用して頻度を追加します。

ここでは、新しい方法を示して時間を計測し、生成されたテスト ケースを使用して古い方法と比較します。テスト ケースは、新しい結果が古い結果とまったく同じであることを示しています。注:以下で特に注意する必要があるのは、new_method だけです。

import random
import time
import collections
import cProfile

LIST_LEN = 14000

def timefunc(f):
    t = time.time()
    f()
    return time.time() - t


def random_string(length=3):
    """Return a random string of given length"""
    return "".join([chr(random.randint(65, 90)) for i in range(length)])


class Profiler:
    def __init__(self):
        self.original = [[random_string() for i in range(LIST_LEN)]
                            for j in range(4)]

    def old_method(self):
        self.ListVar = self.original[:]
        for b in range(len(self.ListVar)):
            self.list1 = []
            self.temp = []
            for n in range(len(self.ListVar[b])):
                if not self.ListVar[b][n] in self.temp:
                    self.list1.insert(n, self.ListVar[b][n] + '(' +    str(self.ListVar[b].count(self.ListVar[b][n])) + ')')
                    self.temp.insert(0, self.ListVar[b][n])

            self.ListVar[b] = list(self.list1)
        return self.ListVar

    def new_method(self):
        self.ListVar = self.original[:]
        for i, inner_lst in enumerate(self.ListVar):
            freq_dict = collections.defaultdict(int)
            # create frequency dictionary
            for e in inner_lst:
                freq_dict[e] += 1
            temp = set()
            ret = []
            for e in inner_lst:
                if e not in temp:
                    ret.append(e + '(' + str(freq_dict[e]) + ')')
                    temp.add(e)
            self.ListVar[i] = ret
        return self.ListVar

    def time_and_confirm(self):
        """
        Time the old and new methods, and confirm they return the same value
        """
        time_a = time.time()
        l1 = self.old_method()
        time_b = time.time()
        l2 = self.new_method()
        time_c = time.time()

        # confirm that the two are the same
        assert l1 == l2, "The old and new methods don't return the same value"

        return time_b - time_a, time_c - time_b

p = Profiler()
print p.time_and_confirm()

これを実行すると、(15.963812112808228, 0.05961179733276367) の時間が得られます。これは、約 250 倍高速であることを意味しますが、この利点は、リストの長さと各リスト内の頻度分布の両方に依存します。この速度の利点により、おそらくマルチプロセッシングを使用する必要がないことに同意していただけると思います:)

(私の元の答えは、後世のために以下に残されています)

ETA: ところで、このアルゴリズムはリストの長さがほぼ線形であるのに対し、使用したコードは 2 次であることに注意してください。これは、要素数が多いほど、より有利に機能することを意味します。たとえば、各リストの長さを 1000000 に増やした場合、実行にかかる時間はわずか 5 秒です。外挿に基づいて、古いコードは1日以上かかります:)


実行している操作によって異なります。例えば:

import time
NUM_RANGE = 100000000

from multiprocessing  import Process

def timefunc(f):
    t = time.time()
    f()
    return time.time() - t

def multi():
    class MultiProcess(Process):
        def __init__(self):
            Process.__init__(self)

        def run(self):
            # Alter string + test processing speed
            for i in xrange(NUM_RANGE):
                a = 20 * 20

    thread1 = MultiProcess()
    thread2 = MultiProcess()
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()

def single():
    for i in xrange(NUM_RANGE):
        a = 20 * 20

    for i in xrange(NUM_RANGE):
        a = 20 * 20

print timefunc(multi) / timefunc(single)

私のマシンでは、マルチプロセス操作はシングルスレッド操作の約 60% の時間しかかかりません。

于 2012-01-08T05:34:04.463 に答える