マルチプロセッシング モデルを使用してクローラーを開発しています。
multiprocessing.Queue を使用して、クロールする必要がある URL 情報、解析する必要があるページ コンテンツなどを保存します; multiprocessing.Event を使用してサブプロセスを制御します; multiprocessing.Manager.dict を使用して、クロールされた URL のハッシュを保存します;各マルチプロセッシング。 Manager.dict インスタンスは multiprocessing.Lock を使用してアクセスを制御します。
3 つの型パラメーターはすべて、すべてのサブプロセスと親プロセスの間で共有され、すべてのパラメーターはクラスに編成されます。クラスのインスタンスを使用して、共有パラメーターを親プロセスからサブプロセスに転送します。と同じように:
MGR = SyncManager()
class Global_Params():
Queue_URL = multiprocessing.Queue()
URL_RESULY = MGR.dict()
URL_RESULY_Mutex = multiprocessing.Lock()
STOP_EVENT = multiprocessing.Event()
global_params = Global_Params()
私自身のタイムアウト メカニズムでは、process.terminate を使用して、長時間停止できないプロセスを停止します。
私のテスト ケースでは、2500 以上のターゲット サイトがあります (一部はサービスされておらず、一部は巨大です)。ターゲット サイト ファイル内のサイトごとにクロールします。
最初はクローラーはうまく機能していましたが、しばらくすると (8 時間、2 時間、15 時間)、クローラーは 100 以上 (不確定) のサイトをクロールし、エラー情報が表示されます。 :"Errno 32 壊れたパイプ"
問題を特定して解決するために、次の方法を試しました。
クローラーが壊れたサイトAを見つけ、クローラーを使用してサイトを個別にクロールすると、クローラーはうまく機能しました。サイトAを含むすべてのターゲットサイトファイルからフラグメント(20サイトなど)を取得しても、クローラーはうまく機能しました!
「-X /tmp/pymp-* 240 /tmp」を /etc/cron.daily/tmpwatch に追加します
Brokenが発生したとき、ファイル /tmp/pymp-* はまだそこにあります
multiprocessing.managers.SyncManager を使用して multiprocessing.Manager を置き換え、SIGKILL と SIGTERM を除くほとんどのシグナルを無視します
ターゲット サイトごとに、ほとんどの共有パラメーター (キュー、辞書、およびイベント) をクリアします。エラーが発生した場合は、新しいインスタンスを作成します。
while global_params.Queue_url.qsize()>0:
try:
global_params.Queue_url.get(block=False)
except Exception,e:
print_info(str(e))
print_info("Clear Queue_url error!")
time.sleep(1)
global_params.Queue_url = Queue()
pass
以下はトレースバック情報です。print_info 関数は、デバッグ情報を自分で出力および保存するように定義されて
[Errno 32] Broken pipe
Traceback (most recent call last):
File "Spider.py", line 613, in <module>
main(args)
File "Spider.py", line 565, in main
spider.start()
File "Spider.py", line 367, in start
print_info("STATIC_RESULT size:%d" % len(global_params.STATIC_RESULT))
File "<string>", line 2, in __len__
File "/usr/local/python2.7.3/lib/python2.7/multiprocessing/managers.py", line 769, in _callmethod
kind, result = conn.recv()
EOFError
います。