Tensorflow学习笔记4:分布式Tensorflow
簡(jiǎn)介
Tensorflow API提供了Cluster、Server以及Supervisor來(lái)支持模型的分布式訓(xùn)練。
關(guān)于Tensorflow的分布式訓(xùn)練介紹可以參考Distributed Tensorflow。簡(jiǎn)單的概括說(shuō)明如下:
- Tensorflow分布式Cluster由多個(gè)Task組成,每個(gè)Task對(duì)應(yīng)一個(gè)tf.train.Server實(shí)例,作為Cluster的一個(gè)單獨(dú)節(jié)點(diǎn);
- 多個(gè)相同作用的Task可以被劃分為一個(gè)job,例如ps job作為參數(shù)服務(wù)器只保存Tensorflow model的參數(shù),而worker job則作為計(jì)算節(jié)點(diǎn)只執(zhí)行計(jì)算密集型的Graph計(jì)算。
- Cluster中的Task會(huì)相對(duì)進(jìn)行通信,以便進(jìn)行狀態(tài)同步、參數(shù)更新等操作。
Tensorflow分布式集群的所有節(jié)點(diǎn)執(zhí)行的代碼是相同的。分布式任務(wù)代碼具有固定的模式:
# 第1步:命令行參數(shù)解析,獲取集群的信息ps_hosts和worker_hosts,以及當(dāng)前節(jié)點(diǎn)的角色信息job_name和task_index# 第2步:創(chuàng)建當(dāng)前task結(jié)點(diǎn)的Server cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)# 第3步:如果當(dāng)前節(jié)點(diǎn)是ps,則調(diào)用server.join()無(wú)休止等待;如果是worker,則執(zhí)行第4步。 if FLAGS.job_name == "ps":server.join()# 第4步:則構(gòu)建要訓(xùn)練的模型 # build tensorflow graph model# 第5步:創(chuàng)建tf.train.Supervisor來(lái)管理模型的訓(xùn)練過(guò)程 # Create a "supervisor", which oversees the training process. sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0), logdir="/tmp/train_logs") # The supervisor takes care of session initialization and restoring from a checkpoint. sess = sv.prepare_or_wait_for_session(server.target) # Loop until the supervisor shuts down while not sv.should_stop()# train model?
Tensorflow分布式訓(xùn)練代碼框架
根據(jù)上面說(shuō)到的Tensorflow分布式訓(xùn)練代碼固定模式,如果要編寫一個(gè)分布式的Tensorlfow代碼,其框架如下所示。
import tensorflow as tf# Flags for defining the tf.train.ClusterSpec tf.app.flags.DEFINE_string("ps_hosts", "","Comma-separated list of hostname:port pairs") tf.app.flags.DEFINE_string("worker_hosts", "","Comma-separated list of hostname:port pairs")# Flags for defining the tf.train.Server tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'") tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")FLAGS = tf.app.flags.FLAGSdef main(_):ps_hosts = FLAGS.ps_hosts.split(",")worker_hosts = FLAGS.worker_hosts(",")# Create a cluster from the parameter server and worker hosts.cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})# Create and start a server for the local task.server = tf.train.Server(cluster,job_name=FLAGS.job_name,task_index=FLAGS.task_index)if FLAGS.job_name == "ps":server.join()elif FLAGS.job_name == "worker":# Assigns ops to the local worker by default. with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_index,cluster=cluster)):# Build model...loss = ...global_step = tf.Variable(0)train_op = tf.train.AdagradOptimizer(0.01).minimize(loss, global_step=global_step)saver = tf.train.Saver()summary_op = tf.merge_all_summaries()init_op = tf.initialize_all_variables()# Create a "supervisor", which oversees the training process.sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),logdir="/tmp/train_logs",init_op=init_op,summary_op=summary_op,saver=saver,global_step=global_step,save_model_secs=600)# The supervisor takes care of session initialization and restoring from# a checkpoint.sess = sv.prepare_or_wait_for_session(server.target)# Start queue runners for the input pipelines (if any). sv.start_queue_runners(sess)# Loop until the supervisor shuts down (or 1000000 steps have completed).step = 0while not sv.should_stop() and step < 1000000:# Run a training step asynchronously.# See `tf.train.SyncReplicasOptimizer` for additional details on how to# perform *synchronous* training._, step = sess.run([train_op, global_step])if __name__ == "__main__":tf.app.run()對(duì)于所有Tensorflow分布式代碼,可變的只有兩點(diǎn):
分布式MNIST任務(wù)
我們通過(guò)修改tensorflow/tensorflow提供的mnist_softmax.py來(lái)構(gòu)造分布式的MNIST樣例來(lái)進(jìn)行驗(yàn)證。修改后的代碼請(qǐng)參考mnist_dist.py。
我們同樣通過(guò)tensorlfow的Docker image來(lái)啟動(dòng)一個(gè)容器來(lái)進(jìn)行驗(yàn)證。
$ docker run -d -v /path/to/your/code:/tensorflow/mnist --name tensorflow tensorflow/tensorflow啟動(dòng)tensorflow之后,啟動(dòng)4個(gè)Terminal,然后通過(guò)下面命令進(jìn)入tensorflow容器,切換到/tensorflow/mnist目錄下
$ docker exec -ti tensorflow /bin/bash $ cd /tensorflow/mnist然后在四個(gè)Terminal中分別執(zhí)行下面一個(gè)命令來(lái)啟動(dòng)Tensorflow cluster的一個(gè)task節(jié)點(diǎn),
# Start ps 0 python mnist_dist.py --ps_hosts=localhost:2221,localhost:2222 --worker_hosts=localhost:2223,localhost:2224 --job_name=ps --task_index=0# Start ps 1 python mnist_dist.py --ps_hosts=localhost:2221,localhost:2222 --worker_hosts=localhost:2223,localhost:2224 --job_name=ps --task_index=1# Start worker 0 python mnist_dist.py --ps_hosts=localhost:2221,localhost:2222 --worker_hosts=localhost:2223,localhost:2224 --job_name=worker --task_index=0# Start worker 1 python mnist_dist.py --ps_hosts=localhost:2221,localhost:2222 --worker_hosts=localhost:2223,localhost:2224 --job_name=worker --task_index=1具體效果自己驗(yàn)證哈。
總結(jié)
以上是生活随笔為你收集整理的Tensorflow学习笔记4:分布式Tensorflow的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 微信iOS多设备多字体适配方案总结
- 下一篇: 《黃帝內經 —— 央視60集紀錄片》