1

重複の可能性:
複数の引数のPythonマルチプロセッシングpool.map

マルチプロセッシングのサブプロセスに2つの引数をフィードしたいのですが。プール?上り坂を押しているような気がします。2つの引数、タプル、または...まったく可能ですか?2つのファイル名(inとout)を正しく渡すことで、一部のユーザーにとっては正常に機能しているように見えますが、その後、可変ポイントで予期せずバーフが発生します。残念ながら、出力ファイルがすべて空であるため、実際には機能していません。これは、直接呼び出したり、単一処理で呼び出したりした場合には発生しません。別の複雑な要因があります。呼び出されたルーチンは、インポートされた別のモジュールにあります。これを「foo」スタブモジュールとしてローカライズすると、問題は修正されますが、実際の作業を実行しようとせず、引数を出力するだけです。

これは、キューの使い方を学ぶことに頑固に抵抗するのにおそらく長い道のりですが、私が進んでいる道をどこにも押し進めないことを確認したいと思います。

fixtures/txt_data/AAD.txt obj/txt_data/AAD.txt
fixtures/txt_data/ANZSMW.txt obj/txt_data/ANZSMW.txt
fixtures/txt_data/BENPA.txt obj/txt_data/BENPA.txt
fixtures/txt_data/CBAIZQ.txt obj/txt_data/CBAIZQ.txt
Traceback (most recent call last):
  File "./jobflow.py", line 60, in <module>
    main()
  File "./jobflow.py", line 57, in main
    args.func(args)
  File "./jobflow.py", line 40, in market
    pool.map(foo, market_files())
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 225, in map
  return self.map_async(func, iterable, chunksize).get()
File     "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 522, in get
raise self._value
TypeError: function takes exactly 1 argument (2 given)
fixtures/txt_data/CSDO.txt obj/txt_data/CSDO.txt
fixtures/txt_data/EMB.txt obj/txt_data/EMB.txt
fixtures/txt_data/GGG.txt obj/txt_data/GGG.txt
fixtures/txt_data/IDL.txt obj/txt_data/IDL.txt

これはエラーサンプルです。ファイルが使い果たされる前に停止します。または、2つの引数が必要であるとうめき声を上げますが、2つの引数を渡すように変更すると、1つしか得られません。

def foo(c):
    a, b, = c
    print a, b
    market2.file_main((a, b))  # does comment/uncommenting this break it only because it's in another python file?

def market(args):
    """
    read raw ticker data files and output nice, clean, more valid ticker data files
    """
    pool = multiprocessing.Pool()

    class market_files(object):
        for infile in args.infiles:
            outfile = os.path.join(args.outdir, os.path.basename(infile))
            yield (infile, outfile)

    pool.map(foo, market_files())
4

1 に答える 1

4

ああ、待ってください、それは機能しますが、複数の引数を直接渡すのではなく、それらをタプルに入れます。

私は、ばかげた数のプロセスを生成するすべての反復で新しいProcessandを生成することによってそれを実装しましp.start()た ;-) しかし、複数の引数を飲み込みました。

そこから戻って、イテラブルをリストに単純化しました(イテラブルでうまくいったので、おそらく問題ありません)が、主なことは引数をタプルとして渡すことだったと思います。カッティングルームの床が散らかっていて、うまくいった解決策が見つからないケースの 1 つだったに違いありません。

だから私が持っているコントローラで:

    # Create a list of filenames.
    arglist = []
    for infile in args.infiles:
        outfile = os.path.join(args.outdir, os.path.basename(infile))
        arglist.append((infile, outfile))

    # Pass each process one filename to work on.
    pool = multiprocessing.Pool()
    p = pool.map(func=market2.process, iterable=arglist)

そしてモジュールで:

    def process(x):
        # Open an input file, and output file, and do work.
        infile, outfile = x
        instream = open(infile, 'rB')
        outstream = open(outfile, 'wB')
        main(instream, outstream)
        instream.close()
        outstream.close()

4 コアのパフォーマンス (分):

  • シングルスレッド = 3:54
  • using subprocess = 4:52 (デフォルトでブロックされていると思うので、それがわかるでしょう)
  • squillionsProcessの同時使用 = 2:41 (プロセスごとに 1 ~ 4% の CPU ですべてのコアを飽和させた)
  • プールを使用 = 2:13
于 2012-10-22T09:48:57.743 に答える