ps-worker スキームを使用して、スケーラブルな分散トレーニング システムを構築しようとしています。この方式では、すべての PS がすべての PS に関する情報を持ち、PS の数は一定のままです。すべてのワーカーは、自分自身とすべての PS しか認識していません。
Tensorflow クラスター伝搬方法を使用して、PS とワーカーの両方を開始し、分散トレーニング ループを維持できます。しかし、各ワーカーは独自のトレーニング プロセスを保持しており、データ構造を他のワーカーと共有していないことがわかりました。
ここにデモがあります:
デモ.py
import tensorflow as tf
import numpy as np
import time
import rest
import os
import sys
import traceback
from tensorflow.core.protobuf import config_pb2
from tensorflow.python.training import server_lib
from tensorflow.core.protobuf import cluster_pb2
from tensorflow.python.training import server_lib
from tensorflow.compat.v1.train import replica_device_setter
from tensorflow.python.client import timeline
from tensorflow.python.ops import data_flow_ops
tf.disable_v2_behavior()
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_string('ps_list', '127.0.0.1:2220','ps_list: to be a comma seperated string, like "127.0.0.1:2220, 127.0.0.1:2221"')
flags.DEFINE_string('worker_ip', '127.0.0.1:2230','worker_list: to be a comma seperated string, like "127.0.0.1:2230, 127.0.0.1:2231"')
flags.DEFINE_string('task_mode', 'worker', 'runninig_mode: ps or worker.')
flags.DEFINE_integer('worker_num', 1, 'worker_num: used for allocating samples.')
flags.DEFINE_integer('task_id', 0, 'task_id: used for allocating samples.')
class Trainer(object):
def build_graph(self, ps_str_list):
var = tf.random_normal([3,2], mean=0.0, stddev=0.5)
return var
def start_ps(ps_list, task_id):
cluster_config = {
'ps': ps_list,
}
print('cluster_config')
print(cluster_config)
sess_config = tf.ConfigProto()
sess_config.allow_soft_placement = False
sess_config.log_device_placement = True
sess_config.Experimental.share_session_state_in_clusterspec_propagation = False
sess_config.Experimental.share_cluster_devices_in_session = False
sess_config.isolate_session_state = False
server = tf.distribute.Server(
tf.train.ClusterSpec(cluster_config),
config = sess_config,
protocol='grpc',
job_name = 'ps',
task_index = task_id,
)
server.join()
def start_worker(ps_list, worker_list, task_id):
sess_config = tf.ConfigProto()
sess_config.allow_soft_placement = False
sess_config.log_device_placement = True
sess_config.Experimental.share_session_state_in_clusterspec_propagation = True
sess_config.Experimental.share_cluster_devices_in_session = True
sess_config.isolate_session_state = False
cluster_config = {
'ps': ps_list,
'localhost': worker_list,
}
server = tf.distribute.Server(
tf.train.ClusterSpec(cluster_config),
protocol="grpc",
config = sess_config,
job_name='localhost',
task_index=task_id,
)
cluster_def = cluster_pb2.ClusterDef()
worker_job = cluster_def.job.add()
worker_job.name = 'worker'
for i,v in enumerate(worker_list):
worker_job.tasks[i] = v
ps_job = cluster_def.job.add()
ps_job.name = "ps"
for i,v in enumerate(ps_list):
ps_job.tasks[i] = v
with tf.device('/job:ps/replica:0/task:0/CPU:0'):
trainer = Trainer()
var = trainer.build_graph(ps_str_list)
with tf.Session(server.target, config=sess_config) as sess:
res = sess.run(var)
print('check{}: sess.run(var) = {}'.format(task_id, res))
print('worker done')
def main(_):
try:
ps_list = FLAGS.ps_list.strip(' ').split(',')
worker_list = FLAGS.worker_ip.strip(' ').split(',')
worker_list = list(map(lambda x: x if ":" in x else "%s:%s" % (x, get_ramdon_port()), worker_list))
task_mode = FLAGS.task_mode
worker_num = FLAGS.worker_num
task_id = FLAGS.task_id
print('ps_list: ', ps_list)
print('worker_list: ', worker_list)
os.environ["CUDA_VISIBLE_DEVICES"] = ""
if task_mode == 'ps':
start_ps(ps_list, task_id)
elif task_mode == 'worker' and task_id==0:
start_worker(ps_list, worker_list, task_id)
else:
print('invalid task_mode. Options include "ps" and "worker".')
sys.exit(1)
except Exception as ex:
print(traceback.format_exc())
if __name__ == "__main__":
tf.app.run()
start.sh
#!/bin/bash
source /env/py3/bin/activate
export GRPC_VERBOSITY="DEBUG"
#export GRPC_TRACE=all
python demo.py \
--ps_list "127.0.0.1:2270" \
--task_mode ps \
--task_id 0 \
1>ps_log0 2>&1 &
sleep 1s
python -u demo.py \
--ps_list "127.0.0.1:2270" \
--worker_ip "127.0.0.1:2220" \
--task_mode worker \
--task_id 0 \
1>log_0 2>&1 &
sleep 1s
python -u demo.py \
--ps_list "127.0.0.1:2270" \
--worker_ip "127.0.0.1:2221" \
--task_mode worker \
--task_id 0 \
1>log_1 2>&1 &
echo "ok"
結果 2 つのワーカー プロセスが正常に起動し、終了しました。しかし、varには異なる値があります。
check0: sess.run(var) = [[-9.1801211e-02 1.3004950e+00]
[ 1.2603621e-03 1.2598373e-01]
[ 2.9150587e-02 3.2354552e-01]]
check1: sess.run(var) = [[-0.22149138 -0.06080906]
[-0.9715663 -0.25317684]
[ 0.54541755 -0.04751018]]
クラスター伝播モードでワーカーに密な値と疎な値を共有させることは可能ですか? クラスターを動的に管理するための重要な機能だと思います。