2

Pythonのマルチスレッド技術を使用してプロジェクトオイラーの問題8を解決しようとしています。

1000桁の数字で5桁連続の最大の積を見つけます。番号はここにあります。

私のアプローチは、元のリストの5つのチャンクから製品を生成し、このプロセスを5回繰り返し、それぞれ開始インデックスを1つ右にシフトすることです。

これが私のスレッドクラスです

class pThread(threading.Thread):
    def __init__(self, l):
        threading.Thread.__init__(self)
        self.l = l
        self.p = 0

    def run(self):

        def greatest_product(l):
        """
        Divide the list into chunks of 5 and find the greatest product
        """
            def product(seq):
                return reduce(lambda x,y : x*y, seq)

            def chunk_product(l, n=5):
                for i in range(0, len(l), n):
                    yield product(l[i:i+n])

            result = 0
            for p in chunk_product(num):
                result = result > p and result or p 

            return result

        self.p = greatest_product(self.l)

元のリストの5桁のチャンクすべてをカバーするために5つのスレッドを作成しようとすると、以下の手動のアプローチで正しい答えが得られnumます。これは、テキストから解析した1桁の数字のリストです。

thread1 = pThread(num)
del num[0]
thread2 = pThread(num)
del num[0]
thread3 = pThread(num)
del num[0]
thread4 = pThread(num)
del num[0]
thread5 = pThread(num)

thread1.start()
thread2.start()
thread3.start()
thread4.start()
thread5.start()

thread1.join()
thread2.join()
thread3.join()
thread4.join()
thread5.join()

def max(*args):
    result = 0
    for i in args:
        result = i > result and i or result
    return result

print max(thread1.p, thread2.p, thread3.p, thread4.p, thread5.p)

しかし、これは正しい結果をもたらしません:

threads = []
for i in range(0, 4):
    tmp = num[:]
    del tmp[0:i+1]
    thread = pThread(tmp)
    thread.start()
    threads.append(thread)

for i in range(0, 4):
    threads[i].join()

私はここで何を間違えましたか?私はマルチスレッドに非常に慣れていないので、優しくしてください。

4

2 に答える 2

4

3 つの問題があります。

  1. 1 つ目は、「手動」のアプローチでは正しい答えが得られないことです。問題に対する正しい答えが、リストの先頭からオフセット 4 にあるだけです。これは、次を使用して確認できます。

    import operator as op
    print max(reduce(op.mul, num[i:i+5]) for i in range(1000))
    for k in range(5):
        print max(reduce(op.mul, num[i:i+5]) for i in range(k, 1000, 5))
    

    「手動」アプローチの問題の1つは、スレッドがnum変数を共有し、それぞれに同じリストがあることです。するとdel num[0]、すべてthreadX.lが影響を受けます。一貫して同じ答えが得られるのは、2 番目の問題によるものです。

  2. この線

    for p in chunk_product(num):
    

    次のようにする必要があります。

    for p in chunk_product(l):
    

    greatest_product(l)グローバル変数ではなく関数のパラメーターを使用するためですnum

  3. 2 番目の方法では、ループが範囲を超えているため、4 つのスレッドのみを生成します[0, 1, 2, 3]tmp[0:i]また、値ではなく値を削除しますtmp[0:i+1]。コードは次のとおりです。

    threads = []
    for i in range(5):
        tmp = num[:]
        del tmp[0:i]
        thread = pThread(tmp)
        thread.start()
        threads.append(thread)
    
    for i in range(5):
        threads[i].join()
    
    print len(threads), map(lambda th: th.p, threads)
    print max(map(lambda th: th.p, threads))
    
于 2013-02-15T06:46:29.793 に答える
1

私は主にマルチプロセッシングの練習と argparse の使い方を学ぶために、これに挑戦しました。

マシンに十分なRAMがない場合に備えて、これには約4〜5ギガのRAMが必要でした.

python euler.py -l 50000000 -n 100 -p 8

Took 5.836833333969116 minutes
The largest product of 100 consecutive numbers is: a very large number

コマンドラインで python euler.py -h と入力すると、次のようになります。

