コンテキスト: ある期間中のいくつかのポイントで橋の状態 (損傷または損傷なし) をチェックすることにより、損傷した橋の長期的な修復をシミュレートしようとしています。repair_over_horizon
そうするために、 (とりわけ) 損傷した橋のリストを取得し、リストのリストを出力するという Python 関数を作成しました。各リストには、特定のタイムステップでまだ損傷している橋が含まれています。たとえば、入力リストが次の場合:
[100、423、667、904、221、56、495、70、109、65]
3 つのタイムステップで使用repair_over_horizon
した場合、出力は次のようになると予想されます。
[[100, 423, 667, 904, 221, 56, 495, 70, 109, 65], [100, 423, 904, 56, 70, 109, 65], [423, 904]].
この関数repair_over_horizon
は、実際には一度に複数の入力リストまたは「マップ」を受け取り、それらの修復を並行して計算します。以下のコードに示すように、並列化はタイムステップごとに実装されます。内部で呼び出される関数、repair_over_horizon
つまりcompute_repairs_at_timestep
andrepair
も参照用に含めました。
問題: 関数は出力に関しては期待どおりに機能しますが、各反復 (タイムステップ) で修復にかなりの時間がかかります。この時間の増加は、タイムステップあたりのマップ数が増えると明らかになります。たとえば、タイムステップあたり 50 個のマップと 10 タイムステップがある状況では、最初の反復に ~6 秒かかり、最後の反復に ~21 秒かかります。タイムステップごとに 500 のマップと 5 つのタイムステップがある場合、最初の反復には ~36 秒かかり、5 回目の反復には ~110 秒かかります。
トラブルシューティング: コードをプロファイリングし (タイムステップごとに 500 マップと以下の 5 タイムステップの結果を参照)、メモリ使用量を確認しましたが、結果からは実装に問題があることがわかりません。
質問: イテレーション間およびマップ数の増加に伴い、この大幅な速度低下の原因は何ですか? それを修正する方法についてのアイデアは大歓迎です。
更新: メモリの問題があるかどうかをテストするために、関数の末尾に向かうさまざまなポイントに gc.collect() を挿入しました。ただし、上記の問題セクションで説明したように、反復ごとに修復に時間がかかることがわかりました。
部分プロファイリング結果:
Ordered by: cumulative time
ncalls tottime percall cumtime percall filename:lineno(function)
2500 1.102 0.000 289.855 0.116 /Library/Python/2.7/site-packages/pp.py:380(submit)
2505 285.901 0.114 285.901 0.114 {cPickle.dumps}
2500 0.020 0.000 127.742 0.051 /Library/Python/2.7/site-packages/pp.py:96(__call__)
2500 0.040 0.000 127.719 0.051 /Library/Python/2.7/site-packages/pp.py:116(__unpickle)
2500 127.675 0.051 127.675 0.051 {cPickle.loads}
5 0.001 0.000 2.282 0.456 /Library/Python/2.7/site-packages/pp.py:280(__init__)
5 0.001 0.000 2.279 0.456 /Library/Python/2.7/site-packages/pp.py:491(set_ncpus)
20 0.001 0.000 2.278 0.114 /Library/Python/2.7/site-packages/pp.py:134(__init__)
20 0.006 0.000 2.276 0.114 /Library/Python/2.7/site-packages/pp.py:140(start)
20 0.001 0.000 1.938 0.097 /Library/Python/2.7/site-packages/pptransport.py:133(receive)
40 1.935 0.048 1.935 0.048 {method 'read' of 'file' objects}
2500 0.174 0.000 1.666 0.001 /Library/Python/2.7/site-packages/pp.py:656(__scheduler)
16084 1.364 0.000 1.364 0.000 {method 'acquire' of 'thread.lock' objects}
1261 0.034 0.000 0.910 0.001 /Library/Python/2.7/site-packages/ppcommon.py:38(start_thread)
1261 0.063 0.000 0.580 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:726(start)
1261 0.057 0.000 0.370 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:603(wait)
20 0.007 0.000 0.330 0.017 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/subprocess.py:650(__init__)
20 0.014 0.001 0.316 0.016 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/subprocess.py:1195(_execute_child)
1261 0.058 0.000 0.288 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:656(__init__)
2500 0.107 0.000 0.244 0.000 /Library/Python/2.7/site-packages/pp.py:73(__init__)
60344 0.232 0.000 0.232 0.000 {isinstance}
889 0.045 0.000 0.224 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:309(wait)
20 0.201 0.010 0.201 0.010 {posix.fork}
2522 0.032 0.000 0.151 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:242(Condition)
1261 0.145 0.000 0.145 0.000 {thread.start_new_thread}
1261 0.011 0.000 0.142 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:542(Event)
1261 0.025 0.000 0.131 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:561(__init__)
2522 0.113 0.000 0.119 0.000 /System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py:260(__init__)
repair_over_horizon
def repair_over_horizon(n_gm_maps, n_damage_maps,
bridge_array_internal_assembled, bridge_array_new_assembled, horizon =
365, n_steps = 10, trajectories = 10, plot = False, quiet = False):
from itertools import product
t_per_timestep = []
sim_times = get_times(horizon,n_steps)
thresholds = []
for i in range(0,n_steps):
thresholds.append(get_repair_threshold(sim_times[i]))
ppservers = () # starting parallelization
for k in range(0, n_steps):
start = time.time()
jobs = []
job_server = pp.Server(ppservers=ppservers)
print "Starting pp for recovery modelling at k=", k, "with", job_server.get_ncpus(), "workers"
for m, n, l in product(xrange(n_gm_maps), xrange(n_damage_maps), xrange(trajectories)):
jobs.append(job_server.submit(compute_repairs_at_timestep, (m, n, k, l, bridge_array_internal_assembled,
bridge_array_new_assembled, thresholds[k], n_steps), depfuncs = (repair,), modules=('numpy', 'random', 'copy',), group = k))
# get the results of timestep k, since the next timestep depends on those
for job in jobs:
temp_dict = job()
if temp_dict is not None:
if k < n_steps-1:
temp_inds = temp_dict['indices']
mstar = temp_inds[0]
nstar = temp_inds[1]
kstar = temp_inds[2] + 1
lstar = temp_inds[3]
bridge_array_internal_assembled[mstar][nstar][kstar][lstar][:] = temp_dict['internal'][mstar][nstar][kstar][lstar]
bridge_array_new_assembled[mstar][nstar][kstar][lstar][:] = temp_dict['new'][mstar][nstar][kstar][lstar]
else:
pass
t_per_timestep.append(time.time()-start)
print "Done with timestep k = ", k, " in ", time.time() - start, " seconds"
# close server
job_server.destroy()
if quiet == False:
#print bridge_array_internal_assembled_rec
print 'Great. You have modeled stochastic recovery of bridges and created time-dependent damage maps.'
if plot == True:
pass
# returns the two lists of lists as a single dictionary with two keys (internal and new)
return {'internal':bridge_array_internal_assembled, 'new':bridge_array_new_assembled, 'timesteps':t_per_timestep}
compute_repairs_at_timestep
def compute_repairs_at_timestep(m, n, k, l,
bridge_array_internal_assembled, bridge_array_new_assembled, threshold,
n_steps, limit = None):
# keep track of ground motion map, associated damage map, timestep,
and recovery trajectory
inds = [m,n,k,l]
# repair bridges stochastically at a single timestep
temp_dict = repair(bridge_array_internal_assembled[m][n][k][l],
bridge_array_new_assembled[m][n][k][l], threshold, limit)
# propagate repair to next timestep until timesteps are done
if k < n_steps-1:
bridge_array_internal_assembled[m][n][k + 1][l][:] =
temp_dict['internal']
bridge_array_new_assembled[m][n][k + 1][l][:] = temp_dict['new']
return {'indices': inds, 'internal':
bridge_array_internal_assembled, 'new':
bridge_array_new_assembled}
else:
return {'indices': inds, 'internal':
bridge_array_internal_assembled, 'new':
bridge_array_new_assembled}
repair
def repair(bridge_array_internal, bridge_array_new, ext_threshold, limit=None):
import numpy as np
d = len(bridge_array_internal)
if d > 0:
temp_internal = copy.deepcopy(bridge_array_internal)
temp_new = copy.deepcopy(bridge_array_new)
# repair bridges stochastically per Shinozuka et al 2003 model
temp = np.random.rand(1, d)
temp = temp[0]
temp = np.ndarray.tolist(temp)
rep_inds_internal = [] # track the internal ID numbers of bridges that are repaired
rep_inds_new = [] # track the new ID numbers of bridges that are repaired
if limit is None:
for i in range(0, d):
if temp[i] < ext_threshold:
rep_inds_internal.append(temp_internal[i])
rep_inds_new.append(temp_new[i])
if len(rep_inds_internal) > 0: # if bridges have been repaired in the previous loop, we need to remove them from the list of damaged bridges
for i in range(0, len(rep_inds_internal)):
try:
temp_internal.remove(rep_inds_internal[i])
temp_new.remove(rep_inds_new[i])
except ValueError:
pass
else: # if limit is a number
rep_count = 0
for i in range(0, d):
if (temp[i] < ext_threshold) & (rep_count < limit):
rep_inds_internal.append(temp_internal[i])
rep_inds_new.append(temp_new[i])
rep_count += 1
elif rep_count == limit: # if we've repaired as many bridges as we are allowed to, stop repairing bridges
break
else:
pass
if len(rep_inds_internal) > 0: # if bridges have been repaired in the previous loop, we need to remove them from the list of damaged bridges
for i in range(0, len(rep_inds_internal)):
try:
temp_internal.remove(rep_inds_internal[i])
temp_new.remove(rep_inds_new[i])
except ValueError:
pass
return {'internal': temp_internal,'new': temp_new} # these damaged bridge arrays have been updated to not include repaired bridges
elif d == 0:
# all bridges have already been repaired
return {'internal': [], 'new': []}