0

ファイル ID のリストを読み取り、それぞれをワーカー プロセスに送信して分析し、入力ファイルごとに 1 つの出力をディスクに書き込む、Sun Grid Engine スーパーコンピューティング クラスターで Python スクリプトを実行しています。

問題は、ワーカー関数内のどこかで IOError(110, 'Connection timed out') が発生していることです。その理由はわかりません。以前、大幅に遅延したネットワーク リクエストを行ったときにこのエラーを受け取ったことがありますが、この場合、ワーカーはディスクからデータを読み取ろうとしているだけです。

私の質問は次のとおりです。ディスクからの読み取り時に接続タイムアウトエラーが発生する原因と、このエラーを解決するにはどうすればよいですか? 他の人が提供できるヘルプは非常に高く評価されます。

完全なスクリプト (IOError が に現れますminhash_text()):

from datasketch import MinHash
from multiprocessing import Pool
from collections import defaultdict
from nltk import ngrams
import json
import sys
import codecs
import config

cores = 24
window_len = 12
step = 4
worker_files = 50
permutations = 256
hashband_len = 4

def minhash_text(args):
  '''Return a list of hashband strings for an input doc'''
  try:
    file_id, path = args
    with codecs.open(path, 'r', 'utf8') as f:
      f = f.read()
    all_hashbands = []
    for window_idx, window in enumerate(ngrams(f.split(), window_len)):
      window_hashbands = []
      if window_idx % step != 0:
        continue
      minhash = MinHash(num_perm=permutations, seed=1)
      for ngram in set(ngrams(' '.join(window), 3)):
        minhash.update( ''.join(ngram).encode('utf8') )
      hashband_vals = []
      for i in minhash.hashvalues:
        hashband_vals.append(i)
        if len(hashband_vals) == hashband_len:
          window_hashbands.append( '.'.join([str(j) for j in hashband_vals]) )
          hashband_vals = []
      all_hashbands.append(window_hashbands)
    return {'file_id': file_id, 'hashbands': all_hashbands}
  except Exception as exc:
    print(' ! error occurred while processing', file_id, exc)
    return {'file_id': file_id, 'hashbands': []}

if __name__ == '__main__':

  file_ids = json.load(open('file_ids.json'))
  file_id_path_tuples = [(file_id, path) for file_id, path in file_ids.items()]

  worker_id = int(sys.argv[1])
  worker_ids = list(ngrams(file_id_path_tuples, worker_files))[worker_id]

  hashband_to_ids = defaultdict(list)
  pool = Pool(cores)

  for idx, result in enumerate(pool.imap(minhash_text, worker_ids)):
    print(' * processed', idx, 'results')
    file_id = result['file_id']
    hashbands = result['hashbands']
    for window_idx, window_hashbands in enumerate(hashbands):
      for hashband in window_hashbands:
        hashband_to_ids[hashband].append(file_id + '.' + str(window_idx))

  with open(config.out_dir + 'minhashes-' + str(worker_id) + '.json', 'w') as out:
    json.dump(dict(hashband_to_ids), out)
4

1 に答える 1