Distributed TensorFlow(tensorflow分布式)
Distributed TensorFlow
本文档展示了如何创建一个TensorFlow服务器集群,以及如何在该集群中分配一个计算图。我们假设您熟悉编写 TensorFlow 程序的基本概念。
你好,分布式TensorFlow!
要查看一个简单的 TensorFlow 集群,请执行以下操作:
# Start a TensorFlow server as a single-process "cluster".
$ python
>>> import tensorflow as tf
>>> c = tf.constant("Hello, distributed TensorFlow!")
>>> server = tf.train.Server.create_local_server()
>>> sess = tf.Session(server.target) # Create a session on the server.
>>> sess.run(c)
'Hello, distributed TensorFlow!'
tf.train.Server.create_local_server
方法创建一个具有进程内服务器的单进程群集。
创建一个集群
一个TensorFlow“集群”是一组参与TensorFlow图形分布式执行的“任务”。每个任务都与一个TensorFlow“服务器”相关联,该服务器包含一个可用于创建会话的“主”和一个在图中执行操作的“工作人员”。一个集群也可以分成一个或多个“作业”,每个作业包含一个或多个任务。
要创建集群,您需要为集群中的每个任务启动一台TensorFlow服务器。每个任务通常运行在不同的机器上,但您可以在同一台机器上运行多个任务(例如,控制不同的GPU设备)。在每项任务中,请执行以下操作:
1. 创建一个
tf.train.ClusterSpec
描述集群中所有任务。这对每个任务都应该是一样的。
2. 创建一个
tf.train.Server
,传递tf.train.ClusterSpec
给构造函数,并使用作业名称和任务索引标识本地任务。
创建一个tf.train.ClusterSpec来描述集群
群集规范字典将任务名称映射到网络地址列表。将这个字典传递给tf.train.ClusterSpec
构造函数。例如:
tf.train.ClusterSpec 建设 | 可供使用的任务 |
---|---|
tf.train.ClusterSpec{"local": "localhost:2222", "localhost:2223"}) | /job:local/task:0/job:local/task:1 |
tf.train.ClusterSpec{ "worker": "worker0.example.com:2222", "worker1.example.com:2222", "worker2.example.com:2222" , "ps": "ps0.example.com:2222", "ps1.example.com:2222" }) | /job:worker/task:0/job:worker/task:1/job:worker/task:2/job:ps/task:0/job:ps/task:1 |
在每个任务中创建一个tf.train.Server实例
一个tf.train.Server
对象包含一组本地设备,一组到其tf.train.ClusterSpec
中的其他任务的连接,并且tf.Session
可以使用它们来执行分布式计算。每个服务器都是特定命名作业的成员,并且在该任务中有一个任务索引。服务器可以与群集中的任何其他服务器进行通信。
例如,要启动运行两台服务器的集群,localhost:2222
和localhost:2223
,在本地计算机上的两个不同进程中运行以下代码片段:
# In task 0:
cluster = tf.train.ClusterSpec{"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=0)
# In task 1:
cluster = tf.train.ClusterSpec{"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=1)
注意:
手动指定这些集群规范可能很乏味,特别是对于大型集群。我们正在开发以编程方式启动任务的工具,例如使用像 Kubernete s这样的集群管理器。如果您希望看到支持的特定群集管理器,请提出 GitHub问题。
指定模型中的分布式设备
要将操作放置在特定的进程上,可以使用用于指定(不论ops是在CPU还是在GPU上运行)的功能相同的tf.device
函数。例如:
with tf.device("/job:ps/task:0"):
weights_1 = tf.Variable(...)
biases_1 = tf.Variable(...)
with tf.device("/job:ps/task:1"):
weights_2 = tf.Variable(...)
biases_2 = tf.Variable(...)
with tf.device("/job:worker/task:7"):
input, labels = ...
layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
# ...
train_op = ...
with tf.Session("grpc://worker7.example.com:2222") as sess:
for _ in range(10000):
sess.run(train_op)
在上面的例子中,变量是在ps
工作中的两个任务上创建的,模型的计算密集型部分是在worker
工作中创建的。TensorFlow 将插入任务之间的适当的数据传输(用于直传的从ps
到worker
,以及用于施加梯度的从worker
到ps
)。
复制训练
一种称为“数据并行性”的通用训练配置涉及多个任务,在不同的小批量数据上一个worker
工作训练相同的模型,更新ps
作业中一个或多个任务中托管的共享参数。所有任务通常运行在不同的机器上。在 TensorFlow 中有很多方法可以指定这个结构,我们正在构建库,以简化指定复制模型的工作。可能的方法包括:
图内复制。
在这种方法中,客户端构建一个包含一组参数的tf.Variable
节点(在节点上固定/job:ps
)的tf.Graph
;以及模型的计算密集型部分的多个副本,每个副本都固定在一个不同的任务中/job:worker
。
图形间复制。
在这种方法中,每个/job:worker
任务都有一个单独的客户端,通常与工作任务处于相同的进程中。每个客户建立一个包含参数的类似图形(固定到/job:ps
像以前一样使用tf.train.replica_device_setter
将它们确定性地映射到相同的任务);以及模型的计算密集型部分的单个副本,并将其固定到/job:worker
中的本地任务。
异步培训。
在这种方法中,图的每个副本都有独立的训练循环,无需协调即可执行。它与以上两种复制形式兼容。
同步训练。
在此方法中,所有副本都读取当前参数的相同值,并行计算梯度,然后将它们应用到一起。它与图内复制兼容(例如,使用 CIFAR-10多GPU训练 器中的梯度平均)以及图间复制(例如使用tf.train.SyncReplicasOptimizer
)。
总而言之:训练师计划示例
以下代码显示了分布式培训师程序的框架,实现了图间复制
和异步培训
。它包含参数服务器和辅助任务的代码。
import argparse
import sys
import tensorflow as tf
FLAGS = None
def main(_):
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
# Create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec{"ps": ps_hosts, "worker": worker_hosts})
# Create and start a server for the local task.
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
# Assigns ops to the local worker by default.
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
# Build model...
loss = ...
global_step = tf.contrib.framework.get_or_create_global_step()
train_op = tf.train.AdagradOptimizer(0.01).minimize(
loss, global_step=global_step)
# The StopAtStepHook handles stopping after running given steps.
hooks=[tf.train.StopAtStepHook(last_step=1000000)]
# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(master=server.target,
is_chief=(FLAGS.task_index == 0),
checkpoint_dir="/tmp/train_logs",
hooks=hooks) as mon_sess:
while not mon_sess.should_stop():
# Run a training step asynchronously.
# See `tf.train.SyncReplicasOptimizer` for additional details on how to
# perform *synchronous* training.
# mon_sess.run handles AbortedError in case of preempted PS.
mon_sess.run(train_op)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.register("type", "bool", lambda v: v.lower() == "true")
# Flags for defining the tf.train.ClusterSpec
parser.add_argument(
"--ps_hosts",
type=str,
default="",
help="Comma-separated list of hostname:port pairs"
)
parser.add_argument(
"--worker_hosts",
type=str,
default="",
help="Comma-separated list of hostname:port pairs"
)
parser.add_argument(
"--job_name",
type=str,
default="",
help="One of 'ps', 'worker'"
)
# Flags for defining the tf.train.Server
parser.add_argument(
"--task_index",
type=int,
default=0,
help="Index of task within the job"
)
FLAGS, unparsed = parser.parse_known_args()
tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
要启动具有两个参数服务器和两个工人的培训师,请使用以下命令行(假定调用该trainer.py
脚本):
# On ps0.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
--job_name=worker --task_index=1
词汇表
客户
客户端通常是构建 TensorFlow 图并构建tensorflow::Session
与集群交互的程序。客户通常使用Python或C ++编写。单个客户端进程可以直接与多台TensorFlow服务器交互(请参阅上面的“复制式训练”),并且单台服务器可以为多个客户端提供服务。
簇
TensorFlow 集群包含一个或多个“作业”,每个“作业”分为一个或多个“任务”列表。群集通常专用于特定的高级目标,例如训练神经网络,并行使用多台机器。一个集群由一个tf.train.ClusterSpec
对象定义。
工作
一份工作包含一份“任务”清单,这些清单通常用于共同目的。例如,名为ps
(“参数服务器”)的作业通常托管存储和更新变量的节点; 而名为的作业worker
通常托管执行计算密集型任务的无状态节点。作业中的任务通常运行在不同的机器上。这组工作角色是灵活的:例如, worker
可以保持某种状态。
主服务
RPC服务,提供对一组分布式设备的远程访问,并充当会话目标。主服务实现tensorflow::Session
接口,并负责协调跨一个或多个“工作者服务”的工作。所有TensorFlow服务器均实施主服务。
任务
任务对应于特定的 TensorFlow 服务器,并且通常对应于单个进程。任务属于特定的“工作”,并通过其在该工作任务列表中的索引来标识。
TensorFlow服务器
运行tf.train.Server
实例的进程,该实例是群集的成员,并导出“主服务”和“辅助服务”。
工作者服务
使用本地设备执行TensorFlow图形部分的RPC服务。工作者服务实现worker_service.proto。所有的 TensorFlow服务器都实现了工作者服务。