日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

分布式Tensorflow入门Demo

發布時間:2025/3/15 编程问答 12 豆豆
生活随笔 收集整理的這篇文章主要介紹了 分布式Tensorflow入门Demo 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.



關于tensorflow的分布式訓練和部署, 官方有個英文的文檔介紹,但是寫的比較簡單, 給的例子也比較簡單,剛接觸分布式深度學習的可能不太容易理解。在網上看到一些資料,總感覺說的不夠通俗易懂,不如自己寫一個通俗易懂給大家分享一下。

如果大家有看不懂的,歡迎留言,我再改文章,改到大學一年級的學生可以看懂的程度。


1. 單機多GPU訓練 先簡單介紹下單機的多GPU訓練,然后再介紹分布式的多機多GPU訓練。 單機的多GPU訓練, tensorflow的官方已經給了一個cifar的例子,已經有比較詳細的代碼和文檔介紹, 這里大致說下多GPU的過程,以便方便引入到多機多GPU的介紹。 單機多GPU的訓練過程: a) 假設你的機器上有3個GPU; b) 在單機單GPU的訓練中,數據是一個batch一個batch的訓練。 在單機多GPU中,數據一次處理3個batch(假設是3個GPU訓練), 每個GPU處理一個batch的數據計算。 c) 變量,或者說參數,保存在CPU上 d) 剛開始的時候數據由CPU分發給3個GPU, 在GPU上完成了計算,得到每個batch要更新的梯度。 e) 然后在CPU上收集完了3個GPU上的要更新的梯度, 計算一下平均梯度,然后更新參數。 f) 然后繼續循環這個過程。
通過這個過程,處理的速度取決于最慢的那個GPU的速度。如果3個GPU的處理速度差不多的話, 處理速度就相當于單機單GPU的速度的3倍減去數據在CPU和GPU之間傳輸的開銷,實際的效率提升看CPU和GPU之間數據的速度和處理數據的大小。
寫到這里覺得自己寫的還是不同通俗易懂, 下面就打一個更加通俗的比方來解釋一下: 老師給小明和小華布置了10000張紙的乘法題并且把所有的乘法的結果加起來, 每張紙上有128道乘法題。 這里一張紙就是一個batch, batch_size就是128. 小明算加法比較快, 小華算乘法比較快,于是小華就負責計算乘法, 小明負責把小華的乘法結果加起來 。 這樣小明就是CPU,小華就是GPU. 這樣計算的話, 預計小明和小華兩個人得要花費一個星期的時間才能完成老師布置的題目。 于是小明就招來2個算乘法也很快的小紅和小亮。 于是每次小明就給小華,小紅,小亮各分發一張紙,讓他們算乘法, 他們三個人算完了之后, 把結果告訴小明, 小明把他們的結果加起來,然后再給他們沒人分發一張算乘法的紙,依次循環,知道所有的算完。 這里小明采用的是同步模式,就是每次要等他們三個都算完了之后, 再統一算加法,算完了加法之后,再給他們三個分發紙張。這樣速度就取決于他們三個中算乘法算的最慢的那個人, 和分發紙張的速度。

