7

処理する大きなXMLデータファイル(> 160M)があり、SAX / expat/pulldom解析がその方法のようです。ノードをふるいにかけ、処理するノードをキューにプッシュするスレッドが必要です。次に、他のワーカースレッドが、次に使用可能なノードをキューからプルして処理します。

私は次のものを持っています(ロックが必要です、私は知っています-後でそうなります)

import sys, time
import xml.parsers.expat
import threading

q = []

def start_handler(name, attrs):
    q.append(name)

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")


t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    print(q)
    time.sleep(1)

問題は、whileブロックの本体が1回だけ呼び出され、ctrl-Cでさえ中断できないことです。小さいファイルでは、出力は期待どおりですが、これは、ドキュメントが完全に解析されたときにのみハンドラーが呼び出されることを示しているようです。これは、SAXパーサーの目的を損なうようです。

それは私自身の無知だと確信していますが、どこで間違いを犯しているのかわかりません。

PS:私もstart_handlerこのように変更しようとしました:

def start_handler(name, attrs):
    def app():
        q.append(name)
    u = threading.Thread(group=None, target=app)
    u.start()

しかし、愛はありません。

4

4 に答える 4

8

この問題についてはよくわかりません。ParseFileの呼び出しがブロックされており、GILのために解析スレッドのみが実行されていると思います。これを回避する方法は、multiprocessing代わりに使用することです。とにかく、キューで動作するように設計されています。

あなたはを作り、Processあなたはそれを渡すことができますQueue

import sys, time
import xml.parsers.expat
import multiprocessing
import Queue

def do_expat(q):
    p = xml.parsers.expat.ParserCreate()

    def start_handler(name, attrs):
        q.put(name)

    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")

if __name__ == '__main__':
    q = multiprocessing.Queue()
    process = multiprocessing.Process(target=do_expat, args=(q,))
    process.start()

    elements = []
    while True:
        while True:
            try:
                elements.append(q.get_nowait())
            except Queue.Empty:
                break

        print elements
        time.sleep(1)

元のスクリプトを複製するために、要素リストを含めました。あなたの最終的な解決策はおそらくget_nowaitPoolまたは類似のものを使用するでしょう。

于 2010-01-19T00:35:06.743 に答える
8

ParseFile、お気づきのように、すべてを「一気飲み」するだけです。実行したい増分解析には適していません。したがって、ファイルを一度に少しずつパーサーにフィードし、他のスレッドに条件付きで制御を譲ることを確認してください。例:

while True:
  data = f.read(BUFSIZE)
  if not data:
    p.Parse('', True)
    break
  p.Parse(data, False)
  time.sleep(0.0)

このtime.sleep(0.0)呼び出しは、「準備ができて待機している他のスレッドに譲る」というPythonの方法です。Parseメソッドはここに文書化されています。

2つ目のポイントは、この使用法のロックを忘れることです。--代わりにQueue.Queueを使用します。これは本質的にスレッドセーフであり、Pythonで複数のスレッドを調整するための最良かつ最も簡単な方法です。Queueインスタンスqを作成し、そのq.put(name)上でスレッドをブロックしてq.get()、さらに作業を行うのを待っています。これはとても簡単です。

(作業がなくなったときにワーカースレッドの終了を調整するために使用できる補助的な戦略がいくつかありますが、特別な要件がない場合は、デーモンスレッドにするだけで、メインのときにすべて終了します。スレッドは行います-ドキュメントを参照してください)。

于 2010-01-19T00:51:53.843 に答える
1

私が間違っているのはq、異なるスレッドから同時にアクセスしているということだけです。実際に書いているようにロックはありません。それは問題を引き起こしている-そしてあなたはおそらくPythonインタプリタがあなたに固執するという形で問題を抱えているだろう。:)

ロックしてみてください。それほど難しくはありません。

import sys, time
import xml.parsers.expat
import threading

q = []
q_lock = threading.Lock() <---

def start_handler(name, attrs):
    q_lock.acquire() <---
    q.append(name)
    q_lock.release() <---

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")


t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    q_lock.acquire() <---
    print(q)
    q_lock.release() <---
    time.sleep(1)

ほら、それは本当に簡単でした。オブジェクトを保護するためにロック変数を作成し、オブジェクトを使用する前に毎回そのロックを取得し、オブジェクトのタスクを終了した後に毎回解放しました。q.append(name)このようにして、がと重複しないことを保証しましprint(q)た。


(Pythonの新しいバージョンでは、「with ....」構文もあり、ロックを解除したり、ファイルを閉じたり、他のクリーンアップを頻繁に忘れたりしないようにするのに役立ちます。)

于 2010-01-19T00:31:35.543 に答える
0

実装についてはよくわかりませんが、解析が完了するまで実行されるCコードの場合、他のPythonスレッドは実行されません。パーサーがPythonコードを呼び出している場合、他のスレッドを実行するためにGILが解放される可能性がありますが、それについてはよくわかりません。これらの詳細を確認することをお勧めします。

于 2010-01-19T00:37:38.203 に答える