5

大量のデータを含むファイルがあります。各行はレコードです。そして、ファイル全体に対していくつかの ETL 作業を実行しようとしています。現在、標準入力を使用してデータを 1 行ずつ読み取っています。これの優れた点は、スクリプトが非常に柔軟で、他のスクリプトやシェル コマンドと統合できることです。結果を標準出力に書き出します。例えば。

$ cat input_file
line1 
line2
line3
line4
...

私の現在のpythonコードは次のようになります - parse.py

import sys
for line in sys.stdin:
    result = ETL(line)    # ETL is some self defined function which takes a while to execute.
    print result

以下のコードは、現在どのように機能しているかです。

cat input_file | python parse.py > output_file

Python の Threading モジュールを見てきましたが、そのモジュールを使用するとパフォーマンスが劇的に向上するかどうか疑問に思っています。

質問 1:各スレッドのクォータをどのように計画すればよいですか? なぜですか?

...
counter = 0
buffer = []
for line in sys.stdin:
    buffer.append(line)
    if counter % 5 == 0:   # maybe assign 5 rows to each thread? if not, is there a rule of thumb to determine
        counter = 0
        thread = parser(buffer)
        buffer = []
        thread.start() 

質問 2:複数のスレッドが同時に結果を stdout に出力する場合があります。それらを整理して以下の状況を回避するにはどうすればよいですか?

import threading
import time

class parser(threading.Thread):
    def __init__ (self, data_input):
        threading.Thread.__init__(self)
        self.data_input = data_input

    def run(self):
        for elem in self.data_input:
            time.sleep(3)
            print elem + 'Finished'

work = ['a', 'b', 'c', 'd', 'e', 'f']

thread1 = parser(['a', 'b'])  
thread2 = parser(['c', 'd'])
thread3 = parser(['e', 'f'])

thread1.start()
thread2.start()
thread3.start()   

出力は非常に見苦しく、1 つの行に 2 つのスレッドからの出力が含まれています。

aFinished
cFinishedeFinished

bFinished
fFinished
dFinished
4

2 に答える 2

5

最初に 2 番目の質問に答えると、これがミューテックスの目的です。ロックを使用してパーサー間で調整し、特定の期間中に 1 つのスレッドのみが出力ストリームにアクセスできるようにすることで、必要なクリーンな出力を取得できます。

class parser(threading.Thread):
    output_lock = threading.Lock()

    def __init__ (self, data_input):
        threading.Thread.__init__(self)
        self.data_input = data_input

    def run(self):
        for elem in self.data_input:
            time.sleep(3)
            with self.output_lock:
                print elem + 'Finished'

最初の質問に関しては、マルチスレッドが特定のワークロードにメリットをもたらさない可能性が高いことに注意してください。各入力行 (関数) で行う作業ETLが主に CPU バウンドか IO バウンドかによって大きく異なります。前者の場合 (可能性が高いと思われます)、グローバル インタープリター ロックにより、スレッドは役に立ちません。その場合、multiprocessingモジュールを使用して、複数のスレッドではなく複数のプロセス間で作業を分散する必要があります。

しかし、より簡単に実装できるワークフローで同じ結果を得ることができnますsplit。サブファイルごとに個別に抽出と変換のスクリプトを呼び出します。次に、結果の出力ファイルを連結します。

1 つの問題点: 「ファイル全体をメモリにロードしないため、標準入力を使用して 1 行ずつデータを読み取る」には、誤解が含まれます。Python 内からファイルを 1 行ずつ読み取ることができます。たとえば、sys.stdin次のような構文でファイル オブジェクトに置き換えます。

for line in sys.stdin:

ファイル オブジェクトのメソッドも参照してください。パラメータとして読み取る最大バイト数を指定できることにreadline()注意してください。read()

于 2013-08-21T06:03:24.610 に答える
1

Whether threading will be helpful you is highly dependent on on your situation. In particular, if your ETL() function involves a lot of disk access, then threading would likely give you pretty significant speed improvement.

In response to your first question, I've always found that it just depends. There are a lot of factors at play when determining the ideal number of threads, and many of them are program-dependent. If you're doing a lot of disk access (which is pretty slow), for example, then you'll want more threads to take advantage of the downtime while waiting for disk access. If the program is CPU-bound, though, tons of threads may not be super helpful. So, while it may be possible to analyze all the factors to come up with an ideal number of threads, it's usually a lot faster to make an initial guess and then adjust from there.

More specifically, though, assigning a certain number of lines to each thread probably isn't the best way to go about divvying up the work. Consider, for example, if one line takes a particularly long time to process. It would be best if one thread could work away at that one line and the other threads could each do a few more lines in the meantime. The best way to handle this is to use a Queue. If you push each line into a Queue, then each thread can pull a line off the Queue, handle it, and repeat until the Queue is empty. This way, the work gets distributed such that no thread is ever without work to do (until the end, of course).

Now, the second question. You're definitely right that writing to stdout from multiple threads at once isn't an ideal solution. Ideally, you would arrange things so that the writing to stdout happens in only one place. One great way to do that is to use a Queue. If you have each thread write its output to a shared Queue, then you can spawn an additional thread whose sole task is to pull items out of that Queue and print them to stdout. By restricting the printing to just one threading, you'll avoid the issues inherent in multiple threads trying to print at once.

于 2013-08-21T06:07:55.130 に答える