usage: euler.py [-h] -l L [L ...] -n N [-p P]

Calculates the product of consecutive numbers and return the largest product.

optional arguments:
  -h, --help    show this help message and exit
  -l L [L ...]  A single number or list of numbers, where each # is seperated
                by a space
  -n N          A number that specifies how many consecutive numbers should be
                multiplied together.
  -p P          Number of processes to create. Optional, defaults to the # of
                cores on the pc.        

そしてコード:

"""A multiprocess iplementation for calculation the maximum product of N consecutive
numbers in a given range (list of numbers)."""

import multiprocessing
import math
import time
import operator
from functools import reduce
import argparse

def euler8(alist,lenNums):
    """Returns the largest product of N consecutive numbers in a given range"""
    return max(reduce(operator.mul, alist[i:i+lenNums]) for i in range(len(alist)))

def split_list_multi(listOfNumbers,numLength,threads):
    """Split a list into N parts where N is the # of processes."""
    fullLength = len(listOfNumbers)
    single = math.floor(fullLength/threads)
    results = {}
    counter = 0
    while counter < threads:
        if counter == (threads-1):
            temp = listOfNumbers[single*counter::]
            if counter == 0:
                results[str(counter)] = listOfNumbers[single*counter::]
            else:
                prevListIndex = results[str(counter-1)][-int('{}'.format(numLength-1))::]
                newlist = prevListIndex + temp
                results[str(counter)] = newlist
        else:
            temp = listOfNumbers[single*counter:single*(counter+1)]
            if counter == 0:
                newlist = temp
            else:
                prevListIndex = results[str(counter-1)][-int('{}'.format(numLength-1))::]
                newlist = prevListIndex + temp
            results[str(counter)] = newlist
        counter += 1
    return results,threads

def worker(listNumbers,number,output):
    """A worker. Used to run seperate processes and put the results in the queue"""
    result = euler8(listNumbers,number)
    output.put(result)

def main(listOfNums,lengthNumbers,numCores=multiprocessing.cpu_count()):
    """Runs the module.
    listOfNums must be a list of ints, or single int
    lengthNumbers is N (an int) where N is the # of consecutive numbers to multiply together
    numCores (an int) defaults to however many the cpu has, can specify a number if you choose."""

    if isinstance(listOfNums,list):
        if len(listOfNums) == 1:
            valuesToSplit = [i for i in range(int(listOfNums[0]))]
        else:
            valuesToSplit = [int(i) for i in listOfNums]
    elif isinstance(listOfNums,int):
        valuesToSplit = [i for i in range(listOfNums)]
    else:
        print('First arg must be a number or a list of numbers')

    split = split_list_multi(valuesToSplit,lengthNumbers,numCores)
    done_queue = multiprocessing.Queue()
    jobs = []
    startTime = time.time()

    for num in range(split[1]):
        numChunks = split[0][str(num)]
        thread = multiprocessing.Process(target=worker, args=(numChunks,lengthNumbers,done_queue))
        jobs.append(thread)
        thread.start()

    resultlist = []
    for i in range(split[1]):
        resultlist.append(done_queue.get())

    for j in jobs:
        j.join()

    resultlist = max(resultlist)
    endTime = time.time()
    totalTime = (endTime-startTime)/60
    print("Took {} minutes".format(totalTime))

    return print("The largest product of {} consecutive numbers is: {}".format(lengthNumbers, resultlist))            

if __name__ == '__main__':
    #To call the module from the commandline with arguments
    parser = argparse.ArgumentParser(description="""Calculates the product of consecutive numbers \
    and return the largest product.""")
    parser.add_argument('-l', nargs='+', required=True,
                       help='A single number or list of numbers, where each # is seperated by a space')
    parser.add_argument('-n', required=True, type=int,
                        help = 'A number that specifies how many consecutive numbers should be \
                        multiplied together.')
    parser.add_argument('-p', default=multiprocessing.cpu_count(), type=int,
                       help='Number of processes to create. Optional, defaults to the # of cores on the pc.')
    args = parser.parse_args()
    main(args.l, args.n, args.p)
于 2013-02-15T06:12:49.397 に答える