分布式TensorFlow運行環(huán)境的示例分析-創(chuàng)新互聯(lián)

小編給大家分享一下分布式TensorFlow運行環(huán)境的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

創(chuàng)新互聯(lián)建站專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于成都網(wǎng)站設(shè)計、做網(wǎng)站、四平網(wǎng)絡(luò)推廣、小程序開發(fā)、四平網(wǎng)絡(luò)營銷、四平企業(yè)策劃、四平品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運營等,從售前售中售后,我們都將竭誠為您服務(wù),您的肯定,是我們大的嘉獎;創(chuàng)新互聯(lián)建站為所有大學(xué)生創(chuàng)業(yè)者提供四平建站搭建服務(wù),24小時服務(wù)熱線:18980820575,官方網(wǎng)址:muchs.cn

1.分布式TensorFlow的角色與原理

在分布式的TensorFlow中的角色分配如下:

PS:作為分布式訓(xùn)練的服務(wù)端,等待各個終端(supervisors)來連接。

worker:在TensorFlow的代碼注釋中被稱為終端(supervisors),作為分布式訓(xùn)練的計算資源終端。

chief supervisors:在眾多的運算終端中必須選擇一個作為主要的運算終端。該終端在運算終端中最先啟動,它的功能是合并各個終端運算后的學(xué)習(xí)參數(shù),將其保存或者載入。

每個具體的網(wǎng)絡(luò)標(biāo)識都是唯一的,即分布在不同IP的機(jī)器上(或者同一個機(jī)器的不同端口)。在實際的運行中,各個角色的網(wǎng)絡(luò)構(gòu)建部分代碼必須100%的相同。三者的分工如下:

服務(wù)端作為一個多方協(xié)調(diào)者,等待各個運算終端來連接。

chief supervisors會在啟動時同一管理全局的學(xué)習(xí)參數(shù),進(jìn)行初始化或者從模型載入。

其他的運算終端只是負(fù)責(zé)得到其對應(yīng)的任務(wù)并進(jìn)行計算,并不會保存檢查點,用于TensorBoard可視化中的summary日志等任何參數(shù)信息。

在整個過程都是通過RPC協(xié)議來進(jìn)行通信的。

2.分布部署TensorFlow的具體方法

配置過程中,首先建立一個server,在server中會將ps及所有worker的IP端口準(zhǔn)備好。接著,使用tf.train.Supervisor中的managed_ssion來管理一個打開的session。session中只是負(fù)責(zé)運算,而通信協(xié)調(diào)的事情就都交給supervisor來管理了。

3.部署訓(xùn)練實例

下面開始實現(xiàn)一個分布式訓(xùn)練的網(wǎng)絡(luò)模型,以線性回歸為例,通過3個端口來建立3個終端,分別是一個ps,兩個worker,實現(xiàn)TensorFlow的分布式運算。

1. 為每個角色添加IP地址和端口,創(chuàng)建sever,在一臺機(jī)器上開3個不同的端口,分別代表PS,chief supervisor和worker。角色的名稱用strjob_name表示,以ps為例,代碼如下:

# 定義IP和端口
strps_hosts = 'localhost:1681'
strworker_hosts = 'localhost:1682,localhost:1683'
# 定義角色名稱
strjob_name = 'ps'
task_index = 0
# 將字符串轉(zhuǎn)數(shù)組
ps_hosts = strps_hosts.split(',')
worker_hosts = strps_hosts.split(',')
cluster_spec = tf.train.ClusterSpec({'ps': ps_hosts, 'worker': worker_hosts})
# 創(chuàng)建server
server = tf.train.Server({'ps':ps_hosts, 'worker':worker_hosts}, job_name=strjob_name, task_index=task_index)

2為ps角色添加等待函數(shù)

ps角色使用server.join函數(shù)進(jìn)行線程掛起,開始接受連續(xù)消息。

# ps角色使用join進(jìn)行等待
if strjob_name == 'ps':
  print("wait")
  server.join()

3.創(chuàng)建網(wǎng)絡(luò)的結(jié)構(gòu)

與正常的程序不同,在創(chuàng)建網(wǎng)絡(luò)結(jié)構(gòu)時,使用tf.device函數(shù)將全部的節(jié)點都放在當(dāng)前任務(wù)下。在tf.device函數(shù)中的任務(wù)是通過tf.train.replica_device_setter來指定的。在tf.train.replica_device_setter中使用worker_device來定義具體任務(wù)名稱;使用cluster的配置來指定角色及對應(yīng)的IP地址,從而實現(xiàn)管理整個任務(wù)下的圖節(jié)點。代碼如下:

