资讯专栏INFORMATION COLUMN

TensorFlow学习笔记(9):分布式TensorFlow

PumpkinDylan / 555人阅读

摘要:本文基于官方教程,实践了分布式搭建的过程。一般将任务分为两类一类叫参数服务器,,简称为,用于存储一类就是普通任务,称为,用于执行具体的计算。参数服务器是一套分布式存储,用于保存参数,并提供参数更新的操作。

简介

TensorFlow支持使用多台机器的设备进行计算。本文基于官方教程,实践了分布式TensorFlow搭建的过程。

TensorFlow入门教程

基本概念 TensorFlow集群

A TensorFlow "cluster" is a set of "tasks" that participate in the distributed execution of a TensorFlow graph. Each task is associated with a TensorFlow "server", which contains a "master" that can be used to create sessions, and a "worker" that executes operations in the graph.

从上面的定义可以看出,所谓的TensorFlow集群就是一组任务,每个任务就是一个服务。服务由两个部分组成,第一部分是master,用于创建session,第二部分是worker,用于执行具体的计算。

TensorFlow一般将任务分为两类job:一类叫参数服务器,parameter server,简称为ps,用于存储tf.Variable;一类就是普通任务,称为worker,用于执行具体的计算。

首先来理解一下参数服务器的概念。一般而言,机器学习的参数训练过程可以划分为两个类别:第一个是根据参数算算梯度,第二个是根据梯度更新参数。对于小规模训练,数据量不大,参数数量不多,一个CPU就足够了,两类任务都交给一个CPU来做。对于普通的中等规模的训练,数据量比较大,参数数量不多,计算梯度的任务负荷较重,参数更新的任务负荷较轻,所以将第一类任务交给若干个CPU或GPU去做,第二类任务交给一个CPU即可。对于超大规模的训练,数据量大、参数多,不仅计算梯度的任务要部署到多个CPU或GPU上,而且更新参数的任务也要部署到多个CPU。如果计算量足够大,一台机器能搭载的CPU和GPU数量有限,就需要多台机器来进行计算能力的扩展了。参数服务器是一套分布式存储,用于保存参数,并提供参数更新的操作。

我们来看一下怎么创建一个TensorFlow集群。每个任务用一个ip:port表示。TensorFlow用tf.train.ClusterSpec表示一个集群信息,举例如下:

import tensorflow as tf

# Configuration of cluster 
ps_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ]
worker_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ]
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

上面的语句提供了一个TensorFlow集群信息,集群有两类任务,称为job,一个job是ps,一个job是worker;ps由2个任务组成,worker由3个任务组成。

定义完集群信息后,使用tf.train.Server创建每个任务:

tf.app.flags.DEFINE_string("job_name", "worker", "One of "ps", "worker"")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")

FLAGS = tf.app.flags.FLAGS

def main(_):
    server = tf.train.Server(cluster,
                             job_name=FLAGS.job_name,
                             task_index=FLAGS.task_index)
    server.join()

if __name__ == "__main__":
    tf.app.run()

对于本例而言,我们需要在ip:port对应的机器上运行每个任务,共需执行五次代码,生成五个任务。

python worker.py --job_name=ps --task_index=0
python worker.py --job_name=ps --task_index=1
python worker.py --job_name=worker --task_index=0
python worker.py --job_name=worker --task_index=1
python worker.py --job_name=worker --task_index=2

我们找到集群的某一台机器,执行下面的代码:

# -*- coding=utf-8 -*-

import tensorflow as tf
import numpy as np

train_X = np.random.rand(100).astype(np.float32)
train_Y = train_X * 0.1 + 0.3

# 选择变量存储位置和op执行位置,这里全部放在worker的第一个task上
with tf.device("/job:worker/task:0"):
    X = tf.placeholder(tf.float32)
    Y = tf.placeholder(tf.float32)
    w = tf.Variable(0.0, name="weight")
    b = tf.Variable(0.0, name="reminder")
    y = w * X + b
    loss = tf.reduce_mean(tf.square(y - Y))

    init_op = tf.global_variables_initializer()
    train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)

# 选择创建session使用的master
with tf.Session("grpc://xx.xxx.xx.xxxx:oooo") as sess:
    sess.run(init_op)
    for i in range(500):
        sess.run(train_op, feed_dict={X: train_Y, Y: train_Y})
        if i % 50 == 0:
            print i, sess.run(w), sess.run(b)

    print sess.run(w)
    print sess.run(b)

执行结果如下:

0 0.00245265 0.00697793
50 0.0752466 0.213145
100 0.0991397 0.279267
150 0.107308 0.30036
200 0.110421 0.306972
250 0.111907 0.308929
300 0.112869 0.309389
350 0.113663 0.309368
400 0.114402 0.309192
450 0.115123 0.308967
0.115824
0.30873

其实ps和worker本质上是一个东西,就是名字不同,我们将上例中的with tf.device("/job:worker/task:0"):改为with tf.device("/job:psr/task:0"):,一样能够执行。之所以在创建集群时要分为两个类别的任务,是因为TensorFlow提供了一些工具函数,会根据名字不同赋予task不同的任务,ps的用于存储变量,worker的用于计算。

同步与异步更新

