私はしばらくの間これに取り組んできましたが、それを理解できないようです。私のコードとOS(LinuxではPython 2.7.3)がどのプロセスを実行すべきかについて一致しない状況に絞り込みました。これが発生すると、コードは永遠にハングしますが、例外はスローされません。コードが何時間も正しく実行される場合もあれば、数分しか実行されない場合もありますが、その理由はわかりません。これは以下のように表されます。ご覧いただきありがとうございます。
コード出力:
離散文字マトリックスの作成
running PoolWorker_82 (72 triplets), pid 25777, ppid 24892
running PoolWorker_83 (72 triplets), pid 25778, ppid 24892
running PoolWorker_84 (72 triplets), pid 25779, ppid 24892
running PoolWorker_85 (72 triplets), pid 25780, ppid 24892
running PoolWorker_86 (72 triplets), pid 25781, ppid 24892
running PoolWorker_87 (72 triplets), pid 25782, ppid 24892
running PoolWorker_88 (72 triplets), pid 25783, ppid 24892
running PoolWorker_89 (90 triplets), pid 25784, ppid 24892
ps aux からの出力...
1000 24892 2.0 0.9 559948 151088 pts/0 Sl+ 09:14 0:16 p runsimulation.py
1000 25776 0.0 0.8 559932 138320 pts/0 S+ 09:19 0:00 p runsimulation.py
1000 26015 0.0 0.8 559948 138140 pts/0 S+ 09:22 0:00 p runsimulation.py
1000 26021 0.0 0.8 559948 138140 pts/0 S+ 09:22 0:00 p runsimulation.py
1000 26023 0.0 0.8 559948 138140 pts/0 S+ 09:22 0:00 p runsimulation.py
1000 26025 0.0 0.8 559948 138140 pts/0 S+ 09:22 0:00 p runsimulation.py
1000 26027 0.0 0.8 559948 138140 pts/0 S+ 09:22 0:00 p runsimulation.py
1000 26029 0.0 0.8 559948 138140 pts/0 S+ 09:22 0:00 p runsimulation.py
1000 26031 0.0 0.8 559948 138140 pts/0 S+ 09:22 0:00 p runsimulation.py
1000 26036 0.0 0.8 559948 138140 pts/0 S+ 09:22 0:00 p runsimulation.py
親プロセス 24982 がそこにあることがわかりますが、ワーカーの pid はありません。通常、これらは一致します。作業中にワーカーの CPU 使用率が 100% になり、反復が完了するとすべてがなくなります。失敗すると、pid の不一致と 0.0% の CPU を使用しているプロセスが表示されます (列 3)。
私のコードの関連部分は以下のとおりです(呼び出された順序とは逆の順序で):
rpy2 で呼び出される関数を使用した R のセットアップ:
def create_R(dir):
"""
creates the r environment
@param dir: the directory for the output files
"""
r = robjects.r
importr("phangorn")
importr("picante")
importr("MASS")
importr("vegan")
r("options(expressions=500000)")
robjects.globalenv['outfile'] = os.path.abspath(os.path.join(dir, "trees.pdf"))
r('pdf(file=outfile, onefile=T)')
r("par(mfrow=c(2,3))")
r("""
generate_triplet = function(bits) {
triplet = replicate(bits, rTraitDisc(tree, model="ER", k=2,states=0:1))
triplet = t(apply(triplet, 1, as.numeric))
sums = rowSums(triplet)
if (length(which(sums==0)) > 0 && length(which(sums==3)) == 1) {
return(triplet)
}
return(generate_triplet(bits))
}
""")
r("""
get_valid_triplets = function(numsamples, needed, bits) {
tryCatch({
m = generate_triplet(bits)
while (ncol(m) < needed) {
m = cbind(m, generate_triplet(bits))
}
return(m)
}, error = function(e){print(message(e))}, warning = function(e){print(message(e))})
}
""")
ワーカー内で呼び出される関数:
def __get_valid_triplets(num_samples, num_triplets, bits, q):
r = robjects.r
name = current_process().name.replace("-", "_")
timer = stopwatch.Timer()
log("\trunning %s (%d triplets), pid %d, ppid %d" % (name, num_triplets, current_process().pid, os.getppid()),
log_file)
r('%s = get_valid_triplets(%d, %d, %d)' % (name, num_samples, num_triplets, bits))
q.put((name, r[name]))
timer.stop()
log("\t%s complete (%s)" % (name, str(timer)), log_file)
プールを設定し、apply_async を使用してワーカーをディスパッチする関数。ワーカーは、プールが参加した後のプロセスである管理されたキューに書き込みます。
def __generate_candidate_discrete_matrix(num_cols, num_samples, sample_tree, bits, usable_cols):
assert isinstance(sample_tree, dendropy.Tree)
print "Creating discrete character matrix"
r = robjects.r
newick = sample_tree.as_newick_string()
num_samples = len(sample_tree.leaf_nodes())
robjects.globalenv['numcols'] = usable_cols
robjects.globalenv['newick'] = newick + ";"
r("tree = read.tree(text=newick)")
r('m = matrix(nrow=length(tree$tip.label))') #create empty matrix
r('m = m[,-1]') #drop the first NA column
num_procs = mp.cpu_count()
args = []
div, mod = divmod(usable_cols, num_procs)
[args.append(div) for i in range(num_procs)]
args[-1] += mod
for i, elem in enumerate(args):
div, mod = divmod(elem, bits)
args[-1] += mod
args[i] -= mod
manager = Manager()
pool = Pool(processes=num_procs, maxtasksperchild=1)
q = manager.Queue(maxsize=num_procs)
for arg in args:
pool.apply_async(__get_valid_triplets, (num_samples, arg, bits, q))
pool.close()
pool.join()
while not q.empty():
name, data = q.get()
robjects.globalenv[name] = data
r('m = cbind(m, %s)' % name)
r('m = m[,1:%d]' % usable_cols)
r('m = m[order(rownames(m)),]') # consistently order the rows
r('m = t(apply(m, 1, as.numeric))') # convert all factors given by rTraitDisc to numeric
a = r['m']
n = r('rownames(m)')
return a, n
最後に、候補行列を生成するために呼び出される最初の関数は、それが有効なものであることを確認し、そうでない場合は新しい行列で再試行します。有効な場合、Rセッションにいくつかのものを保存し、データを返します
def create_discrete_matrix(num_cols, num_samples, sample_tree, bits):
"""
Creates a discrete char matrix from a tree
@param num_cols: number of columns to create
@param sample_tree: the tree
@return: a r object of the matrix, and a list of the row names
@rtype: tuple(robjects.Matrix, list)
"""
r = robjects.r
usable_cols = find_usable_length(num_cols, bits)
a, n = __generate_candidate_discrete_matrix(num_cols, num_samples, sample_tree, bits, usable_cols)
assert isinstance(a, robjects.Matrix)
assert a.ncol == usable_cols
paralin_matrix, valid = __create_paralin_matrix(a)
if valid is False:
sample_tree = create_tree(num_samples, type = "S")
return create_discrete_matrix(num_cols, num_samples, sample_tree, bits)
else:
robjects.globalenv['paralin_matrix'] = paralin_matrix
r('rownames(paralin_matrix) = rownames(m)')
r('paralin_dist = as.dist(paralin_matrix, diag=T, upper=T)')
r("paralinear_cluster = hclust(paralin_dist, method='average')")
return sample_tree, a, n