with tf.device(tf.train.replica_device_setter(worker_device='/job:worker/task:%d'%task_index,
                       cluster=cluster_spec)):
  X = tf.placeholder('float')
  Y = tf.placeholder('float')
  # 模型參數(shù)
  w = tf.Variable(tf.random_normal([1]), name='weight')
  b = tf.Variable(tf.zeros([1]), name='bias')
  global_step = tf.train.get_or_create_global_step()  # 獲取迭代次數(shù)
  z = tf.multiply(X, w) + b
  tf.summary('z', z)
  cost = tf.reduce_mean(tf.square(Y - z))
  tf.summary.scalar('loss_function', cost)
  learning_rate = 0.001
  optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost, global_step=global_step)
  saver = tf.train.Saver(max_to_keep=1)
  merged_summary_op = tf.summary.merge_all() # 合并所有summary
  init = tf.global_variables_initializer()

4.創(chuàng)建Supercisor,管理session

在tf.train.Supervisor函數(shù)中,is_chief表明為是否為chief Supervisor角色,這里將task_index=0的worker設(shè)置成chief Supervisor。saver需要將保存檢查點的saver對象傳入。init_op表示使用初始化變量的函數(shù)。

training_epochs = 2000
display_step = 2
sv = tf.train.Supervisor(is_chief=(task_index == 0),# 0號為chief
             logdir='log/spuer/',
             init_op=init,
             summary_op=None,
             saver=saver,
             global_step=global_step,
             save_model_secs=5)
# 連接目標(biāo)角色創(chuàng)建session
with sv.managed_session(saver.target) as sess:

5迭代訓(xùn)練

session中的內(nèi)容與以前一樣,直接迭代訓(xùn)練即可。由于使用了supervisor管理session,將使用sv.summary_computed函數(shù)來保存summary文件。

print('sess ok')
  print(global_step.eval(session=sess))
  for epoch in range(global_step.eval(session=sess), training_epochs*len(train_x)):
    for (x, y) in zip(train_x, train_y):
      _, epoch = sess.run([optimizer, global_step], feed_dict={X: x, Y: y})
      summary_str = sess.run(merged_summary_op, feed_dict={X: x, Y: y})
      sv.summary_computed(sess, summary_str, global_step=epoch)
      if epoch % display_step == 0:
        loss = sess.run(cost, feed_dict={X:train_x, Y:train_y})
        print("Epoch:", epoch+1, 'loss:', loss, 'W=', sess.run(w), w, 'b=', sess.run(b))
  print(' finished ')
  sv.saver.save(sess, 'log/linear/' + "sv.cpk", global_step=epoch)
sv.stop()

(1)在設(shè)置自動保存檢查點文件后,手動保存仍然有效,

(2)在運行一半后,在運行supervisor時會自動載入模型的參數(shù),不需要手動調(diào)用restore。

(3)在session中不需要進(jìn)行初始化的操作。

6.建立worker文件

新建兩個py文件,設(shè)置task_index分別為0和1,其他的部分和上述的代碼相一致。

strjob_name = 'worker'
task_index = 1
strjob_name = 'worker'
task_index = 0

7.運行

我們分別啟動寫好的三個文件,在運行結(jié)果中,我們可以看到循環(huán)的次數(shù)不是連續(xù)的,顯示結(jié)果中會有警告,這是因為在構(gòu)建supervisor時沒有填寫local_init_op參數(shù),該參數(shù)的含義是在創(chuàng)建worker實例時,初始化本地變量,上述代碼中沒有設(shè)置,系統(tǒng)會自動初始化,并給出警告提示。

分布運算的目的是為了提高整體運算速度,如果同步epoch的準(zhǔn)確率需要犧牲總體運行速度為代價,自然很不合適。

在ps的文件中,它只是負(fù)責(zé)連接,并不參與運算。

以上是“分布式TensorFlow運行環(huán)境的示例分析”這篇文章的所有內(nèi)容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內(nèi)容對大家有所幫助,如果還想學(xué)習(xí)更多知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道!

文章標(biāo)題:分布式TensorFlow運行環(huán)境的示例分析-創(chuàng)新互聯(lián)
URL網(wǎng)址:http://muchs.cn/article14/depode.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供小程序開發(fā)做網(wǎng)站、ChatGPT、外貿(mào)網(wǎng)站建設(shè)手機(jī)網(wǎng)站建設(shè)、網(wǎng)站設(shè)計公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

營銷型網(wǎng)站建設(shè)