【導讀】分布式TensorFlow可以有效地提神經網絡訓練速度,但它的使用並不簡單。雖然官方提供了文檔和示例,如連結【1】,但是它們太難懂了。本文是一篇淺顯易懂的分布式TensorFlow入門教程,可以讓你對分布式TensorFlow的原理和使用有一定的了解。
1.https://github.com/tensorflow/tensorflow/blob/r1.3/tensorflow/python/training/monitored_session.py
作者 | Tutorials by Malo Marrec
編譯 | 專知
整理 | Hujun
How to Write Distributed TensorFlow Code
分布式機器學習策略
模型並行化
當模型過大以至於一臺及其的內存承受不住時,可以將計算圖的不同部分放到不同的機器中,模型參數的存儲和更新都在這些機器中進行。
一個最基本的方法是:把網絡第一層放在一臺機器上,第二層放在另一臺機器上。然而,這樣並不好,在前向傳播時,較深的層需要等待較淺的層,在發現傳播時,較淺的層需要等待較深的層。當模型中有並行的操作時(如GooLeNet),這些操作可以在不同的機器上運行,避免這樣的瓶頸。
數據並行化
整個計算圖被保存在一個或多個參數伺服器(ps)中。訓練操作在多個機器上被執行,這些機器被稱作worker。這些worker讀取不同的數據(data batches),計算梯度,並將更新操作發送給參數伺服器。
數據並行化有兩種主要的方案:
本文會聚焦於如何在數據並行化模型中使用異步訓練方案。
構建數據並行化模型
如前面所述,我們的系統會包含三種類型的節點:
也就是說最小的集群需要包含一個主worker伺服器和一個參數伺服器。可以將它擴展為一個主worker伺服器,多個參數伺服器和多個worker伺服器。
最好有多個參數伺服器,因為worker伺服器和參數伺服器之間有大量的I/O通信。如果只有2個worker伺服器,可能1個參數伺服器可以扛得住所有的讀取和更新請求。但如果你有10個worker而且你的模型非常大,一個參數伺服器可能就不夠了。
在分布式TensorFlow中,同樣的代碼會被發送到所有的節點。雖然你的main.py、train.py等會被同時發送到worker伺服器和參數伺服器,每個節點會依據自己的環境變量來執行不同的代碼塊。
分布式TensorFlow代碼的準備包括三個階段:
定義tf.trainClusterSpec和tf.train.Server
將模型賦給參數伺服器和worker伺服器
配置和啟動tf.train.MonitoredTrainingSession
1. 定義tf.trainClusterSpec和tf.train.Server
tf.train.ClusterSpec object將任務映射到機器,它被用在tf.train.Server的構造函數中來構造tf.train.Server,在每臺機器上創建一個或多個server,並確保每臺機器能知道其他的機器在做什麼。它包含設備的集合(某臺機器上可用的設備),以及一個tf.Session object(tf.Session object會被tf.train.MonitoredTrainingSession 用於執行計算圖)。
通常情況下,一臺機器上有一個任務,除非你的機器有多個GPU,在這種情況下,你會給每個GPU分配一個任務。
從TensorFlow教程中摘取:
一個tf.train.ClusterSpec表示參與分布式TensorFlow計算的進程的集合。每個tf.train.Server都在一個集群中被構建。
一個tf.train.Server實例包含了設備的集合,和一個可以參與分布式訓練的tf.Session目標。一臺伺服器屬於一個集群(由tf.train.ClusterSpec指定)
A server belongs to a cluster (specified by a ),並且對應一個任務。伺服器可以和所在集群中的所有其他伺服器進行通信。
2. 為worker伺服器指定模型的變量和操作
用 with tf.device 命令,你可以將節點(無論是操作還是變量)指定到一個任務或工作中。例如:
with tf.device("/job:ps/task:0"):
X = tf.placeholder(tf.float32, [100,128,128,3],
name="X")
with tf.device("/job:worker/task:0"):
... #training ops definition
train_step = (
tf.train.AdamOptimizer(learning_rate)
.minimize(loss, global_step=global_step)
)
不在with tf.device塊內的節點,會被TensorFlow自動地分配給一個設備。
在數據並行化框架中,節點會被分配到參數伺服器中,操作會被分配到worker伺服器中。手動進行分配不具有擴展性(設想你有10臺參數伺服器,你不會想手動地為每一臺分配變量)。TensorFlow提供了方便的tf.train.replica_device_setter,它可以自動地為設備分配操作。
它以一個tf.train.ClusterSpec對象作為輸入,並返回一個用於傳給tf.device的函數。
在我們的模型中,變量操作被存放在參數伺服器中,訓練操作被存放在worker伺服器中。
上面定義計算圖的操作變為:
with tf.device(tf.train.replica_device_setter
(cluster_spec)):
... #model definition
X = tf.placeholder(tf.float32, [100,128,128,3],
name="X")
... #training ops definition
train_step = (
tf.train.AdamOptimizer(learning_rate)
.minimize(loss, global_step=global_step)
)
3. 配置和啟動tf.train.MonitoredTrainingSession
tf.train.MonitoredTrainingSession是tf.Session在分布式訓練中的等價物。它負責設置一個主worker節點,它會:
參數:
tf.train.MonitoredTrainingSession的參數包含主節點、checkpoints路徑、保存checkpoints以及導出TensorBoard展示所需信息的頻率。
with tf.train.MonitoredTrainingSession(
master=server.target, # as defined with tf.train.
Server
is_chief= ..., #boolean, is this node the master?
checkpoint_dir=..., #path to checkpoint
/tensorboard dir
hooks = hooks #see next section
) as sess:
對於is_chief,你需要在代碼中某處定義某個節點是主節點,例如你可以從集群部署系統中獲取。
設置訓練步數
我猜,你曾經在tf.Session塊中使用了循環,並在循環中的每個迭代中,使用一個或多個sess.run指令。
這不是MonitoredTrainingSession執行的方式,所有的實例需要合理地被終止和同步,一個checkpoint需要被保存。因此,訓練的步數通過一個SessionRunHook對象列表,被直接傳入MonitoredTrainingSession。
向MonitoredTrainingSession對象傳入一個tf.train.StopAtStepHook鉤子,這個鉤子定義了訓練的最後一步,之後參數伺服器和worker伺服器會被關閉。
注意:有一些其他類型的鉤子,你可以基於tf.train.SessionRunHook定義自己的鉤子,這裡不詳細介紹了。
代碼如下:
hooks = [tf.train.StopAtStepHook(last_step = 100000)]
with tf.train.MonitoredTrainingSession(...) as sess:
sess.run(loss)
#run your ops here
圖如下:
在Clusterone中構建數據並行化模型
現在我們了解了分布式TensorFlow代碼中的組件,我來提供一些在Clusterone中運行分布式TensorFlow的高層次的代碼片段:
# Notes:
# You need to have the clusterone package installed
(pip install tensorport)
# Export logs and outputs to /logs, your data is in /data.
import tensorflow as tf
from clusterone import get_data_path, get_logs_path
# Get the environment parameters for distributed
TensorFlow
try:
job_name = os.environ['JOB_NAME']
task_index = os.environ['TASK_INDEX']
ps_hosts = os.environ['PS_HOSTS']
worker_hosts = os.environ['WORKER_HOSTS']
except: # we are not on TensorPort, assuming local,
single node
task_index = 0
ps_hosts = None
worker_hosts = None
# This function defines the master, ClusterSpecs and
device setters
def device_and_target():
# If FLAGS.job_name is not set, we're running
single-machine TensorFlow.
# Don't set a device.
if FLAGS.job_name is None:
print("Running single-machine training")
return (None, "")
# Otherwise we're running distributed TensorFlow.
print("Running distributed training")
if FLAGS.task_index is None or FLAGS.task_index == "":
raise ValueError("Must specify an explicit
`task_index`")
if FLAGS.ps_hosts is None or FLAGS.ps_hosts == "":
raise ValueError("Must specify an explicit
`ps_hosts`")
if FLAGS.worker_hosts is None or FLAGS.worker_hosts
== "":
raise ValueError("Must specify an explicit
`worker_hosts`")
cluster_spec = tf.train.ClusterSpec({
"ps": FLAGS.ps_hosts.split(","),
"worker": FLAGS.worker_hosts.split(","),
})
server = tf.train.Server(
cluster_spec, job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
if FLAGS.job_name == "ps":
server.join()
worker_device = "/job:worker/task:{}".
format(FLAGS.task_index)
# The device setter will automatically place Variables
ops on separate
# parameter servers (ps). The non-Variable ops will
be placed on the workers.
return (
tf.train.replica_device_setter(
worker_device=worker_device,
cluster=cluster_spec),
server.target,
)
device, target = device_and_target()
# Defining graph
with tf.device(device):
# TODO define your graph here
...
# Defining the number of training steps
hooks = [tf.train.StopAtStepHook(last_step=100000)]
with tf.train.MonitoredTrainingSession(master=target,
is_chief=(FLAGS.task_index == 0),
checkpoint_dir=FLAGS.logs_dir,
hooks=hooks) as sess:
while not sess.should_stop():
# execute training step here (read data,
feed_dict, session)
# TODO define training ops
data_batch = ...
feed_dict = {...}
loss, _ = sess.run(...)
原文連結:
https://clusterone.com/blog/2017/09/13/distributed-tensorflow-clusterone/
-END-
專 · 知
人工智慧領域26個主題知識資料全集獲取與加入專知人工智慧服務群: 歡迎微信掃一掃加入專知人工智慧知識星球群,獲取專業知識教程視頻資料和與專家交流諮詢!
請PC登錄www.zhuanzhi.ai或者點擊閱讀原文,註冊登錄專知,獲取更多AI知識資料!
請加專知小助手微信(掃一掃如下二維碼添加),加入專知主題群(請備註主題類型:AI、NLP、CV、 KG等)交流~
請關注專知公眾號,獲取人工智慧的專業知識!
點擊「閱讀原文」,使用專知