2. 分布式多機多GPU訓練 隨著設計的模型越來越復雜,模型參數越來越多,越來越大, 大到什么程度?多到什么程度? 多參數的個數上百億個, 訓練的數據多到按TB級別來衡量。大家知道每次計算一輪,都要計算梯度,更新參數。 當參數的量級上升到百億量級甚至更大之后, 參數的更新的性能都是問題。 如果是單機16個GPU, 一個step最多也是處理16個batch, 這對于上TB級別的數據來說,不知道要訓練到什么時候。于是就有了分布式的深度學習訓練方法,或者說框架。
參數服務器 在介紹tensorflow的分布式訓練之前,先說下參數服務器的概念。 前面說道, 當你的模型越來越大, 模型的參數越來越多,多到模型參數的更新,一臺機器的性能都不夠的時候, 很自然的我們就會想到把參數分開放到不同的機器去存儲和更新。 因為碰到上面提到的那些問題, 所有參數服務器就被單獨擰出來, 于是就有了參數服務器的概念。 參數服務器可以是多臺機器組成的集群, 這個就有點類似分布式的存儲架構了, 涉及到數據的同步,一致性等等, 一般是key-value的形式,可以理解為一個分布式的key-value內存數據庫,然后再加上一些參數更新的操作。 詳細的細節可以去google一下, 這里就不詳細說了。 反正就是當性能不夠的時候, 幾百億的參數分散到不同的機器上去保存和更新,解決參數存儲和更新的性能問題。 借用上面的小明算題的例子,小明覺得自己算加法都算不過來了, 于是就叫了10個小明過來一起幫忙算。
tensorflow的分布式 不過據說tensorflow的分布式沒有用參數服務器,用的是數據流圖, 這個暫時還沒研究,不過應該和參數服務器有很多相似的地方,這里介紹先按照參數服務器的結構來介紹。 tensorflow的分布式有in-graph和between-gragh兩種架構模式。 這里分別介紹一下。
in-graph 模式: in-graph模式和單機多GPU模型有點類似。 還是一個小明算加法, 但是算乘法的就可以不止是他們一個教室的小華,小紅,小亮了。 可以是其他教師的小張,小李。。。。. in-graph模式, 把計算已經從單機多GPU,已經擴展到了多機多GPU了, 不過數據分發還是在一個節點。 這樣的好處是配置簡單, 其他多機多GPU的計算節點,只要起個join操作, 暴露一個網絡接口,等在那里接受任務就好了。 這些計算節點暴露出來的網絡接口,使用起來就跟本機的一個GPU的使用一樣, 只要在操作的時候指定tf.device(“/job:worker/task:n”), 就可以向指定GPU一樣,把操作指定到一個計算節點上計算,使用起來和多GPU的類似。 但是這樣的壞處是訓練數據的分發依然在一個節點上, 要把訓練數據分發到不同的機器上, 嚴重影響并發訓練速度。在大數據訓練的情況下, 不推薦使用這種模式。
between-graph模式 between-graph模式下,訓練的參數保存在參數服務器, 數據不用分發, 數據分片的保存在各個計算節點, 各個計算節點自己算自己的, 算完了之后, 把要更新的參數告訴參數服務器,參數服務器更新參數。這種模式的優點是不用訓練數據的分發了, 尤其是在數據量在TB級的時候, 節省了大量的時間,所以大數據深度學習還是推薦使用between-graph模式。
同步更新和異步更新 in-graph模式和between-graph模式都支持同步和異步更新 在同步更新的時候, 每次梯度更新,要等所有分發出去的數據計算完成后,返回回來結果之后,把梯度累加算了均值之后,再更新參數。 這樣的好處是loss的下降比較穩定, 但是這個的壞處也很明顯, 處理的速度取決于最慢的那個分片計算的時間。 在異步更新的時候, 所有的計算節點,各自算自己的, 更新參數也是自己更新自己計算的結果, 這樣的優點就是計算速度快, 計算資源能得到充分利用,但是缺點是loss的下降不穩定, 抖動大。 在數據量小的情況下, 各個節點的計算能力比較均衡的情況下, 推薦使用同步模式;數據量很大,各個機器的計算性能摻差不齊的情況下,推薦使用異步的方式。
例子 tensorflow官方有個分布式tensorflow的文檔,但是例子沒有完整的代碼, 這里寫了一個最簡單的可以跑起來的例子,供大家參考,這里也傻瓜式給大家解釋一下代碼,以便更加通俗的理解。 代碼位置: https://github.com/thewintersun/distributeTensorflowExample
功能說明: 代碼實現的功能: 對于表達式 Y = 2 * X + 10, 其中X是輸入,Y是輸出, 現在有很多X和Y的樣本, 怎么估算出來weight是2和biasis是10. 所有的節點,不管是ps節點還是worker節點,運行的都是同一份代碼, 只是命令參數指定不一樣。
執行的命令示例: ps 節點執行: [plain] view plaincopyprint?
  • CUDA_VISIBLE_DEVICES=”?python?distribute.py?–ps_hosts=192.168.100.42:2222?–worker_hosts=192.168.100.42:2224,192.168.100.253:2225?–job_name=ps?–task_index=0??
  • CUDA_VISIBLE_DEVICES='' python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=ps --task_index=0
    worker 節點執行: [plain] view plaincopyprint?
  • CUDA_VISIBLE_DEVICES=0?python?distribute.py?–ps_hosts=192.168.100.42:2222?–worker_hosts=192.168.100.42:2224,192.168.100.253:2225?–job_name=worker?–task_index=0??
  • CUDA_VISIBLE_DEVICES=0?python?distribute.py?–ps_hosts=192.168.100.42:2222?–worker_hosts=192.168.100.42:2224,192.168.100.253:2225?–job_name=worker?–task_index=1??
  • CUDA_VISIBLE_DEVICES=0 python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=worker --task_index=0 CUDA_VISIBLE_DEVICES=0 python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=worker --task_index=1
    前面是參數定義,這里大家應該都知道,: [python] view plaincopyprint?
  • #?Define?parameters??
  • FLAGS?=?tf.app.flags.FLAGS??
  • tf.app.flags.DEFINE_float(’learning_rate’,?0.00003,?‘Initial?learning?rate.’)??
  • tf.app.flags.DEFINE_integer(’steps_to_validate’,?1000,??
  • ’Steps?to?validate?and?print?loss’)??
  • #?For?distributed??
  • tf.app.flags.DEFINE_string(”ps_hosts”,?”“,??
  • ”Comma-separated?list?of?hostname:port?pairs”)??
  • tf.app.flags.DEFINE_string(”worker_hosts”,?”“,??
  • ”Comma-separated?list?of?hostname:port?pairs”)??
  • tf.app.flags.DEFINE_string(”job_name”,?”“,?”One?of?‘ps’,?‘worker’“)??
  • tf.app.flags.DEFINE_integer(”task_index”,?0,?“Index?of?task?within?the?job”)??
  • #?Hyperparameters??
  • learning_rate?=?FLAGS.learning_rate??
  • steps_to_validate?=?FLAGS.steps_to_validate??
  • # Define parameters FLAGS = tf.app.flags.FLAGS tf.app.flags.DEFINE_float('learning_rate', 0.00003, 'Initial learning rate.') tf.app.flags.DEFINE_integer('steps_to_validate', 1000, 'Steps to validate and print loss') # For distributed tf.app.flags.DEFINE_string("ps_hosts", "", "Comma-separated list of hostname:port pairs") tf.app.flags.DEFINE_string("worker_hosts", "", "Comma-separated list of hostname:port pairs") tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'") tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") # Hyperparameters learning_rate = FLAGS.learning_rate steps_to_validate = FLAGS.steps_to_validate
    代碼說明: 1. 故意把學習率設置的特別小,是想讓它算慢點,好看見過程; 2. 通過命令行參數可以傳入ps節點的ip和端口, worker節點的ip和端口。ps節點就是paramter server的縮寫, 主要是保存和更新參數的節點, worker節點主要是負責計算的節點。這里說的節點都是虛擬的節點,不一定是物理上的節點; 3. 多個節點用逗號分隔;

    [python] view plaincopyprint?
  • ps_hosts?=?FLAGS.ps_hosts.split(“,”)??
  • worker_hosts?=?FLAGS.worker_hosts.split(”,”)??
  • cluster?=?tf.train.ClusterSpec({”ps”:?ps_hosts,?“worker”:?worker_hosts})??
  • server?=?tf.train.Server(cluster,job_name=FLAGS.job_name,task_index=FLAGS.task_index)??
  • ??
  • if?FLAGS.job_name?==?“ps”:??
  • server.join()??
  • elif?FLAGS.job_name?==?“worker”:??
  • with?tf.device(tf.train.replica_device_setter(??
  • worker_device=”/job:worker/task:%d”?%?FLAGS.task_index,??
  • cluster=cluster)):??
  • ps_hosts = FLAGS.ps_hosts.split(",") worker_hosts = FLAGS.worker_hosts.split(",") cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) server = tf.train.Server(cluster,job_name=FLAGS.job_name,task_index=FLAGS.task_index)if FLAGS.job_name == "ps": server.join() elif FLAGS.job_name == "worker": with tf.device(tf.train.replica_device_setter( worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)):
    代碼說明: 1. ClusterSpec的定義,需要把你要跑這個任務的所有的ps和worker 的節點的ip和端口的信息都包含進去,所有的節點都要執行這段代碼, 就大家互相知道了, 這個集群里面都有哪些成員,不同的成員的類型是什么, 是ps節點還是worker節點。 2. tf.train.Server這個的定義開始,就每個節點不一樣了。 根據執行的命令的參數不同,決定了這個任務是哪個任務。 如果任務名字是ps的話, 程序就join到這里,作為參數更新的服務, 等待其他worker節點給他提交參數更新的數據。 如果是worker任務,就執行后面的計算任務。 3. replica_device_setter, 這個大家可以注意一下, 可以看看tensorflow的文檔對這個的解釋和python的源碼。 在這個with語句之下定義的參數, 會自動分配到參數服務器上去定義,如果有多個參數服務器, 就輪流循環分配。

    [python] view plaincopyprint?
  • global_step?=?tf.Variable(0,?name=‘global_step’,?trainable=False)??
  • ??
  • input?=?tf.placeholder(”float”)??
  • label?=?tf.placeholder(”float”)??
  • ??
  • weight?=?tf.get_variable(”weight”,?[1],?tf.float32,?initializer=tf.random_normal_initializer())??
  • biase?=?tf.get_variable(”biase”,?[1],?tf.float32,?initializer=tf.random_normal_initializer())??
  • pred?=?tf.mul(input,?weight)?+?biase??
  • ??
  • loss_value?=?loss(label,?pred)??
  • ??
  • train_op?=?tf.train.GradientDescentOptimizer(learning_rate).minimize(loss_value,?global_step=global_step)??
  • init_op?=?tf.initialize_all_variables()??
  • saver?=?tf.train.Saver()??
  • tf.scalar_summary(’cost’,?loss_value)??
  • summary_op?=?tf.merge_all_summaries()??
  • global_step = tf.Variable(0, name='global_step', trainable=False)input = tf.placeholder("float") label = tf.placeholder("float")weight = tf.get_variable("weight", [1], tf.float32, initializer=tf.random_normal_initializer()) biase = tf.get_variable("biase", [1], tf.float32, initializer=tf.random_normal_initializer()) pred = tf.mul(input, weight) + biaseloss_value = loss(label, pred)train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss_value, global_step=global_step) init_op = tf.initialize_all_variables() saver = tf.train.Saver() tf.scalar_summary('cost', loss_value) summary_op = tf.merge_all_summaries()
    這塊的代碼和普通的單機單GPU的代碼一樣,就是定義計算邏輯,沒什么區別。

    [python] view plaincopyprint?
  • sv?=?tf.train.Supervisor(is_chief=(FLAGS.task_index?==?0),??
  • logdir=”./checkpoint/”,??
  • init_op=init_op,??
  • summary_op=None,??
  • saver=saver,??
  • global_step=global_step,??
  • save_model_secs=60)??
  • with?sv.managed_session(server.target)?as?sess:??
  • step?=?0??
  • while?step?<?1000000:??
  • train_x?=?np.random.randn(1)??
  • train_y?=?2?*?train_x?+?np.random.randn(1)?*?0.33?+?10??
  • _,?loss_v,?step?=?sess.run([train_op,?loss_value,global_step],?feed_dict={input:train_x,?label:train_y})??
  • if?step?%?steps_to_validate?==?0:??
  • w,b?=?sess.run([weight,biase])??
  • print(“step:?%d,?weight:?%f,?biase:?%f,?loss:?%f”?%(step,?w,?b,?loss_v))??
  • sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0), logdir="./checkpoint/", init_op=init_op, summary_op=None, saver=saver, global_step=global_step, save_model_secs=60) with sv.managed_session(server.target) as sess: step = 0 while step < 1000000: train_x = np.random.randn(1) train_y = 2 * train_x + np.random.randn(1) * 0.33 + 10 _, loss_v, step = sess.run([train_op, loss_value,global_step], feed_dict={input:train_x, label:train_y}) if step % steps_to_validate == 0: w,b = sess.run([weight,biase]) print("step: %d, weight: %f, biase: %f, loss: %f" %(step, w, b, loss_v))

    代碼說明:

    1. Supervisor。 含義類似一個監督者, 就是因為分布式了, 很多機器都在運行, 像什么參數初始化, 保存模型, 寫summary什么的,這個supervisoer幫你一起弄起來了, 就不用自己去手工去做這些事情了,而且在分布式的環境下設計到各種參數的共享, 其中的過程自己手工寫也不好寫, 于是tensorflow就給大家包裝好這么一個東西了。 這里的參數is_chief比較重要, 在所有的計算節點里還是有一個主節點的, 這個主節點來負責初始化參數, 模型的保存,summary的保存。 logdir就是保存和裝載模型的路徑。 不過這個似乎的啟動就會去這個logdir的目錄去看有沒有checkpoint的文件,有的話就自動裝載了,沒有就用init_op指定的初始化參數, 好像沒有參數指定不讓它自動load的; 2. 主的worker節點負責模型參數初始化等工作, 在這個過程中, 其他worker節點等待主節點完成初始化工作, 等主節點初始化完成后, 好了, 大家一起開心的跑數據。 3. 這里的global_step的值,是可以所有計算節點共享的, 在執行optimizer的minimize的時候, 會自動加1, 所以可以通過這個可以知道所有的計算節點一共計算了多少步了。
    程序結果示例: 好了,然后我們就開始跑,結果顯示如下:
    worker節點1:


    worker節點2打印信息:

    最后算出來的weight的值接近于2, biasis的值接近于10 。



    Demo中用到的源碼: #coding=utf-8 import numpy as np import tensorflow as tf# Define parameters FLAGS = tf.app.flags.FLAGS tf.app.flags.DEFINE_float('learning_rate', 0.00003, 'Initial learning rate.') tf.app.flags.DEFINE_integer('steps_to_validate', 1000,'Steps to validate and print loss')# For distributed tf.app.flags.DEFINE_string("ps_hosts", "","Comma-separated list of hostname:port pairs") tf.app.flags.DEFINE_string("worker_hosts", "","Comma-separated list of hostname:port pairs") tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'") tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job") tf.app.flags.DEFINE_integer("issync", 0, "是否采用分布式的同步模式,1表示同步模式,0表示異步模式")# Hyperparameters learning_rate = FLAGS.learning_rate steps_to_validate = FLAGS.steps_to_validatedef main(_):ps_hosts = FLAGS.ps_hosts.split(",")worker_hosts = FLAGS.worker_hosts.split(",")cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})server = tf.train.Server(cluster,job_name=FLAGS.job_name,task_index=FLAGS.task_index)issync = FLAGS.issyncif FLAGS.job_name == "ps":server.join()elif FLAGS.job_name == "worker":with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % FLAGS.task_index,cluster=cluster)):global_step = tf.Variable(0, name='global_step', trainable=False)input = tf.placeholder("float")label = tf.placeholder("float")weight = tf.get_variable("weight", [1], tf.float32, initializer=tf.random_normal_initializer())biase = tf.get_variable("biase", [1], tf.float32, initializer=tf.random_normal_initializer())pred = tf.multiply(input, weight) + biaseloss_value = loss(label, pred)optimizer = tf.train.GradientDescentOptimizer(learning_rate)grads_and_vars = optimizer.compute_gradients(loss_value)if issync == 1:#同步模式計算更新梯度rep_op = tf.train.SyncReplicasOptimizer(optimizer,replicas_to_aggregate=len(worker_hosts),replica_id=FLAGS.task_index,total_num_replicas=len(worker_hosts),use_locking=True)train_op = rep_op.apply_gradients(grads_and_vars,global_step=global_step)init_token_op = rep_op.get_init_tokens_op()chief_queue_runner = rep_op.get_chief_queue_runner()else:#異步模式計算更新梯度train_op = optimizer.apply_gradients(grads_and_vars,global_step=global_step)init_op = tf.initialize_all_variables()saver = tf.train.Saver()tf.summary.scalar('cost', loss_value)summary_op = tf.summary.merge_all()sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),logdir="./checkpoint/",init_op=init_op,summary_op=None,saver=saver,global_step=global_step,save_model_secs=60)with sv.prepare_or_wait_for_session(server.target) as sess:# 如果是同步模式if FLAGS.task_index == 0 and issync == 1:sv.start_queue_runners(sess, [chief_queue_runner])sess.run(init_token_op)step = 0while step < 1000000:train_x = np.random.randn(1)train_y = 2 * train_x + np.random.randn(1) * 0.33 + 10_, loss_v, step = sess.run([train_op, loss_value,global_step], feed_dict={input:train_x, label:train_y})if step % steps_to_validate == 0:w,b = sess.run([weight,biase])print("step: %d, weight: %f, biase: %f, loss: %f" %(step, w, b, loss_v))sv.stop()def loss(label, pred):return tf.square(label - pred)if __name__ == "__main__":tf.app.run

    轉載來源:http://blog.csdn.net/luodongri/article/details/52596780

    總結

    以上是生活随笔為你收集整理的分布式Tensorflow入门Demo的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。