同步更新:将数据拆分成多份,每份基于参数计算出各自部分的梯度;当每一份的部分梯度计算完成后,收集到一起算出总梯度,再用总梯度去更新参数。

异步更新:同步更新模式下,每次都要等各个部分的梯度计算完后才能进行参数更新操作,处理速度取决于计算梯度最慢的那个部分,其他部分存在大量的等待时间浪费;异步更新模式下,所有的部分只需要算自己的梯度,根据自己的梯度更新参数,不同部分之间不存在通信和等待。

分布式训练案例
import tensorflow as tf
import numpy as np

# Configuration of cluster 
ps_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ]
worker_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ]
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS

def main(_):
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):
        
        x_data = tf.placeholder(tf.float32, [100])
        y_data = tf.placeholder(tf.float32, [100])

        W = tf.Variable(tf.random_uniform([1], -1.0, 1.0))
        b = tf.Variable(tf.zeros([1]))
        y = W * x_data + b
        loss = tf.reduce_mean(tf.square(y - y_data))
        
        global_step = tf.Variable(0, name="global_step", trainable=False)
        optimizer = tf.train.GradientDescentOptimizer(0.1)
        train_op = optimizer.minimize(loss, global_step=global_step)
        
        tf.summary.scalar("cost", loss)
        summary_op = tf.summary.merge_all()
        init_op = tf.global_variables_initializer()
    # The StopAtStepHook handles stopping after running given steps.
    hooks = [ tf.train.StopAtStepHook(last_step=1000000)]
    # The MonitoredTrainingSession takes care of session initialization,
    # restoring from a checkpoint, saving to a checkpoint, and closing when done
    # or an error occurs.
    with tf.train.MonitoredTrainingSession(master="grpc://" + worker_hosts[FLAGS.task_index],
                                           is_chief=(FLAGS.task_index==0), # 我们制定task_index为0的任务为主任务,用于负责变量初始化、做checkpoint、保存summary和复原
                                           checkpoint_dir="/tmp/tf_train_logs",
                                           save_checkpoint_secs=None,
                                           hooks=hooks) as mon_sess:
        while not mon_sess.should_stop():
            # Run a training step asynchronously.
            # See `tf.train.SyncReplicasOptimizer` for additional details on how to
            # perform *synchronous* training.
            # mon_sess.run handles AbortedError in case of preempted PS.
            train_x = np.random.rand(100).astype(np.float32)
            train_y = train_x * 0.1 + 0.3
            _, step, loss_v, weight, biase = mon_sess.run([train_op, global_step, loss, W, b], feed_dict={x_data: train_x, y_data: train_y})
            if step % 100 == 0:
                print "step: %d, weight: %f, biase: %f, loss: %f" %(step, weight, biase, loss_v)
        print "Optimization finished."

if __name__ == "__main__":
    tf.app.run()

代码中,tf.train.replica_device_setter()会根据job名,将with内的Variable op放到ps tasks,将其他计算op放到worker tasks。默认分配策略是轮询。

在属于集群的一台机器中执行上面的代码,屏幕会开始输出每轮迭代的训练参数和损失

python train.py --task_index=0

在另一台机器上执行下面你的代码,再启动一个任务,会看到屏幕开始输出每轮迭代的训练参数和损失,注意,step不再是从0开始,而是在启动时刻上一个启动任务的step后继续。此时观察两个任务,会发现他们同时在对同一参数进行更新。

python train.py --task_index=2
思考

分布式TensorFlow与Spark对比:

分布式的级别不同:TensorFlow的Tensor、Variable和Op不是分布式的,分布式执行的是subgraph. Spark的op和变量都是构建在RDD上,RDD本身是分布式的。

异步训练:TensorFlow支持同步和异步的分布式训练;Spark原生的API只支持同步训练

分布式存储:Spark在底层封装好了worker和分布式数据之间的关系;TensorFlow需要自行维护。

Parameter Server:TensorFlow支持,Spark暂不支持。

TF分布式部署起来还是比较繁琐的,需要定义好每个任务的ip:port,手工启动每个task,不提供一个界面可以对集群进行维护。

参考资料

白话tensorflow分布式部署和开发

理解和实现分布式TensorFlow集群完整教程

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/38424.html

相关文章

  • ApacheCN 人工智能知识树 v1.0

    摘要:贡献者飞龙版本最近总是有人问我,把这些资料看完一遍要用多长时间,如果你一本书一本书看的话,的确要用很长时间。为了方便大家,我就把每本书的章节拆开,再按照知识点合并,手动整理了这个知识树。 Special Sponsors showImg(https://segmentfault.com/img/remote/1460000018907426?w=1760&h=200); 贡献者:飞龙版...

    刘厚水 评论0 收藏0
  • TensorFlow学习笔记(11):数据操作指南

    摘要:本文的目的是聚焦于数据操作能力,讲述中比较重要的一些,帮助大家实现各自的业务逻辑。传入输入值,指定输出的基本数据类型。 引言 用TensorFlow做好一个机器学习项目,需要具备多种代码能力: 工程开发能力:怎么读取数据、怎么设计与运行Computation Graph、怎么保存与恢复变量、怎么保存统计结果、怎么共享变量、怎么分布式部署 数据操作能力:怎么将原始数据一步步转化为模型需...

    jsbintask 评论0 收藏0

发表评论

0条评论

PumpkinDylan

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<