欢迎来到飞桨分布式技术文档主页

  • 欢迎您关注飞桨分布式训练,我们希望能帮助每一个用户走上大规模工业化生产之路!

整体介绍与内容概览

欢迎关注大规模深度学习技术

近十年来,深度学习技术不断刷新视觉、自然语言、语音、搜索、推荐等领域各种任务的记录。这其中的原因,用一个关键词描述就是“大规模”。大规模的数据使得模型有足够的知识可以记忆,大规模参数量的模型使得模型本身有能力记忆更多的数据,大规模高性能的算力(以GPU为典型代表)使得模型的训练速度有百倍甚至千倍的提升。数据、模型、算力的发展催生了大规模深度学习这个领域,如何进行多机任务的拆分、如何配置集群训练资源、如何平衡训练速度和收敛速度、如何训练单机无法训练的模型、弹性训练与容错等都是这个方向重点研究的问题。

飞桨分布式训练提供的核心价值

  1. 源自产业实践的经验:

  • 飞桨的分布式训练技术源自百度的业务实践,是经过超大规模业务数据检验过的训练框架。

  • 飞桨分布式训练经过实践检验的应用领域包括自然语言处理,计算机视觉,搜索,推荐等。

  1. 完备的并行模式:

  • 数据并行:针对产业界最常用的数据并行模式,飞桨针对实际业务需求重点打磨多项技术,包括;飞桨提供集合通信架构和参数服务器架构两种方式,支持工业实践中常见的同步训练和异步训练的机制,并提供收敛效果有保障的分布式优化算法。

  • 流水线并行:面向异构硬件,流水线并行能够将模型计算部分拆分到不同硬件并充分流水线化,从而大规模提升异构硬件的整体利用率。

  • 模型并行:对于超大规模分类问题,飞桨提供计算与存储同时并行的模型并行,解决单GPU无法解决的问题。

  1. 面向云端场景的并行训练组件:

  • 飞桨针对集群网络环境、硬件设备比较低配的场景提供多种实用的并行策略和优化算法。

  • 针对云端算力具有弹性的特点,飞桨也始终在探索弹性深度学习的应用。

开始你的分布式训练之旅

RoadMap

  • 我们也会推送大规模深度学习技术领域最前沿的技术到这里

    • 近期:千亿规模模型参数的GPU多机多卡训练,敬请期待

公有云配置

  • TBA(燕明)

Kubernetes 部署

概述

在 kubernetes 上部署分布式任务需要安装 paddle-operator 。 paddle-operator 通过添加自定义资源类型 (paddlejob) 以及部署 controller 和一系列 kubernetes 原生组件的方式实现简单定义即可运行 paddle 任务的需求。

目前支持运行 ParameterServer (PS) 和 Collective 两种分布式任务,当然也支持运行单节点任务。

paddle-operator 安装

准备

安装 paddle-operator 需要有已经安装的 kubernetes (v1.16+) 集群和 kubectl (v1.16+) 工具。

可以通过 git clone 或者复制文件内容保存以下 文件 到本地,

deploy
├── examples
│   ├── wide_and_deep.yaml
│   ├── wide_and_deep_podip.yaml
│   └── wide_and_deep_service.yaml
└── v1
    ├── crd.yaml
    └── operator.yaml

部署 CRD

执行以下命令,

$ kubectl create -f deploy/v1/crd.yaml

通过以下命令查看是否成功,

$ kubectl get crd
NAME                                    CREATED AT
paddlejobs.batch.paddlepaddle.org       2021-02-08T07:43:24Z

部署 controller 及相关组件

注意默认部署的 namespace 为 paddle-system,如果希望在自定义的 namespace 中运行或者提交任务, 需要先在 operator.yaml 文件中对应更改 namespace 配置,其中

  • namespace: paddle-system 表示该资源部署的 namespace,可理解为系统 controller namespace;

  • Deployment 资源中 containers.args 中 –namespace=paddle-system 表示 controller 监控资源所在 namespace,即任务提交 namespace。

执行以下部署命令,

$ kubectl create -f deploy/v1/operator.yaml

通过以下命令查看部署结果和运行状态,

$ kubectl -n paddle-system get pods
NAME                                         READY   STATUS    RESTARTS   AGE
paddle-controller-manager-698dd7b855-n65jr   1/1     Running   0          1m

通过查看 controller 日志以确保运行正常,

$ kubectl -n paddle-system logs paddle-controller-manager-698dd7b855-n65jr

提交 demo 任务查看效果,

$ kubectl -n paddle-system create -f deploy/examples/wide_and_deep.yaml

查看 paddlejob 任务状态, pdj 为 paddlejob 的缩写,

$ kubectl -n paddle-system get pdj
NAME                     STATUS      MODE   PS    WORKER   AGE
wide-ande-deep-service   Completed   PS     2/2   0/2      4m4s

以上信息可以看出:训练任务已经正确完成,该任务为 ps 模式,配置需求 2 个 pserver, 2 个在运行,需求 2 个 woker,0 个在运行(已完成退出)。 可通过 cleanPodPolicy 配置任务完成/失败后的 pod 删除策略,详见任务配置。

查看 pod 状态,

$ kubectl -n paddle-system get pods

卸载

通过以下命令卸载部署的组件,

$ kubectl delete -f deploy/v1/crd.yaml -f deploy/v1/operator.yaml

注意:重新安装时,建议先卸载再安装

paddlejob 任务提交

在上述安装过程中,我们使用了 wide-and-deep 的例子作为提交任务演示,本节详细描述任务封装和提交流程供用户参考提交自己的任务。

代码准备

示例源码可在此获得,wide_and_deep ,train.py 作为程序的入口点。

本示例会在任务镜像中包含训练数据,实际应用过程中一般不会也不建议这样使用,常见用法分为以下两种:

  • 任务运行时,程序通过网络拉取数据到本地进行训练,该情形数据由程序维护,这里不需要额外配置;

  • 任务运行时,程序读取本地目录进行训练,该情形需要使用用户配置 kubernetes 支持的挂载存储,一般建议使用 pvc 抽象,详细示例见下一小节。

制作任务镜像

在 kubernetes 中使用镜像需要有可访问的镜像仓库,这里使用百度云 ccr 作为示例,用户需要自己配置。

用于生成镜像的 Dockerfile 和代码目录,

$ ls
Dockerfile   wide_and_deep

Dockerfile 内容,

$ cat Dockerfile
FROM ubuntu:18.04

RUN apt update && \
    apt install -y python3 python3-dev python3-pip

RUN python3 -m pip install paddlepaddle==2.0.0 -i https://mirror.baidu.com/pypi/simple

## 以下根据用户内容修改

ADD wide_and_deep /wide_and_deep

WORKDIR /wide_and_deep

ENTRYPOINT ["python3", "train.py"]

用户可根据实际情况更改内容和安装额外依赖。

注意:使用 gpu 训练时需要

  • 安装 gpu 版本的 paddlepaddle 和相关组件或选用 官方 docker 作为基础镜像或环境;

  • 需要在集群中安装好对应 驱动工具包 支持。

制作镜像

docker build -t registry.baidubce.com/kuizhiqing/demo-wide-and-deep:v1 .

提交镜像 (需要具有对应权限)

docker push registry.baidubce.com/kuizhiqing/demo-wide-and-deep:v1

配置任务

准备配置文件,

$ cat pdj.yaml
apiVersion: batch.paddlepaddle.org/v1
kind: PaddleJob
metadata:
  name: wide-ande-deep
spec:
  withGloo: 1
  intranet: PodIP
  cleanPodPolicy: OnCompletion
  worker:
    replicas: 2
    template:
      spec:
        containers:
          - name: paddle
            image: registry.baidubce.com/kuizhiqing/demo-wide-and-deep:v1
  ps:
    replicas: 2
    template:
      spec:
        containers:
          - name: paddle
            image: registry.baidubce.com/kuizhiqing/demo-wide-and-deep:v1

说明:

  • 提交命名需要唯一,如果存在冲突请先删除原 paddlejob 确保已经删除再提交;

  • ps 模式时需要同时配置 ps 和 worker,collective 模式时只需要配置 worker 即可;

  • withGloo 可选配置为 0 不启用, 1 只启动 worker 端, 2 启动全部(worker端和Server端), 建议设置 1;

  • cleanPodPolicy 可选配置为 Always/Never/OnFailure/OnCompletion,表示任务终止(失败或成功)时,是否删除 pod,调试时建议 Never,生产时建议 OnCompletion;

  • intranet 可选配置为 Service/PodIP,表示 pod 间的通信方式,用户可以不配置, 默认使用 PodIP;

  • ps 和 worker 的内容为 podTemplateSpec,用户可根据需要遵从 kubernetes 规范添加更多内容, 如 GPU 的配置.

更多配置示例,

apiVersion: batch.paddlepaddle.org/v1
kind: PaddleJob
metadata:
  name: wide-ande-deep
spec:
  intranet: Service
  cleanPodPolicy: OnCompletion
  worker:
    replicas: 2
    template:
      spec:
        containers:
          - name: paddle
            image: registry.baidubce.com/kuizhiqing/demo-wide-and-deep:v1
            resources:
              limits:
                nvidia.com/gpu: 1
        nodeSelector:
          accelerator: nvidia-tesla-p100
  ps:
    replicas: 2
    template:
      spec:
        containers:
          - name: paddle
            image: registry.baidubce.com/kuizhiqing/demo-wide-and-deep:v1
            resources:
              limits:
                nvidia.com/gpu: 1
        nodeSelector:
          accelerator: nvidia-tesla-p100

使用 kubectl 提交 yaml 配置文件以创建任务,

$ kubectl -n paddle-system create -f pdj.yaml

数据存储

在 kubernentes 中使用挂载存储建议使用 pv/pvc 配置,详见 persistent-volumes

这里使用 nfs 云盘作为存储作为示例,配置文件如下,

$ cat pv-pvc.yaml
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: nfs-pv
spec:
  capacity:
    storage: 10Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Recycle
  storageClassName: slow
  mountOptions:
    - hard
    - nfsvers=4.1
  nfs:
    path: /nas
    server: 10.12.201.xx

---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: nfs-pvc
spec:
  accessModes:
    - ReadWriteOnce
  volumeMode: Filesystem
  resources:
    requests:
      storage: 10Gi
  storageClassName: slow
  volumeName: nfs-pv

使用以下命令在 namespace paddle-system 中 创建 pvc 名为 nfs-pvc 的存储声明,实际引用为 10.12.201.xx 上的 nfs 存储。

$ kubectl -n paddle-system apply -f pv-pvc.yaml

注意 pvc 需要绑定 namespace 且只能在该 namespace 下使用。

提交 paddlejob 任务时,配置 volumes 引用以使用对应存储,

apiVersion: batch.paddlepaddle.org/v1
kind: PaddleJob
metadata:
  name: paddlejob-demo-1
spec:
  cleanPolicy: OnCompletion
  worker:
    replicas: 2
    template:
      spec:
        restartPolicy: "Never"
        containers:
          - name: paddle
            image: registry.baidubce.com/kuizhiqing/paddle-ubuntu:2.0.0-18.04
            command: ["bash","-c"]
            args: ["cd /nas/wide_and_deep; python3 train.py"]
            volumeMounts:
            - mountPath: /nas
              name: data
        volumes:
          - name: data
            persistentVolumeClaim:
              claimName: nfs-pvc
  ps:
    replicas: 2
    template:
      spec:
        restartPolicy: "Never"
        containers:
          - name: paddle
            image: registry.baidubce.com/kuizhiqing/paddle-ubuntu:2.0.0-18.04
            command: ["bash","-c"]
            args: ["cd /nas/wide_and_deep; python3 train.py"]
            volumeMounts:
            - mountPath: /nas
              name: data
        volumes:
          - name: data
            persistentVolumeClaim:
              claimName: nfs-pvc

该示例中,镜像仅提供运行环境,训练代码和数据均通过存储挂载的方式添加。

安装Paddle

使用飞桨进行分布式训练的最小安装集合就是安装Paddle。从Paddle 2.0版本开始,我们面向不同用户群体提供不同类型的分布式训练API。

  • 面向算法工程师为主的高级API paddle.distributed.fleet

  • 面向具有分布式训练底层工程开发能力的工程师提供的API paddle.distributed

  • 您只需要安装Paddle,就可以获得飞桨团队官方提供的所有分布式训练功能。

安装CPU版本

pip install paddlepaddle

或者安装GPU版本

pip install paddlepaddle-gpu

关于安装Paddle,这里 有更完备的安装指南供您参考。

优化算法

  • TBA(天健)

Collective训练

快速开始

Collective训练快速开始

本节将采用CV领域非常经典的模型ResNet50为例,介绍如何使用Fleet API(paddle.distributed.fleet)完成Collective训练任务。数据方面我们采用Paddle内置的flowers数据集,优化器使用Momentum方法。循环迭代多个epoch,每轮打印当前网络具体的损失值和acc值。具体代码保存在FleetX/examples/resnet下面,其中resnet_static.py用于保存模型相关代码,而train_fleet_static.py为本节需要讲解的训练脚本。

版本要求

在编写分布式训练程序之前,用户需要确保已经安装paddlepaddle-2.0.0-rc-cpu或paddlepaddle-2.0.0-rc-gpu及以上版本的飞桨开源框架。

操作方法

与单机单卡的普通模型训练相比,Collective训练的代码主要需要补充三个部分代码:

  1. 导入分布式训练需要的依赖包。

  2. 初始化Fleet环境。

  3. 设置分布式训练需要的优化器。 下面将逐一进行讲解。

导入依赖

导入必要的依赖,例如分布式训练专用的Fleet API(paddle.distributed.fleet)。

from paddle.distributed import fleet
初始化fleet环境

包括定义缺省的分布式策略,然后通过将参数is_collective设置为True,使训练架构设定为Collective架构。

strategy = fleet.DistributedStrategy()
fleet.init(is_collective=True, strategy=strategy)
设置分布式训练使用的优化器

使用distributed_optimizer设置分布式训练优化器。

optimizer = fleet.distributed_optimizer(optimizer)
完整代码

train_fleet_static.py的完整训练代码如下所示。

# -*- coding: UTF-8 -*-
import numpy as np
import argparse
import ast
import paddle
# 导入必要分布式训练的依赖包
import paddle.distributed.fleet as fleet
# 导入模型文件
import resnet_static as resnet
import os

base_lr = 0.1   # 学习率
momentum_rate = 0.9 # 冲量
l2_decay = 1e-4 # 权重衰减

epoch = 10  #训练迭代次数
batch_size = 32 #训练批次大小
class_dim = 10

# 设置优化器
def optimizer_setting(parameter_list=None):
    optimizer = paddle.optimizer.Momentum(
        learning_rate=base_lr,
        momentum=momentum_rate,
        weight_decay=paddle.regularizer.L2Decay(l2_decay),
        parameters=parameter_list)
    return optimizer
# 设置数据读取器
def get_train_loader(feed_list, place):
    def reader_decorator(reader):
        def __reader__():
            for item in reader():
                img = np.array(item[0]).astype('float32').reshape(3, 224, 224)
                label = np.array(item[1]).astype('int64').reshape(1)
                yield img, label

        return __reader__
    train_reader = paddle.batch(
            reader_decorator(paddle.dataset.flowers.train(use_xmap=True)),
            batch_size=batch_size,
            drop_last=True)
    train_loader = paddle.io.DataLoader.from_generator(
        capacity=32,
        use_double_buffer=True,
        feed_list=feed_list,
        iterable=True)
    train_loader.set_sample_list_generator(train_reader, place)
    return train_loader
# 设置训练函数
def train_resnet():
    paddle.enable_static() # 使能静态图功能
    paddle.vision.set_image_backend('cv2')

    image = paddle.static.data(name="x", shape=[None, 3, 224, 224], dtype='float32')
    label= paddle.static.data(name="y", shape=[None, 1], dtype='int64')
    # 调用ResNet50模型
    model = resnet.ResNet(layers=50)
    out = model.net(input=image, class_dim=class_dim)
    avg_cost = paddle.nn.functional.cross_entropy(input=out, label=label)
    acc_top1 = paddle.metric.accuracy(input=out, label=label, k=1)
    acc_top5 = paddle.metric.accuracy(input=out, label=label, k=5)
    # 设置训练资源,本例使用GPU资源
    place = paddle.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))

    train_loader = get_train_loader([image, label], place)
    #初始化Fleet环境
    strategy = fleet.DistributedStrategy()
    fleet.init(is_collective=True, strategy=strategy)
    optimizer = optimizer_setting()

    # 通过Fleet API获取分布式优化器,将参数传入飞桨的基础优化器
    optimizer = fleet.distributed_optimizer(optimizer)
    optimizer.minimize(avg_cost)

    exe = paddle.static.Executor(place)
    exe.run(paddle.static.default_startup_program())

    epoch = 10
    step = 0
    for eop in range(epoch):
        for batch_id, data in enumerate(train_loader()):
            loss, acc1, acc5 = exe.run(paddle.static.default_main_program(), feed=data, fetch_list=[avg_cost.name, acc_top1.name, acc_top5.name])
            if batch_id % 5 == 0:
                print("[Epoch %d, batch %d] loss: %.5f, acc1: %.5f, acc5: %.5f" % (eop, batch_id, loss, acc1, acc5))
# 启动训练
if __name__ == '__main__':
    train_resnet()

运行示例

假设要运行2卡的任务,那么只需在命令行中执行:

fleetrun --gpus=0,1 train_fleet_static.py

您将看到显示如下日志信息:

-----------  Configuration Arguments -----------
gpus: 0,1
heter_worker_num: None
heter_workers:
http_port: None
ips: 127.0.0.1
log_dir: log
...
------------------------------------------------
WARNING 2021-01-04 17:59:08,725 launch.py:314] Not found distinct arguments and compiled with cuda. Default use collective mode
launch train in GPU mode
INFO 2021-01-04 17:59:08,727 launch_utils.py:472] Local start 2 processes. First process distributed environment info (Only For Debug):
    +=======================================================================================+
    |                        Distributed Envs                      Value                    |
    +---------------------------------------------------------------------------------------+
    |                 PADDLE_CURRENT_ENDPOINT                 127.0.0.1:17901               |
    |                     PADDLE_TRAINERS_NUM                        2                      |
    |                PADDLE_TRAINER_ENDPOINTS         127.0.0.1:17901,127.0.0.1:18846       |
    |                     FLAGS_selected_gpus                        0                      |
    |                       PADDLE_TRAINER_ID                        0                      |
    +=======================================================================================+

...
W0104 17:59:19.018365 43338 device_context.cc:342] Please NOTE: device: 0, GPU Compute Capability: 7.0, Driver API Version: 10.2, Runtime API Version: 9.2
W0104 17:59:19.022523 43338 device_context.cc:352] device: 0, cuDNN Version: 7.4.
W0104 17:59:23.193490 43338 fuse_all_reduce_op_pass.cc:78] Find all_reduce operators: 161. To make the speed faster, some all_reduce ops are fused during training, after fusion, the number of all_reduce ops is 5.
[Epoch 0, batch 0] loss: 0.12432, acc1: 0.00000, acc5: 0.06250
[Epoch 0, batch 5] loss: 1.01921, acc1: 0.00000, acc5: 0.00000
...

完整2卡的日志信息也可在./log/目录下查看。了解更多fleetrun的用法可参考左侧文档fleetrun 启动分布式任务

单机八卡训练启动命令类似,只需正确指定gpus参数即可,如下所示:

fleetrun --gpus 0,1,2,3,4,5,6,7 train_fleet_static.py

从单机多卡到多机多卡训练,在代码上不需要做任何改动,只需再额外指定ips参数即可。其内容为多机的ip列表,命令如下所示:

fleetrun --ips="xx.xx.xx.xx,yy.yy.yy.yy" --gpus 0,1,2,3,4,5,6,7 train_fleet_static.py

性能基准

请访问飞桨 Perf Repo 获取飞桨性能基准数据。

设计综述

背景

纵观深度学习的发展史,不难发现,很多奠基性的工作,其实早在上世纪四五十年代就被提出了。但是囿于算力,当时的成果主要集中在理论上。直到最近十年间,随着计算性能的不断提升,我们能够在有限的时间内训练出更大更深的神经网络,才让深度学习得以腾飞。不夸张的说,深度学习如今在各个领域取得的成就,和大规模是分不开的。

不管是学术界还是工业界,都一直致力于更快速的训练更大更深的神经网络。大体来分,我们在深度学习领域主要有两个追求,一是训练性能更快,二是模型规模更大。分布式深度学习领域中的很多概念和技巧,都是为了解决这两个问题产生的。

接下来我们就从性能优化和大模型训练两方面入手,介绍几个常用的方法。

性能优化

在通用GPU发布之后,使用显卡训练神经网络的热度开始爆炸性地增长。NVIDIA的CUDA编程语言可以让用户以一种像C一样的语言实现任意代码。那么如何在通用GPU上设计出高效的代码,对性能提升就变得至关重要。本小节大部分的内容,正是关注于此。值得一提的是,除了GPU,市场上还涌现了很多其他硬件厂商开发的AI专用芯片,例如百度的昆仑、华为的昇腾910等。当然,在这些不同芯片上的优化思路都是比较类似的。

在分布式机器学习中,最常用的并行模式是数据并行,即每个工作节点拥有全部模型参数,并训练全量数据的一部分,之后对计算出来的梯度(或参数)进行通信(通常为all-reduce操作)以实现全局信息共享。可以看出,计算和通信是分布式深度学习任务中最主要的两部分。所以常用的性能优化策略也正是从这两部分入手进行的。在计算方面优化手段主要包括计算算子融合;对于通信方面的优化包括通信算子融合、通信拓扑优化等;当然,还有一部分优化同时涉及两部分,例如混合精度训练。下面将会逐一分解进行介绍。

计算OP融合

在深度学习框架中,最基本的计算单元是算子(Operator)。例如常见的矩阵乘法操作,就是以MatMul算子的形式存在。一个完整的计算网络,通常就是由多个算子组合起来的。这样的设计十分灵活,用户可以通过组合不同的算子来验证不同的想法。

但是,鱼和熊掌不可兼得。拥有巨大灵活性所要付出的代价就是性能。举例来讲,假设我们要计算三个输入a、b、c相加的结果,调用过程可能是tmp=add(a, b); out=add(tmp, c)。在这样的网络中,我们会启动两次计算,并开辟了一个中间变量用于存放中间计算结果。在CUDA开发中,这样的一次计算通常是由一个或多个Kernel进行的,而Kernel的启动通常需要一定时间开销,因此在这个例子中加法运算所用到的Kernel就要启动两次,即将产生双倍的时间开销。

针对这个操作的一种优化方法是,我们开发一个支持三个输入的OP(假设名为add3)。那么我们只需要启动一次Kernel计算,即out=add3(a, b, c),便可以得到最终的结果。该方法的一个附加好处是还节省了一个临时空间的申请。

这种思路就是所谓的计算OP融合(Fusion),详细内容请参考4.1.1小节。需要说明的是,OP融合在单卡下就有效果,并不是分布式特有的策略。对分布式训练来讲,如何在计算和通信并重的情况下获得更优秀的性能,是我们关注的重点。

接下来的几个小节会结合一个生动的例子来阐述各种优化策略的思想。我们的主人公是Alice和Bob两位小朋友,他们要在各自的房间里做一沓试卷,每张试卷上有若干题目,覆盖不同的知识点。他们的目标是做完所有的试卷,并学到相应的知识。特别的,他们可以通过交换各自学到的内容来修正或巩固自己的知识。Alice和Bob一开始选定的做法是:每当他们之间有人做完一道题,就拨电话给对方,等对方也做完这道题并接起电话后,同步各自的答案,然后同时开始做下一道题。

通信OP融合

Alice和Bob所在的国家电话号码很长,所以他们发现每做完一道题就互相通话,拨电话号码的耗时有些难以接受。他们想,如果商定好做完多道题目,再通话一次进行交流,能省去很多拨电话号码造成的时间开销。

这就是通信OP融合的思想。我们知道每次触发通信都会有一些额外的操作(如先建立连接等),减少这些额外的操作将对性能有很大帮助[1]。顺着这个思路,如果我们能够将多次通信的内容先拼接成连续的数据,然后在一次通信内全部发送/接收,那么将会更充分的利用硬件资源,获得更大的性能提升。

通信OP融合的使用方法请参考4.1.2小节

计算和通信重叠

按照之前的约定,做题快的人(比如Alice)拨通电话后,要等待Bob完成对应的题目之后接起电话才能开始这次通信。在等待Bob接听电话的时候,Alice只是闲坐在那里听着听筒里的彩铃音乐。她突然想到,为什么要听这种无聊的声音,而不开始提前做下面的题目呢?

这就是通信和计算重叠的思想。CUDA中有流(stream[2])的概念,一个流表示一个GPU操作队列,该队列中的操作将以添加到流中的先后顺序而依次执行。那么通过令计算和通信操作加入不同的流中,可以做到二者的执行在时间上重叠。详细内容请参考4.2小节

通信拓扑优化

现在做题的团队壮大了,除了Alice和Bob,又加入了几位新同学。他们的目标变成要让每个人算出来的答案,都被所有其他人知道。最简单的做法,自然是所有人之间通一次电话。但是这样做时间开销太大了。聪明的他们选择了另一种做法,把所有人分成几组,每个组选出一名组长,组员把答案汇总给组长。组长间先互相交换所有的信息,然后再分发给所有组员。

不同的信息交换策略,对应到分布式训练中,就是不同的通信拓扑。上述采用的通信策略借鉴了分层(hierarchical)通信的思想。在业界,有ring-allreduce[3],Double binary trees[4]等多种拓扑结构。

通信拓扑优化的更多使用方法,请参考4.3小节

深度梯度压缩

再次回到仅有Alice和Bob两人做题学习的场景来。他们在做题过程中发现,随着学习的进行,对于不同知识点的掌握程度有好有坏。有的知识点已经掌握的很好了,再做题也提供不了太多新的知识。但另外一些,却仍然感到模棱两可。于是两人约定,每做完T张试卷,选出最拿不准的几个知识点来交流答案,而掌握充分的那些知识点,就不在电话中交流了。

上述思路就是深度梯度压缩(Deep Gradient Compression, DGC)的主要思想。DGC通过将梯度稀疏化,在每轮训练时只选择出一部分比较“重要”的梯度进行同步,以达到降低通信量的目的。当然,减少通信量势必会造成精度损失。为了减少损失程度,作者还提出了动量修正(momentum correction)、本地梯度裁剪(local gradient cliping)、动量因子遮蔽(Momentum factor masking) 等几项技巧。详细内容可以参考4.4.1小节

Local SGD

Alice和Bob觉得没必要每道题都打电话交流答案,就算使用了前述通信OP融合的技术,也只是减少了打电话的频率,但还是每一道题都要对答案。

于是两人又想到了一个能够减少打电话次数的策略:他们决定各自先做T张试卷,自行学习梳理各个知识点的知识,然后再通电话交流各个知识点的心得。当按照这个方法执行的时候两人发现,尽管花费在打电话上的时间确实减少了,但副作用是他们各自学到的知识可能不一定准确,交流次数的减少让他们没法及时纠正自己某些错误的理解。因此他们又想到了另一个更好的沟通策略:刚开始学习的时候交流频繁一点,当对各个知识点有了大致的了解后,再慢慢降低通话的频率。毕竟具备了基础知识后,只有在题海中遇到新题才能带来新的认识,刷再多重复的题目是没什么意义的。

Local SGD就是基于这个思路,最基本的Local SGD属于上例的第一个策略,直接增大参数同步的间隔来减少通信耗时,但是弊端是可能造成训练精度的损失。而对于第二种策略,Local SGD又衍生出了post Local SGD和Adaptive Local SGD两款“加强版”:

  • post Local SGD训练的第一个阶段保持每算出一个参数的梯度,就完成一次同步通信,以保证训练精度;之后到了第二阶段,则增大同步间隔(该间隔是固定的),以提升训练效率。

  • Adaptive Local SGD相对于post Local SGD而言,更加灵活,它会动态调整梯度同步通信的间隔,从而达到训练精度和训练速度之间的平衡。

详细内容可以参考4.4.2小节

自动混合精度

Alice和Bob有一个特殊记忆能力,就是可以把想表述的内容,提炼成少量文字(原先的字数的一半),这样可以减少记忆的内容,但是同时也会导致准确性稍稍出现偏差。

随着知识点和题目越来越多,Alice和Bob觉得脑子发沉,可能脑容量已经快用完了,而打电话交流的时间也越来越长。于是他们决定用上那种记忆能力,这样就释放了大脑中更多的空间,而且打电话交流的内容也随之减半。

在实际应用中,对应这种记忆能力的就是半精度(FP16)类型,使用半精度类型进行训练,称之为混合精度训练。混合精度训练有若干好处,例如减小显存使用量,增大通信吞吐等。当然精度的降低会导致数字表示范围的缩小,进而导致比FP32更容易溢出,为了应对这些问题,我们引入了Dynamic loss scaling和op黑白名单等策略来避免。

  • Dynamic loss scaling:在AMP训练过程中,为了避免精度下溢,每训练一定数量批次的数据,就将Loss放大指定倍数。如果Loss在放大过程中发生上溢,则可以再缩小一定倍数,确保整个训练过程中,梯度可以正常收敛。

  • op黑白名单:通过使用大量模型在不同应用场景中反复验证后,飞桨团队根据半精度数据类型计算的稳定性和加速效果,梳理出一系列适合转换为半精度计算的算子,并将这些算子定义到了一份白名单文件中。同时对于一些经过验证发现不适合转换的算子,也就是使用半精度计算会导致数值不精确的算子将被记录到黑名单文件中。此外一些对半精度计算没有多少影响的算子归类于灰名单。在使用自动混合精度训练过程中,系统会自动读取黑白名单,从而感知到哪些算子需要被转换为半精度计算。

详细内容请参考4.5小节

性能优化

OP融合(计算,通信)

计算融合

将模型网络中顺序执行的多个OPs进行融合能够减少OP 调度的开销,提升训练速度。目前Fleet 中支持如下3种的OP 融合:

  • fuse_all_optimizer_ops:表明是否融合(fuse) 是否融合 optimizer_op,仅对部分 optimizer 可用(SGD、Adam和Momentum)。

  • fuse_elewise_add_act_ops:表明是否融合(fuse) elementwise_add_op和activation_op。

  • fuse_bn_act_ops:表明是否融合(fuse) batch_norm_op 和 activation_op。

通常使用这些策略都会使整体执行过程更快。

通信融合

AllReduce 融合默认情况下会将同一layer中参数的梯度的多个AllReduce操作合并成一个。 比如对于 fc 中有Weight和Bias两个参数,打开该选项之前,需要两次AllReduce操作;打开该选项之后,只用一次AllReduce 操作。这样可以减少梯度同步时的通信耗时。

此外,为支持更大粒度的参数梯度融合,Fleet 提供了以下两个选项,用户可以在训练程序运行前在DistributedStrategy中设置:

  • fuse_grad_size_in_MB: 指定每个AllReduce操作的梯度字节数,如该参数等于16 则每次AllReduce调用传输16MB的梯度。 该参数的经验值为总通信量的十分之一。

  • fuse_grad_size_in_TFLOPS: 指定每次AllReduce操作的最大层数,即到达该层数就进行AllReduce。如该参数等于50, 则最多每50层做一次 fused AllReduce。

注意: AllReduce融合目前不支持sparse参数梯度。

操作实践
# 计算融合
build_strategy = paddle.static.BuildStrategy()
build_strategy.fuse_elewise_add_act_ops = True
build_strategy.fuse_bn_act_ops = True
build_strategy.fuse_relu_depthwise_conv = True
build_strategy.fuse_broadcast_ops = True
build_strategy.fuse_all_optimizer_ops = True

strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.build_strategy = build_strategy

# 通信融合
strategy.fuse_grad_size_in_MB = 16
strategy._fuse_grad_size_in_TFLOPS = 50
strategy.fuse_all_reduce_ops=True

上述例子存放在:example/resnet/train_fleet_static_op_fusion.py。 假设要运行2卡的任务,那么只需在命令行中执行:

fleetrun --gpus=0,1 train_fleet_static_op_fusion.py

您将看到显示如下日志信息:

-----------  Configuration Arguments -----------
gpus: None
heter_worker_num: None
heter_workers:
http_port: None
ips: 127.0.0.1
log_dir: log
...
------------------------------------------------
WARNING 2021-01-19 14:53:04,943 launch.py:316] Not found distinct arguments and compiled with cuda. Default use collective mode
launch train in GPU mode
INFO 2021-01-19 14:53:04,945 launch_utils.py:472] Local start 8 processes. First process distributed environment info (Only For Debug):
    +=======================================================================================+
    |                        Distributed Envs                      Value                    |
    +---------------------------------------------------------------------------------------+
    |                 PADDLE_CURRENT_ENDPOINT                 127.0.0.1:28355               |
    |                     PADDLE_TRAINERS_NUM                        8                      |
    |                PADDLE_TRAINER_ENDPOINTS  ... 0.1:33653,127.0.0.1:27766,127.0.0.1:16631|
    |                     FLAGS_selected_gpus                        0                      |
    |                       PADDLE_TRAINER_ID                        0                      |
    +=======================================================================================+
...
W0119 14:53:16.871562 68031 device_context.cc:362] Please NOTE: device: 0, GPU Compute Capability: 7.0, Driver API Version: 10.2, Runtime API Version: 9.2
W0119 14:53:16.875859 68031 device_context.cc:372] device: 0, cuDNN Version: 7.4.
W0119 14:53:25.973377 68031 build_strategy.cc:116] Currently, fuse_broadcast_ops only works under Reduce mode.
I0119 14:53:27.382609 68031 graph_pattern_detector.cc:101] ---  detected 16 subgraphs
I0119 14:53:27.390769 68031 graph_pattern_detector.cc:101] ---  detected 16 subgraphs
W0119 14:53:27.407582 68031 fuse_optimizer_op_pass.cc:207] Find momentum operators : 161, and 161 for dense gradients. To make the speed faster, those optimization are fused during training.
W0119 14:53:27.436177 68031 fuse_all_reduce_op_pass.cc:79] Find all_reduce operators: 161. To make the speed faster, some all_reduce ops are fused during training, after fusion, the number of all_reduce ops is 6.
[Epoch 0, batch 0] loss: 0.15131, acc1: 0.00000, acc5: 0.03125
[Epoch 0, batch 5] loss: 1.15416, acc1: 0.00000, acc5: 0.03125

通信重叠

简介

Paddle的通信进行重叠(overlap),可以有效提升通信效率。

原理介绍

Paddle的整体框架目前只有一个计算流,但可以有多个通信流。在通信为瓶颈的低配网络中,通过 重叠通信流,可以有效利用通信带宽,从而达到更优的通信性能。多流相关的概念请参考: cuda-streams-best-practices

使用方法

Fleet已经实现通信流overlap,只需设置通信器数量 nccl_comm_num 可以加快GPU之间的通信效率,建议单机设置为1,多机设置为2。

strategy = fleet.DistributedStrategy()
strategy.nccl_comm_num = 2
strategy.sync_nccl_allreduce=False

上述例子存放在:example/resnet/train_fleet_static_overlap.py下面, 假设要运行2卡的任务,那么只需在命令行中执行:

fleetrun --gpus=0,1 train_fleet_static_overlap.py

您将看到显示如下日志信息:

-----------  Configuration Arguments -----------
gpus: 0,1
heter_worker_num: None
heter_workers:
http_port: None
ips: 127.0.0.1
log_dir: log
...
------------------------------------------------
...
    +=======================================================================================+
    |                        Distributed Envs                      Value                    |
    +---------------------------------------------------------------------------------------+
    |                 PADDLE_CURRENT_ENDPOINT                 127.0.0.1:10097               |
    |                     PADDLE_TRAINERS_NUM                        2                      |
    |                PADDLE_TRAINER_ENDPOINTS         127.0.0.1:10097,127.0.0.1:59371       |
    |                     FLAGS_selected_gpus                        0                      |
    |                       PADDLE_TRAINER_ID                        0                      |
    +=======================================================================================+
...
W0118 21:44:34.542804 70071 device_context.cc:362] Please NOTE: device: 0, GPU Compute Capability: 7.0, Driver API Version: 10.2, Runtime API Version: 9.2
W0118 21:44:34.547377 70071 device_context.cc:372] device: 0, cuDNN Version: 7.4.
W0118 21:44:40.178053 70071 fuse_all_reduce_op_pass.cc:79] Find all_reduce operators: 161. To make the speed faster, some all_reduce ops are fused during training, after fusion, the number of all_reduce ops is 5.
[Epoch 0, batch 0] loss: 0.14466, acc1: 0.00000, acc5: 0.03125
[Epoch 0, batch 5] loss: 4.00225, acc1: 0.00000, acc5: 0.03125
...

通信拓扑优化

原理
  • TBA

操作实践

Fleet 实现了底层通过改变通信拓扑,实现分层 allreduce。用户只需要指定相应的DistributedStrategy() 的开关,就可以选择不同的通信拓扑。

dist_strategy = fleet.DistributedStrategy()
dist_strategy.use_hierarchical_allreduce = True
dist_strategy.hierarchical_allreduce_inter_nranks = 8

上述例子存放在:example/resnet/train_fleet_static_communication_topology.py。 假设要运行8卡的任务,那么只需在命令行中执行:

fleetrun --gpus=0,1,2,3,4,5,6,7 train_fleet_static_communication_topology.py

您将看到显示如下日志信息:

-----------  Configuration Arguments -----------
gpus: None
heter_worker_num: None
heter_workers:
http_port: None
ips: 127.0.0.1
log_dir: log
...
------------------------------------------------
...
INFO 2021-01-19 14:58:43,720 launch_utils.py:472] Local start 8 processes. First process distributed environment info (Only For Debug):
    +=======================================================================================+
    |                        Distributed Envs                      Value                    |
    +---------------------------------------------------------------------------------------+
    |                 PADDLE_CURRENT_ENDPOINT                 127.0.0.1:53762               |
    |                     PADDLE_TRAINERS_NUM                        8                      |
    |                PADDLE_TRAINER_ENDPOINTS  ... 0.1:58938,127.0.0.1:54203,127.0.0.1:44221|
    |                     FLAGS_selected_gpus                        0                      |
    |                       PADDLE_TRAINER_ID                        0                      |
    +=======================================================================================+
...
W0119 14:58:52.487838 95116 device_context.cc:362] Please NOTE: device: 0, GPU Compute Capability: 7.0, Driver API Version: 10.2, Runtime API Version: 9.2
W0119 14:58:52.493592 95116 device_context.cc:372] device: 0, cuDNN Version: 7.4.
W0119 14:59:01.665702 95116 fuse_all_reduce_op_pass.cc:79] Find all_reduce operators: 161. To make the speed faster, some all_reduce ops are fused during training, after fusion, the number of all_reduce ops is 5.
[Epoch 0, batch 0] loss: 0.13468, acc1: 0.00000, acc5: 0.06250
[Epoch 0, batch 5] loss: 0.18902, acc1: 0.03125, acc5: 0.03125

通信频率优化

在网络带宽较低的训练场景(如: 公有云上训练,联邦训练)中,梯度同步在低带宽网络下的延迟成为训练速度的主要瓶颈。 Fleet 作为Paddle通用的分布式训练API 实现了: Deep Gradient CompressionLocal SGD 两种训练策略来针对性解决这一问题。

DGC 优化低配网络的分布式GPU训练
DGC 简介

大规模分布式训练需要较高的网络带宽以便进行梯度的聚合更新,这限制了多节点训练的扩展性,同时也需要昂贵的高带宽设备。在低带宽的网络环境下进行分布式训练时,梯度同步成为训练加速的瓶颈。 Deep Gradient Compression 发现:分布式SGD中有99.9%的梯度交换都是冗余的,可以使用深度梯度压缩选择重要梯度进行通信来减少通信量,降低对通信带宽的依赖。Fleet 实现了DGC的稀疏通信方式,可有效在低配网络下进行GPU分布式训练。Fleet 实现了 DGC 论文中的 预热训练 (warming up training), 动量修正 (Momentum Correction), 局部梯度修剪 (local gradient clipping), 动量因子掩藏 (Momentum factor masking) 等策略, 和 正则化项修正 (Weight Decay Correction) 避免稀疏梯度通信训练带来的最终模型精度损失。

下面将介绍 DGC 稀疏通信方式的适用场景、试验效果、基本原理和使用方法。

适用场景

DGC稀疏通信在低带宽通信瓶颈时会有较大的性能提升,但在单机多卡及RDMA网络通信并非瓶颈情况下,并不会带来性能上的提升。同时由于AllGather的通信量会随卡数的增多而增大,所以DGC的多机训练规模也不宜过大。故DGC适用于低配网络,同时节点规模不宜过大,如>128张卡。在云网络或高带宽网络设备昂贵时,DGC可有效降低训练成本。

试验效果
  • 模型:FasterRCNN

  • 硬件: P40两机分布式,每台机器一卡,TCP网络测试。

  • 取300-700步耗时/400step。

  • 精度无损。

DGC 原理简介

这里将简单介绍介绍Fleet DGC 中的一些原理和对应参数应该如何设置。

梯度稀疏

DGC的基本思路是通过只传送重要梯度,即只发送大于给定阈值的梯度来减少通信带宽的使用。为避免信息的丢失,DGC会将剩余梯度在局部累加起来,最终这些梯度会累加大到足以传输。 换个角度,从理论依据上来看,局部梯度累加等同于随时间推移增加batch size,(DGC相当于每一个梯度有自己的batch size)。

假设 N是训练节点个数, b为单卡batch size,局部梯度累加可以被认为batch size从\(Nb\)增大为\(NbT\),其中T是两次更新的稀疏通信间隔。详细的公式推导请参阅 [1] 预热调参 ^^^^^^^^

对于正常的训练,使用DGC一般需进行预热训练,否则可能会有精度损失。由于paddle稀疏梯度聚合通信使用了AllGather,通信量会随卡数增加而增长,所以在卡数较多时不推荐较低稀疏度的预热训练。参数设置如下:

# 1. 以1252个step为一个epoch,前2个epochs使用正常dense通信,后3个epochs逐步提升稀疏度为99.9%
strategy.dgc_configs = {
    "rampup_begin_step": 1252*2,
    "rampup_step": 1252*3,
    "sparsity": [0.984375, 0.996, 0.999]
}
# 2. 前面4个epochs都使用dense通信,之后默认0.999稀疏度运行
strategy.dgc_configs = {
    "rampup_begin_step": 1252*4,
    "rampup_step": 1,
    "sparsity": [0.999]
}

对于Fine-tuning训练,可无需预热训练,从第0个epoch直接使用DGC即可。

# 从第0步开始DGC稀疏通信
strategy.dgc_configs = {
    "rampup_begin_step": 0,
    "rampup_step": 1,
    "sparsity": [0.999]
}
局部梯度累加改进

正常情况,稀疏更新会严重影响收敛性。DGC中采用动量修正(Momentum Correction)和局部梯度裁减(Local Gradient Clipping), 动量因子掩藏, 正则化项修正 4个策略来解决这个问题。

动量修正

上文”局部梯度累加等同于随时间推移增加batch size“的推导没有考虑 Momentum存在的情况。当稀疏度很高时,使用原始Momentum 公式会显著降低模型性能,所以需要在原始公式的基础上对梯度进行修正。

动量修正使用部累加速度项\(u_t\)而非累加真实的梯度\(\nabla_{k, t}\)来修改Momentum 方程,修正后的动量更新公式如下:

\[u_{k, t}=m u_{k, t-1}+\nabla_{k, t}, \quad v_{k, t}=v_{k, t-1}+u_{k, t}, \quad w_{t+1}=w_{t}-\eta \sum_{k=1}^{N} \operatorname{sparse}\left(v_{k, t}\right)\]
局部梯度修剪

梯度修剪是防止梯度爆炸的常用方法。这方法由Pascanu等人在2013年提出,当梯度的l2-norms和大于给定阈值时,就对梯度rescale。正常梯度修剪在梯度聚合后使用,而DGC因为每个节点独立的进行局部梯度累加,所以DGC在使用\(G_t\)累加前对其进行局部梯度修剪。阈值缩放为原来的\(N^{-1/2}\)

动量因子掩藏

因为推迟了较小梯度更新权重的时间,所以会有权重陈旧性问题。稀疏度为99.9%时大部分参数需600到1000步更新一次。迟滞效应会减缓收敛并降低模型精度。DGC中使用下面方程来掩藏动量因子减缓陈旧性问题。

\[Mask \leftarrow\left|v_{k, t}\right|>t h r, \quad v_{k, t} \leftarrow v_{k, t} \odot \neg Mask, \quad u_{k, t} \leftarrow u_{k, t} \odot \neg Mask\]

此掩码可以停止延迟梯度产生的动量,防止陈旧梯度把权重引入错误的方向。

正则化(Weight Decay)项修正

类似动量修正,DGC 中我们同样需要对正则化项进行修正来让参数的延迟更新方向更加准确。

和动量修思路相同,修正需要在局部梯度上添加局部Weight Decay。

\[\nabla_{k, t}=\nabla_{k, t}+\frac{\lambda}{N} w_{t}\]

上述策略已经在Fleet 框架中实现,用户无须设置。

DGC 快速开始

下文以单机八卡上训练ResNet50 为例子简单介绍 Fleet 中 DGC 的使用。 因为 8张 GPU 的通信都在同一节点内, 一般情况下梯度通信并不会成为训练的瓶颈, 这里只是以其为例子,介绍Fleet 中 DGC 参数的设置。

注意

  • 硬件环境要求: DGC目前只支持GPU多卡及分布式collective训练,需要有相应的cuda、cuDNN、nccl环境。

  • Paddle环境要求: DGC只支持GPU,所以需GPU版本的Paddle。

DGC 相关策略

这里假设:1252个step为一个epoch,前2个epochs使用正常dense通信,后3个epochs逐步提升稀疏度为99.9%

  • rampup_begin_step (int):DGC(含预热训练)开始的 step

  • rampup_step (int):DGC中预热训练持续的 step. 如果sparsity 是 [0.75, 0.9375, 0.984375, 0.996, 0.999],rampup_step 设成 100时, 在 0~19 steps 时 sparsity=0.75,在 20~39 steps 时 sparsity=0.9375, 以此类推。

  • sparsity (list[float]):稀疏度 threshold, (1 - current sparsity) % 的gradient 将会被 allreduce。

strategy = fleet.DistributedStrategy()

strategy.dgc = True
strategy.dgc_configs = {
    "rampup_begin_step": 1252*2,
    "rampup_step": 1252*3,
    "sparsity": [0.984375, 0.996, 0.999]
}

基于ResNet50网络的DGC代码:example/resnet/train_fleet_static_dgc.py

使用Local SGD 优化低带宽下分布式训练
简介

在使用 distributed SGD进行数据并行的分布式训练时,常会遇到以下两个问题:

  • 分布式训练的吞吐会受到集群中随机慢节点(straggling node)和通信延迟的影响。

  • 数据并行分布式增大了训练实际的batch size,过大的batch size 会影响最终的训练精度。

Local SGD通过延长节点间同步的间隔(局部异步训练)来减轻慢节点的影响和减少通信频率,以此提升训练的吞吐。

原理介绍

Local SGD减轻慢节点的影响和减少通信频率,提升训练的吞吐。为了减小相对于本地训练(小batch size)的精度损失,[1][2] 分别提出了:post-Local SGD自适应步长 (Adaptive Communication) Local SGD 策略,来减少参数同步频率降低带来的精度损失。同步SGD和Local SGD在通信同步上的差异如下图所示。

Synchronous SGD 和 Local SGD

在Local SGD 训练中,集群中的每个 trainer 各自会独立的进行 H 个连续的 SGD 更新,然后集群中的所有 trainer 会进行通信,同步(averaging)所有 trainers 上的参数。一个双 trainers,同步间隙为3 步长(iterations)的Local SGD过程如下图所示。黄绿两条路径表示两个 trainers 各自的 Local SGD 更新过程,中间的蓝色路径表示同步后的模型所在的位置。

Local SGD

Local SGD中的一个关键问题是如何确定参数同步的间隔(频率),以达到训练吞吐和训练精度间更好的平衡:

  • 增大参数同步的间隔可以减少 trainers 间通信延迟的影响提高训练吞吐,

  • 但增大同步间隔可能会造成最终训练精度的损失。 [1]

以下两个策略从不同角度试图达到更好的平衡:

  • post Local SGD 将训练过程分成两个阶段:第一阶段 trainers 间同步的间隔为 1 个步长,即同步SGD,来保证最终训练精度;在第二阶段增大同步间隔到固定常数 H,来提升训练吞吐。

  • Adaptive Communication Local SGD 通过动态的调整参数同步的间隔来尝试达到训练吞吐和精度间的更好的平衡。在训练初始或者上一段参数同步完成后,根据如下公式计算一下次参数同步的间隔(iteration)。详细的公式推导和参数定义请参考原论文。

Fleet 中实现了 post Local SGDAdaptive Communication Local SGD 两种策略。

功能效果

实验设置

model

dataset

local batch size

cluster

dtype

warming up

learning rate decay

resnet50

Imagenet

128

4 x 8 x V100

FP32

30

polynomial

实验结果

local step

qps

acc1

acc5

1

8270.91

0.7579

0.9266

2

8715.67

0.7533

0.9265

4

8762.66

0.7551

0.9260

8

9184.62

0.7511

0.9239

16

9431.46

0.7429

0.9206

ADACOMM

8945.74

0.7555

0.9270

可以看到在 post Local SGD (固定同步间隔)情况下,更新间隔越长训练的吞吐越高,但是模型的最终精度也会损失越大。 当使用 ADAPTIVE COMMUNICATION 策略后,训练在吞吐和精度间达到了一个更好的平衡。

使用方法

下文将以单机8卡训练 ResNet50 为例子,简单介绍Local SGD 的用法。需要注意的是 单机八卡的通信都在同一机器节点内, 一般情况下参数同步不会成为训练的瓶颈,这里只是以其为例子,介绍Fleet 中 Local SGD 参数的设置。

定义Local SGD 相关策略

用户首先需要定义paddle SGD 对象,并在SGD对象中设置学习率参数。目前local SGD和自适应步长 local SGD都仅支持SGD和Momentum两种优化器。

  • post Local SGD 中,有两个参数 begin_stepk_steps,局部更新和参数同步都由框架自动完成。begin_step 指定从第几个step之后进行local SGD算法,取值为大于0的整数;k_step 指定训练过程中的全局参数更新间隔,取值为大于0的整数。

strategy = fleet.DistributedStrategy()
strategy.localsgd = True
strategy.localsgd_configs = {
    "k_steps": 1,
    "begin_step": 1,
}
  • 自适应步长 local SGD 中,有两个参数 begin_stepinit_k_steps。begin_step 指定从第几个step之后进行自适应local SGD算法,取值为大于0的整数;用户需要设置init_k_steps作为第一次参数同步的间隔, 之后的同步间隔将由上文中的公式动态确定,在学习率较大时,参数变化大,减小step, 多进行通信从而保证快速收敛;在学习率较小时,参数变化小, 增大step,减少通信次数,从而提升训练速度。 需要注意的是在自适应步长策略中,系统会默认限制最大的同步间隔为 16 step,当公式计算出的间隔大于16 时,按16 steps 进行参数同步。

strategy = fleet.DistributedStrategy()
strategy.adaptive_localsgd = True
strategy.adaptive_localsgd_configs = {
    "init_k_steps": 1,
    "begin_step": 1,
}

上述例子存放在:example/resnet/train_fleet_static_localsgd.py下面, 假设要运行2卡的任务,那么只需在命令行中执行:

fleetrun --gpus=0,1 train_fleet_static_overlap.py

您将看到显示如下日志信息:

-----------  Configuration Arguments -----------
gpus: 0,1
heter_worker_num: None
heter_workers:
http_port: None
ips: 127.0.0.1
log_dir: log
...
------------------------------------------------
...
INFO 2021-01-18 22:01:11,969 launch_utils.py:472] Local start 2 processes. First process distributed environment info (Only For Debug):
   +=======================================================================================+
   |                        Distributed Envs                      Value                    |
   +---------------------------------------------------------------------------------------+
   |                 PADDLE_CURRENT_ENDPOINT                 127.0.0.1:10913               |
   |                     PADDLE_TRAINERS_NUM                        2                      |
   |                PADDLE_TRAINER_ENDPOINTS         127.0.0.1:10913,127.0.0.1:14758       |
   |                     FLAGS_selected_gpus                        0                      |
   |                       PADDLE_TRAINER_ID                        0                      |
   +=======================================================================================+
...
W0118 22:01:20.860090 45921 device_context.cc:362] Please NOTE: device: 0, GPU Compute Capability: 7.0, Driver API Version: 10.2, Runtime API Version: 9.2
W0118 22:01:20.864220 45921 device_context.cc:372] device: 0, cuDNN Version: 7.4.
W0118 22:01:25.578325 45921 gen_nccl_id_op_helper.cc:115] connect addr=127.0.0.1:14758 failed 1 times with reason: Connection refused retry after 0.5 seconds
[Epoch 0, batch 0] loss: 0.14602, acc1: 0.00000, acc5: 0.03125
[Epoch 0, batch 5] loss: 0.16445, acc1: 0.00000, acc5: 0.06250

自动混合精度

简介

在使用数据并行分布式训练的同时, 我们还可以引入自动混合精度(Auto Mixed Precision) 来进一步提升训练的速度.

主流的神经网络模型通常使用单精度 single-precision (FP32) 数据格式来存储模型参数、进行训练和预测. 在上述环节中使用半精度 half-precision (FP16)来代替单精度. 可以带来以下好处:

  1. 减少对GPU memory 的需求: GPU 显存不变情况下, 支持更大模型 / batch size

  2. 降低显存读写时的带宽压力

  3. 加速GPU 数学运算速度 (需要GPU 支持[1])

  4. GPU上 FP16 吞吐是FP32 的 2 - 8 倍[2]

Paddle 支持自动混合精度计算, 并实现了 自动维护FP32 、FP16参数副本, Dynamic loss scaling, op黑白名单 等策略来避免 因 FP16 动态范围较小而带来的模型最终精度损失。 Fleet 作为Paddle通用的分布式训练API提供了简单易用的接口, 用户只需要添加几行代码 就可将自动混合精度应用到原有的分布式训练中进一步提升训练速度.

原理
  • TBA

操作实践

Fleet 将AMP 实现为 meta optimizer, 用户需要指定其的 inner-optimizer. Fleet AMP支持所有 paddle optimziers 和 FLeet meta otpimizers 作为其 inner-optimizer。只需要在reset网络基础上打开相应的开关和配置相应的选项。

strategy = fleet.DistributedStrategy()
strategy.amp = True
strategy.amp_configs = {
    "init_loss_scaling": 32768,
    "decr_every_n_nan_or_inf": 2,
    "incr_every_n_steps": 1000,
    "incr_ratio": 2.0,
    "use_dynamic_loss_scaling": True,
    "decr_ratio": 0.5,
    "custom_white_list": [],
    "custom_black_list": [],
}

上述例子存放在:example/resnet/train_fleet_static_amp.py。 假设要运行8卡的任务,那么只需在命令行中执行:

fleetrun --gpus=0,1,2,3,4,5,6,7 train_fleet_static_amp.py

您将看到显示如下日志信息:

-----------  Configuration Arguments -----------
gpus: None
heter_worker_num: None
heter_workers:
http_port: None
ips: 127.0.0.1
log_dir: log
...
------------------------------------------------
...
INFO 2021-01-19 14:46:03,186 launch_utils.py:472] Local start 8 processes. First process distributed environment info (Only For Debug):
   +=======================================================================================+
   |                        Distributed Envs                      Value                    |
   +---------------------------------------------------------------------------------------+
   |                 PADDLE_CURRENT_ENDPOINT                 127.0.0.1:54114               |
   |                     PADDLE_TRAINERS_NUM                        2                      |
   |                PADDLE_TRAINER_ENDPOINTS  ... 0.1:24697,127.0.0.1:53564,127.0.0.1:37181|
   |                     FLAGS_selected_gpus                        0                      |
   |                       PADDLE_TRAINER_ID                        0                      |
   +=======================================================================================+
W0119 14:46:16.315114 84038 device_context.cc:362] Please NOTE: device: 0, GPU Compute Capability: 7.0, Driver API Version: 10.2, Runtime API Version: 9.2
W0119 14:46:16.320163 84038 device_context.cc:372] device: 0, cuDNN Version: 7.4.
W0119 14:46:25.249166 84038 fuse_all_reduce_op_pass.cc:79] Find all_reduce operators: 161. To make the speed faster, some all_reduce ops are fused during training, after fusion, the number of all_reduce ops is 8.
[Epoch 0, batch 0] loss: 0.19354, acc1: 0.00000, acc5: 0.00000
[Epoch 0, batch 5] loss: 0.20044, acc1: 0.00000, acc5: 0.00000

NV Dali Reader

  • TBA

其他(调节资源的配比、增大bs等)

原理

PaddlePaddle 使用“线程池”模型调度并执行Op,Op在启动GPU计算之前, 通常需要CPU的协助,然而如果Op本身占用时间很小,“线程池”模型下又会带来额外的调度开销。 根据以往的经验,对于CPU任务,num_threads=2 * dev_count 时性能较好, 对于GPU任务,num_threads=4 * dev_count 时性能较好。注意:线程池不是越大越好。

操作实践

用户只需要指定相应的DistributedStrategy()的开关,就可以设置线程数量。

strategy = fleet.DistributedStrategy()

exe_strategy = paddle.static.ExecutionStrategy()
exe_strategy.num_threads = 3
strategy.execution_strategy = exe_strategy

上述例子存放在:example/resnet/train_fleet_static_others.py。 假设要运行8卡的任务,那么只需在命令行中执行:

fleetrun --gpus=0,1,2,3,4,5,6,7 train_fleet_static_others.py

您将看到显示如下日志信息:

-----------  Configuration Arguments -----------
gpus: None
heter_worker_num: None
heter_workers:
http_port: None
ips: 127.0.0.1
log_dir: log
...
------------------------------------------------
...
INFO 2021-01-19 14:50:52,903 launch_utils.py:472] Local start 8 processes. First process distributed environment info (Only For Debug):
    +=======================================================================================+
    |                        Distributed Envs                      Value                    |
    +---------------------------------------------------------------------------------------+
    |                 PADDLE_CURRENT_ENDPOINT                 127.0.0.1:20485               |
    |                     PADDLE_TRAINERS_NUM                        8                      |
    |                PADDLE_TRAINER_ENDPOINTS  ... 0.1:23281,127.0.0.1:41983,127.0.0.1:17503|
    |                     FLAGS_selected_gpus                        0                      |
    |                       PADDLE_TRAINER_ID                        0                      |
    +=======================================================================================+
...
W0119 14:51:04.500844 77798 device_context.cc:362] Please NOTE: device: 0, GPU Compute Capability: 7.0, Driver API Version: 10.2, Runtime API Version: 9.2
W0119 14:51:04.506238 77798 device_context.cc:372] device: 0, cuDNN Version: 7.4.
W0119 14:51:12.378418 77798 fuse_all_reduce_op_pass.cc:79] Find all_reduce operators: 161. To make the speed faster, some all_reduce ops are fused during training, after fusion, the number of all_reduce ops is 5.
[Epoch 0, batch 0] loss: 0.11252, acc1: 0.03125, acc5: 0.06250
[Epoch 0, batch 5] loss: 0.11252, acc1: 0.03125, acc5: 0.06250
[Epoch 0, batch 10] loss: 0.11252, acc1: 0.03125, acc5: 0.06250
[Epoch 0, batch 15] loss: 0.11252, acc1: 0.03125, acc5: 0.06250

大模型训练优化

Forward Recomputation Backpropagation

简介

使用更大的模型和更大的batch size 可以带来更好的效果,但用户需要考虑是随之而来的显存瓶颈问题。Paddle Fleet 提供以下两种策略来解决大模型(大batch size)训练中可能遇到的显存瓶颈问题:

Forward Recomputation Backpropagation(FRB)

该策略通过清除正向计算过程中的中间计算结果,来降低训练过程中使用的存储空间,从而确保硬件有足够的内存做更大batch Size 的训练。

Recompute-Offload

基于Recompute 策略,将显存中checkpoint 卸载到Host 内存中,进一步节省显存空间支持更大batch Size的训练。

原理
Recomputation

我们知道,深度学习网络的一次训练迭代包含三个步骤:

  • 前向计算: 运行前向算子(Operator) 来计算中间隐层(Variable) 的值 。

  • 反向计算: 运行反向算子来计算参数(Parameter)的梯度。

  • 优化: 应用优化算法以更新参数值 。

在前向计算过程中,前向算子会计算出大量的中间结果,由于这些中间结果是训练数据和算子计算得到的,所以训练数据的batch bize 越大,中间结果占用的内存也就越大。飞桨核心框架会使用 Variable来存储这些隐层的中间结果。当模型层数加深时,其中间结果的数量可达数千甚至数万, 占据大量的内存。飞桨核心框架的显存回收机制会及时清除无用的中间结果以节省显存, 但是有些中间结果是反向计算过程中算子的输入,这些中间结果必须存储在内存中,直到相应的反向算子计算完毕。

对于大小固定的内存来说,如果用户希望使用大batch bize 的数据进行训练,则将导致单个中间结果占用内存增大,那么就需要减少中间结果的存储数量,FRB就是基于这种思想设计的。

FRB是将深度学习网络切分为k个部分(segments)。对每个segment 而言:前向计算时,除了小部分必须存储在内存中的Variable 外,其他中间结果都将被删除;在反向计算中,首先重新计算一遍前向算子,以获得中间结果,再运行反向算子。简而言之,FRB 和普通的网络迭代相比,多计算了一遍前向算子。 具体过程如下图所示:

forward_backward

我们把切分网络的变量叫做checkpoints。 那么问题来了,如何选择checkpoints 呢?自从FRB方法提出以来,大量学者在研究这一关键问题。 我们知道深度学习网络通常是由一个个模块串联得到的,比如ResNet-50由16个block串联而成, Bert-Large由24个Encoder layers 串联而成,以两个子模块中间的变量作为切分点就是一个很好的选择。 对于非串联的网络(比如含有大量shortcut结构的网络),FRB也支持对其做切分, 只是可能多耗费一点内存(用于存储shortcut的Variable)。

Recompute-Offload

在上面的Recomputation 步骤中,同样作为Forward 中间结果的checkpoints 会驻留显存,方便在Backward 中重计算。 然而在checkpoint 的生命周期中,仍有一段较长的未被使用的,从极致节省显存的角度去看, 这也是对显存的一种浪费。 Recompute-Offload 原理大致可以分为两步:

  • Forward: 当checkpoint在前向中被生成后,将其卸载(Offload)到Host 内存中,让其所占据的显存可以被释放。

  • Backward:当checkpoint在反向中被重新调用之前,将其预取(Pre-fetch) 回显存中,完成之后的重计算。

注意:

  • 因为checkpoint 在内存和显存间的拷贝较慢,该策略是通过进一步牺牲速度换取更大的batch size, 需要用户权衡训练吞吐和batch size 。

  • Recompute-Offload 支持多卡并行训练, 当多卡并行时开启Offload,训练中同一节点上所有GPU 上的checkpoints 都将卸载到Host 内存中,会存在以下风险:

    • PCIe 带宽瓶颈: 同一节点上的所有GPU 和Host 内存间共享一根PCIe 带宽,如同一节点上GPU 数量较多(单机八卡)容易因为PCIe 带宽限制让训练速度进一步减慢

    • Host 内存溢出: 当同一节点上GPU 数量较多,且每张GPU checkpoints size 较大时,需要注意卸载量是否超出Host 内存大小。

效果

我们在BERT-Large模型上对Recompute 的效果进行了测试,Recompute 可以让batch size 扩大 10倍, Offload 可以在Recompute 的基础上再扩大1.43 倍。 batch size = #seq * seq_max_len 硬件: 单卡 V100 32GB

策略

amp

amp + Recompute

amp + Recompute + offload

batch size

18 * 512

180 * 512

258 * 512

speed

23.94 sents/s

17.82 sents/s

15.47 sents/s

使用方法

为了使用Recompute策略,我们将dist_strategy.recompute设置为True 并设置我们事先定义好的checkpoints。 checkpoint 的选取可以参考论文 《Training Deep Nets with Sublinear Memory Cost》

示例中使用的ResNet50 模型的 checkpoint 不是固定的,不符合 Offload 的要求,固该功能暂无法开启。 当使用 Transformer 时,可以选取每一layer 的FC output 作为checkpoint, 这时各个layer 的checkpoints shapes 一致,可以使用Offload。

res2a.add.output.5.tmp_0 等是用户组网时定义的 variable name

checkpoint_idx = ["2a", "2b", "2c", "3a", "3b", "3c", "3d", "4a", "4b", "4c", "4d", "4e", "4f", "5a", "5b", "5c"]
checkpoints = ['res{}.add.output.5.tmp_0'.format(idx) for idx in checkpoint_idx]
strategy = fleet.DistributedStrategy()
strategy.recompute = True
strategy.amp = True
strategy.recompute_configs = {
    "checkpoints": checkpoints,
    "enable_offload": False,
    "checkpoint_shape": []
    }

上述例子的完整代码存放在:train_fleet_recompute.py下面。假设要运行2卡的任务,那么只需在命令行中执行:

fleetrun --gpus=0,1 train_fleet_recompute.py

您将看到显示如下日志信息:

-----------  Configuration Arguments -----------
gpus: 0,1
heter_worker_num: None
heter_workers:
http_port: None
ips: 127.0.0.1
log_dir: log
...
------------------------------------------------
...
    +=======================================================================================+
    |                        Distributed Envs                      Value                    |
    +---------------------------------------------------------------------------------------+
    |                 PADDLE_CURRENT_ENDPOINT                 127.0.0.1:17901               |
    |                     PADDLE_TRAINERS_NUM                        2                      |
    |                PADDLE_TRAINER_ENDPOINTS         127.0.0.1:17901,127.0.0.1:18846       |
    |                     FLAGS_selected_gpus                        0                      |
    |                       PADDLE_TRAINER_ID                        0                      |
    +=======================================================================================+
...
    +==============================================================================+
    |                                                                              |
    |                         DistributedStrategy Overview                         |
    |                                                                              |
    +==============================================================================+
    |                           amp=True <-> amp_configs                           |
    +------------------------------------------------------------------------------+
    |                     init_loss_scaling                 32768.0                |
    |                    incr_every_n_steps                   1000                 |
    |               decr_every_n_nan_or_inf                    2                   |
    |                            incr_ratio                   2.0                  |
    |                            decr_ratio            0.800000011920929           |
    |              use_dynamic_loss_scaling                   True                 |
    +==============================================================================+
    |                     recompute=True <-> recompute_configs                     |
    +------------------------------------------------------------------------------+
    |                           checkpoints         res2a.add.output.5.tmp_0       |
    |                                               res2b.add.output.5.tmp_0       |
    |                                               res2c.add.output.5.tmp_0       |
    |                                               res3a.add.output.5.tmp_0       |
    |                                               res3b.add.output.5.tmp_0       |
    |                                               res3c.add.output.5.tmp_0       |
    |                                               res3d.add.output.5.tmp_0       |
    |                                               res4a.add.output.5.tmp_0       |
    |                                               res4b.add.output.5.tmp_0       |
    |                                               res4c.add.output.5.tmp_0       |
    |                                               res4d.add.output.5.tmp_0       |
    |                                               res4e.add.output.5.tmp_0       |
    |                                               res4f.add.output.5.tmp_0       |
    |                                               res5a.add.output.5.tmp_0       |
    |                                               res5b.add.output.5.tmp_0       |
    |                                               res5c.add.output.5.tmp_0       |
    |                        enable_offload                  False                 |
    +==============================================================================+
...
W0104 17:59:19.018365 43338 device_context.cc:342] Please NOTE: device: 0, GPU Compute Capability: 7.0, Driver API Version: 10.2, Runtime API Version: 9.2
W0104 17:59:19.022523 43338 device_context.cc:352] device: 0, cuDNN Version: 7.4.
W0104 17:59:23.193490 43338 fuse_all_reduce_op_pass.cc:78] Find all_reduce operators: 161. To make the speed faster, some all_reduce ops are fused during training, after fusion, the number of all_reduce ops is 5.
[Epoch 0, batch 0] loss: 0.12432, acc1: 0.00000, acc5: 0.06250
[Epoch 0, batch 5] loss: 1.01921, acc1: 0.00000, acc5: 0.00000
...

完整2卡的日志信息也可在./log/目录下查看。了解更多fleetrun的用法可参考左侧文档fleetrun 启动分布式任务

Gradient Merge

简介

为了提升模型的性能,人们开始追求:更大规模的数据集、更深的网络层、更庞大的参数规模。但是随之而来的就是给模型训练带来了巨大的压力,因此分布式技术及定制化AI 芯片应运而生。但在分布式训练中,经常会遇到显存或者内存不足的情况,通常是以下几点原因导致的:

  • 输入的数据过大,例如视频类训练数据。

  • 深度模型的参数过多或过大,所需的存储空间超出了内存/显存的大小。

  • AI芯片的内存有限。

为了能正常完成训练,我们通常只能使用较小的batch size 以降低模型训练中的所需要的存储空间,这将导致很多模型无法通过提高训练时的batch size 来提高模型的精度。

Gradient Merge 策略的主要思想是将连续多个batch 数据训练得到的参数梯度合并做一次更新。 在该训练策略下,虽然从形式上看依然是小batch 规模的数据在训练,但是效果上可以达到多个小batch 数据合并成大batch 后训练的效果。

原理

Gradient Merge 只是在训练流程上做了一些微调,达到模拟出大batch size 训练效果的目的。具体来说,就是使用若干原有大小的batch 数据进行训练,即通过“前向+反向” 网络计算得到梯度。其间会有一部分显存/内存用于存放梯度,然后对每个batch计算出的梯度进行叠加,当累加的次数达到某个预设值后,使用累加的梯度对模型进行参数更新,从而达到使用大batch 数据训练的效果。

在较大的粒度上看, GM 是将训练一个step 的过程由原来的 “前向 + 反向 + 更新” 改变成 “(前向 + 反向 + 梯度累加)x k + 更新”, 通过在最终更新前进行 k 次梯度的累加模拟出 batch size 扩大 k 倍的效果。 更具体细节可以参考 《MG-WFBP: Efficient Data Communication for Distributed Synchronous SGD Algorithms》

使用方法

Gradient Merge 策略在使用方面也很简单,用户只需要定义将多少batch 的数据计算出的梯度叠加更新模型参数,便可以实现大batch 训练的目的。

训练代码的框架和其他fleet 训练代码基本一样,用户只需要在 fleet.DistributedStrategy 中配置Gradient Merge 相关参数即可。

假设我们定义了batch size 为 N;通过设置k_steps,使用4个batch size来模拟一个大batch的训练,从而达到了batch size 为 4*N 的训练效果。

gradient_merge_configs中,avg 选项用于控制梯度累计的形式:当被设置为 True 时,会对每次的梯度求和并做平均;反之将直接对梯度求和,并对参数进行更新。

strategy = fleet.DistributedStrategy()
# 使用Gradient merge策略并设置相关参数
strategy.gradient_merge = True
strategy.gradient_merge_configs = {"k_steps": 4, "avg": True}

上述例子的完整代码存放在:train_fleet_gradient_merge.py下面。假设要运行2卡的任务,那么只需在命令行中执行:

fleetrun --gpus=0,1 train_fleet_gradient_merge.py

您将看到显示如下日志信息:

-----------  Configuration Arguments -----------
gpus: 0,1
heter_worker_num: None
heter_workers:
http_port: None
ips: 127.0.0.1
log_dir: log
...
------------------------------------------------
...
    +=======================================================================================+
    |                        Distributed Envs                      Value                    |
    +---------------------------------------------------------------------------------------+
    |                 PADDLE_CURRENT_ENDPOINT                 127.0.0.1:17901               |
    |                     PADDLE_TRAINERS_NUM                        2                      |
    |                PADDLE_TRAINER_ENDPOINTS         127.0.0.1:17901,127.0.0.1:18846       |
    |                     FLAGS_selected_gpus                        0                      |
    |                       PADDLE_TRAINER_ID                        0                      |
    +=======================================================================================+
...
    +==============================================================================+
    |                                                                              |
    |                         DistributedStrategy Overview                         |
    |                                                                              |
    +==============================================================================+
    |                gradient_merge=True <-> gradient_merge_configs                |
    +------------------------------------------------------------------------------+
    |                               k_steps                    4                   |
    |                                   avg                   True                 |
    +==============================================================================+
...
W0104 17:59:19.018365 43338 device_context.cc:342] Please NOTE: device: 0, GPU Compute Capability: 7.0, Driver API Version: 10.2, Runtime API Version: 9.2
W0104 17:59:19.022523 43338 device_context.cc:352] device: 0, cuDNN Version: 7.4.
W0104 17:59:23.193490 43338 fuse_all_reduce_op_pass.cc:78] Find all_reduce operators: 161. To make the speed faster, some all_reduce ops are fused during training, after fusion, the number of all_reduce ops is 5.
[Epoch 0, batch 0] loss: 0.12432, acc1: 0.00000, acc5: 0.06250
[Epoch 0, batch 5] loss: 1.01921, acc1: 0.00000, acc5: 0.00000
...

完整2卡的日志信息也可在./log/目录下查看。了解更多fleetrun的用法可参考左侧文档fleetrun 启动分布式任务

使用LARS / LAMB 优化分布式超大batch 训练

简介

在数据并行分布式训练场景中, 常使用增加GPU数量的方式来加速训练。 为了保证GPU的算力得到充分利用, 每张GPU卡上的batch size都需要足够大。 因此在增加GPU 数量同时, 训练的全局batch size 也会变大。

但越大的全局batch size 会带来训练的收敛问题[1] [2]:

  • 模型最终精度损失

  • 收敛速度变慢, 需要更多的epoch 才能收敛

LARS[3] 和 LAMB[4] 两个优化策略常用来解决上述超大batch 训练中的收敛问题。

Paddle 实现了这两种优化策略,paddle.distributed.fleet 作为Paddle通用的分布式训练API提供了简单易用的接口, 用户只需要添加几行代码就可将策略加入到原有的训练中。 通过这两个优化策略, 我们在超大batch 场景中实现了更快的收敛速度和无损的精度, 结合Fleet 中其他的策略(e.g. AMP) 可以缩短整体训练收敛时间。

原理
LARS

LARS 公式如下:

\[local\_learning\_rate = learning\_rate * lars\_coeff * \frac{||param||}{||gradient|| + lars\_weight\_decay * ||param||}\]
\[\begin{split}velocity = mu * velocity + local\_learning\_rate * (gradient + lars\_weight\_decay * param + epsilon) \\\end{split}\]
\[\begin{split}param = param - velocity \\\end{split}\]

可以看到LARS 其实是在 带weight decaymomentum 优化器的基础上加入了local learning rate 的逻辑, 对每一层的learning rate 进行了放缩。

LAMB

LAMB 公式如下:

\[\begin{split}m_t = \beta_1 m_{t - 1}+ (1 - \beta_1)g_t \\\end{split}\]
\[\begin{split}v_t = \beta_2 v_{t - 1} + (1 - \beta_2)g_t^2 \\\end{split}\]
\[\begin{split}r_t = \frac{m_t}{\sqrt{v_t}+\epsilon} \\\end{split}\]
\[\begin{split}w_t = w_{t-1} -\eta_t \frac{\left \| w_{t-1}\right \|}{\left \| r_t + \lambda w_{t-1}\right \|} (r_t + \lambda w_{t-1}) \\\end{split}\]

和LARS 类似, LAMB 也是在内层优化器的基础上, 套了一个local learning rate 的逻辑, 对每一层的learning rate 进行了放缩。

效果

使用 LARS 在超大batch size 下训练 resnet50:

resnet50 imagenet

Global batch size

epoch

top1

[Goyal et al]

8k

90

76.3%

LARS Paper

32k

90

72.3%

[fleet: lars + amp]

16k

60

76.2%

[fleet: lars + amp]

32k

62

75.9%

使用方法
LARS

fleet 将 LARS实现为一个 fleet meta optimizer, 在使用时需要设置以下几点:

  1. LARS meta optimizer 的 inner optimizer 必须为 momentum, 并在 momentum 中定义 mulr 参数。

  2. 在DistributedStrategy 中设置LARS 特有的 lars_coeff 参数和 lars_weight_decay 参数。

    • LARS 已经将 weight decay 包含进公式中, 用户不需要再在 optimizer中设置 regularization

    • fleet 中还提供 lars_weight_decay 过滤策略, 可以通过在exclude_from_weight_decay 参数加入对应layer 的 name string, 让这一 layer 的参数不进行 lars weight decay。 (通常我们将BN 参数 和FC_bias 从lars weight decay 中过滤)

strategy = fleet.DistributedStrategy()
strategy.lars = True
strategy.lars_configs = {
                    "lars_coeff": 0.001,
                    "lars_weight_decay": 0.0005,
                    "exclude_from_weight_decay": ['batch_norm', '.b_0']
                }

上述例子的完整代码存放在:train_fleet_lars.py下面。假设要运行2卡的任务,那么只需在命令行中执行:

fleetrun --gpus=0,1 train_fleet_lars.py

您将看到显示如下日志信息:

-----------  Configuration Arguments -----------
gpus: 0,1
heter_worker_num: None
heter_workers:
http_port: None
ips: 127.0.0.1
log_dir: log
...
------------------------------------------------
...
+=======================================================================================+
|                        Distributed Envs                      Value                    |
+---------------------------------------------------------------------------------------+
|                       PADDLE_TRAINER_ID                        0                      |
|                 PADDLE_CURRENT_ENDPOINT                 127.0.0.1:12464               |
|                     PADDLE_TRAINERS_NUM                        2                      |
|                PADDLE_TRAINER_ENDPOINTS         127.0.0.1:12464,127.0.0.1:43227       |
|                     FLAGS_selected_gpus                        0                      |
+=======================================================================================+
...
+==============================================================================+
|                                                                              |
|                         DistributedStrategy Overview                         |
|                                                                              |
+==============================================================================+
|                          lars=True <-> lars_configs                          |
+------------------------------------------------------------------------------+
|                            lars_coeff          0.0010000000474974513         |
|                     lars_weight_decay          0.0005000000237487257         |
|                               epsilon                   0.0                  |
|             exclude_from_weight_decay                batch_norm              |
|                                                         .b_0                 |
+==============================================================================+
...
W0114 18:07:51.588716 16234 device_context.cc:346] Please NOTE: device: 4, GPU Compute Capability: 7.0, Driver API Version: 11.0, Runtime API Version: 10.0
W0114 18:07:51.593963 16234 device_context.cc:356] device: 4, cuDNN Version: 7.6.
[Epoch 0, batch 0] loss: 0.14651, acc1: 0.00000, acc5: 0.00000
[Epoch 0, batch 5] loss: 1.82926, acc1: 0.00000, acc5: 0.00000
[Epoch 0, batch 10] loss: 0.00000, acc1: 0.00000, acc5: 0.00000
[Epoch 0, batch 15] loss: 0.13787, acc1: 0.03125, acc5: 0.03125
[Epoch 0, batch 20] loss: 0.12400, acc1: 0.03125, acc5: 0.06250
[Epoch 0, batch 25] loss: 0.17749, acc1: 0.00000, acc5: 0.00000
...

完整 2卡的日志信息也可在./log/目录下查看。了解更多fleetrun的用法可参考左侧文档fleetrun 启动分布式任务

LAMB

fleet 将 LAMB实现为一个 fleet meta optimizer, 在使用时需要设置以下几点:

  1. LAMB meta optimizer 的 inner optimizer 必须为 adam, 并在 adam 中定义 学习率lr, 一阶 moment 的指数衰减率beta1 和二阶moment 的指数衰减率beta2 参数。

  2. 在 DistributedStrategy 里定设置AMB 特有的 lamb_weight_decay 参数.

    • LAMB 已经将 weight decay 包含进公式中, 用户不需要再在 optimizer中设置 regularization

    • fleet 中还提供 lamb_weight_decay 过滤策略, 可以通过在exclude_from_weight_decay 参数加入对应layer 的 name string, 让这一 layer 的参数不进行 lars weight decay。 (通常我们将LN 从lamb weight decay 中过滤)

strategy = fleet.DistributedStrategy()
strategy.lamb = True
strategy.lamb_configs = {
    'lamb_weight_decay': 0.01,
    'exclude_from_weight_decay': ['layer_norm'],
}

上述例子的完整代码存放在:train_fleet_lamb.py下面。假设要运行2卡的任务,那么只需在命令行中执行:

fleetrun --gpus=0,1 train_fleet_lamb.py

您将看到显示如下日志信息:

-----------  Configuration Arguments -----------
gpus: 0,1
heter_worker_num: None
heter_workers:
http_port: None
ips: 127.0.0.1
log_dir: log
...
------------------------------------------------
...
+=======================================================================================+
|                        Distributed Envs                      Value                    |
+---------------------------------------------------------------------------------------+
|                       PADDLE_TRAINER_ID                        0                      |
|                 PADDLE_CURRENT_ENDPOINT                 127.0.0.1:12464               |
|                     PADDLE_TRAINERS_NUM                        2                      |
|                PADDLE_TRAINER_ENDPOINTS         127.0.0.1:12464,127.0.0.1:43227       |
|                     FLAGS_selected_gpus                        0                      |
+=======================================================================================+
...
+==============================================================================+
|                                                                              |
|                         DistributedStrategy Overview                         |
|                                                                              |
+==============================================================================+
|                          lamb=True <-> lamb_configs                          |
+------------------------------------------------------------------------------+
|                     lamb_weight_decay           0.009999999776482582         |
|             exclude_from_weight_decay                layer_norm              |
+==============================================================================+
...
W0114 18:07:51.588716 16234 device_context.cc:346] Please NOTE: device: 4, GPU Compute Capability: 7.0, Driver API Version: 11.0, Runtime API Version: 10.0
W0114 18:07:51.593963 16234 device_context.cc:356] device: 4, cuDNN Version: 7.6.
[Epoch 0, batch 0] loss: 0.14651, acc1: 0.00000, acc5: 0.00000
[Epoch 0, batch 5] loss: 1.82926, acc1: 0.00000, acc5: 0.00000
[Epoch 0, batch 10] loss: 0.00000, acc1: 0.00000, acc5: 0.00000
[Epoch 0, batch 15] loss: 0.13787, acc1: 0.03125, acc5: 0.03125
[Epoch 0, batch 20] loss: 0.12400, acc1: 0.03125, acc5: 0.06250
[Epoch 0, batch 25] loss: 0.17749, acc1: 0.00000, acc5: 0.00000
...

完整2 卡的日志信息也可在./log/目录下查看。了解更多fleetrun的用法可参考左侧文档fleetrun 启动分布式任务

飞桨大规模分类库使用介绍

简介

图像分类技术日趋成熟,ResNet网络在ImageNet数据集上的top5准确率已超过96%。然而,如何高效地完成百万类别甚至是更大规模的分类任务,则是一个极具挑战性的课题。

从多分类神经网络的实现角度分析,其最后一层通常是由全连接层和Softmax构成的组合层,全连接层输出结点数挂钩分类任务的类别数,所以对应的参数量随分类类别数的增长而线性增长。因此,当类别数非常大时,神经网络训练过程占用的显存空间也会很大,甚至是超出单张GPU卡的显存容量,导致神经网络模型无法训练。

以新闻推荐系统为例,假设要对百万类细分类别的新闻条目进行分类,那么仅存储全连接层参数就需要约2GB的显存空间(这里假设神经网络最后一层隐层的输出结点的维度为512,并假设以32比特浮点数表示数据,见下式)。再考虑神经网络训练过程中生成的数量庞多的中间变量,那么训练过程中需要的存储总量往往会超出单张GPU卡的显存容量。

$$全连接层参数显存消耗=\frac{512*10^6*4B}{1024^3}\approx2GB$$

原理介绍

该如何解决这个问题呢?常用的做法是“拆分”。考虑到全连接层的线性可分性,可以将全连接层参数切分到多张GPU卡,采用模型并行方案,减少每张GPU卡的参数存储量。

以下图为例,全连接层参数按行切分到不同的GPU卡上。每次训练迭代过程中,各张GPU卡分别以各自的训练数据计算隐层的输出特征(feature),并通过集合通信操作AllGather得到汇聚后的特征。接着,各张GPU卡以汇聚后的特征和部分全连接层参数计算部分logit值(partial logit),并基于此计算神经网络的损失值。详细推导过程请参阅附录。

plsc

这个方案可以有效解决全连接层参数量随分类类别数线性增长导致的显存空间不足的问题。然而,为了实现这一方案,开发者需要基于现有的深度学习平台设计和实现上例描述的所有操作,包括全连接层参数的切分和集合通信等,动辄需要数百行实现代码,大大增加了开发者的负担。飞桨大规模分类库(PLSC: PaddlePaddle Large Scale Classification),为用户提供了大规模分类任务从训练到部署的全流程解决方案。只需数行代码,即可实现千万类别分类的神经网络。并且,通过PLSC库提供的serving功能用户可以快速部署模型,提供一站式服务。

功能效果

PLSC库在多个数据集上可以取得SOTA的训练精度,下表列出PLSC库分别使用MS1M-ArcFace和CASIA数据集作为训练数据,在不同验证数据集上取得的精度。

模型

训练集

lfw

agendb_30

cfp_ff

cfp_fp

MegaFace (Id/Ver)

ResNet50

MS1M-ArcFace

0.99817

0.99827

0.99857

0.96314

0.980/0.993

ResNet50

CASIA

0.98950

0.90950

0.99057

0.91500

N/A

备注:上述模型训练使用的loss_type为’dist_arcface’。更多关于ArcFace的内容请参考: ArcFace: Additive Angular Margin Loss for Deep Face Recognition

PLSC支持多机分布式训练。一方面,通过多机分布式训练可以将全连接层参数切分到更多的GPU卡,从而支持千万类别分类,并且飞桨大规模分类库理论上支持的分类类别数随着使用的GPU卡数的增加而增加。例如,单机8张V100 GPU配置下支持的最大分类类别数相比不使用PLSC扩大2.52倍。另一方面,使用多机分布式训练可以有效提升训练速度。

下图给出使用不同数量的节点时的训练速度(吞吐)。实验中使用的训练数据集为MS1M-ArcFace,分类类别数为85742,每个节点配备8张NVIDIA V100 GPUs,backbone模型为ResNet50。如图所示,使用飞桨大规模分类库可以取得近似线性的加速比。

performance
使用方法
安装PLSC

执行下面的命令安装PLSC。

pip install plsc
准备模型训练配置代码,保存为train.py文件

使用PLSC组建分类神经网络主要包括下面三个步骤:

  1. 从plsc包导入Entry类,Entry类封装PLSC所有API的接口类;

  2. 实例化Entry类的对象;

  3. 调用Entry类的train方法,开始训练过程。

默认情况下,该训练脚本使用的loss值计算方法为’dist_arcface’,即将全连接层参数切分到多张GPU卡的模型并行方案,需要使用两张或以上的GPU卡。默认地,基础模型为ResNet50模型,关于如何自定义模型请参考 文档

from plsc import Entry
if __name__ == "main":
        ins = Entry()
        ins.set_class_num(1000000) #设置分类类别数
        ins.train()
启动训练任务

可以使用下面的命令行启动训练任务,其中selected_gpus参数用于指定训练中使用的GPU卡。

python -m paddle.distributed.launch \
            --selected_gpus=0,1,2,3,4,5,6,7 \
            train.py

更多PLSC使用文档,请参阅: PLSC Repo

附录

全连接层操作在数学上等价于输入X和参数W的矩阵乘: \(XW\)。参数W可以按列切分为N个部分 \([W_{0}, W_{1}, ..., W_{N-1}]\),并分别放置到N张卡上。

$$XW = X[W_{0}, W_{1}, …, W_{N-1}] = [XW_{0}, XW_{1}, …, XW_{N-1}]$$

因此,在第i张卡上,只需要计算部分结果 \(XW_{i}\)。然后,通过集合通信操作获取全局结果 \(XW\)

使用Sharding 训练超大模型

简介

当模型参数达到百亿或者千亿时, 传统的数据并行训练可能会遇到显存瓶颈。 在数据并行训练中,每个gpu worker 都有一份完整模型参数和优化器状态副本。 《ZeRO: Memory Optimizations Toward Training Trillion Parameter Models》 指出在每个GPU 上都保存一份模型参数和优化器状态副本是冗余的。 我们可以通过将上述参数和副本划分到不同GPU 中, 在每个GPU 只保存部分副本,来减少每张GPU上显存的占用,从而可以支持更大模型的训练。

原理
Sharding

Sharding 实现了类似ZeRO-DP 的训练策略,将模型参数(parameter)、参数梯度(gradient)、参数对应的优化器状态(moment)切分到每一张GPU 上。让模型参数部分所占的显存随并行卡数的增加而减少。 通过 paddle.distributed.fleet 提供的简单易用接口, 用户只需要添加几行代码就可将策略加入到原有的训练中。

模型训练过程中的显存消耗主要由两大部分组成:模型参数及优化器状态、训练产生的中间变量(activations)。 sharding 策略只切分了模型参数和优化器状态,因此模型参数和优化器状态所消耗的显存可以随着并行GPU数量增加而线性减少; 但是每张GPU上仍然维护着模型完整的前向和反向,所以每张GPU依然需要存放模型的训练过程中的产生的全部的中间变量,这部分显存消耗 不会随着GPU 数量的增加而减少。 用户可以通过结合 recompute 策略来减少 activation这部分的显存消耗。

通过sharding 和增加并行GPU 数量,用户可以训练百亿甚至千亿参量的超大模型 (需要结合 recompute, amp 策略)。

Sharding-hybrid-dp

Sharding hybrid数据并行策略,在sharding 并行的基础上再增加一层数据并行逻辑。 该策略的目的是通过 限制sharding 通信的节点数增加多路数据并行 来提高训练吞吐。 如果一个模型在普通Sharding 训练时需要M 张GPU,则则开启hybrid-dp 至少需要 N*M GPU (N>= 2)。

Sharding-hybrid-dp 适用的场景如下:

  • 当前有 4个 8 卡v100 节点

  • 目标模型A 在Sharding 训练时至少需要 8卡 v100 (一个完整的8 卡v100节点)

  • 希望利用全部的 4 个节点来加速训练

上述情况如果直接使用全部的 4 个节点 进行普通的sharding 训练, 那么全部的 32 gpus 之间组成一个完整 Sharding parallelism。这样会因为通信瓶颈造成训练速度非常慢:

  • Sharding 中的broadcast 通信 会涉及全部的32 张卡,且为跨节点通信。

  • Sharding 中的 allreduce 通信 会涉及全部的32 张卡,且为跨节点通信。

开启 hybrid-dp 并设置 sharding_group_size = 8 后, 每个节点内的 8 张卡组成一个完整的 Sharding parallelism,4 个节点构成 4路 hybrid data parallelism:

  • Sharding 中的broadcast 通信被限制在每个节点内的 8 张GPU 之间, 没有跨节点通信。

  • Sharding 中的 allreduce 为跨节点通信,但每个allreduce 通信只涉及 对应 sharding_group 上 rank 相同的 4 张GPUs, 且每张GPU仅需要 allreduce通信 1/8 的模型参数。

Sharding-hybrid-dp 通过上述措施,可以较大程度 减少 Sharding 训练 从1节点扩展到4 节点时的(跨节点)通信量。提高节点增加时的加速比,提高训练吞吐。

P.S. hybrid dp 是因为 Sharding parallelism 本身内含一层 data parallelism 逻辑, hybrid dp 是在 Sharding parallelism之上再增加新的一层 data parallelism 逻辑。

效果

下面表格将对比 Sharding 策略对显存的影响。

模型为 Bert-Large,试验环境为 v100 (32GB), recompute = ON, amp = ON, hybrid-dp = OFF。 模型不变,单卡batch size 不变,当并行GPU数量增加时,显存的消耗将减小。 省下的显存可以用来增大模型。

setting

GPU Mem

sharding—off

8667 MB

sharding—on N1C2

5615 MB

sharding—on N1C4

4841 MB

sharding—on N1C8

4127 MB

sharding—on N2C16

3700 MB

Sharding 结合 amp + recompute,可以在 128 张 32GB V100 并行的情况下支持千亿参数(115B)ERNIE 训练。

使用方法

对于sharding,用户需要设置 fuse_broadcast_MB参数。该参数控制广播通信中参数融合的阈值,会影响sharding 训练中的通信速度,是一个需要根据具体模型大小和网络拓扑设定的经验值。

若开启hybrid-dp,用户需要设置 hybrid_dp为True,并指定 sharding_group_size

为了示例代码的简练,下面例子中使用较小 resnet50 模型作为演示。实际训练中,sharding 的目标是通过牺牲训练速度以换取对更大模型的支持,故不适用于 resnet50 等单卡就能训练的模型。

因为resnet50 较小,我们可以令sharding_group_size = 2让模型参数被切分为2 个shards,然后在 一个单机4卡 v100 的节点上组成 2 路 hybrid-dp 并行进行演示。

strategy = fleet.DistributedStrategy()
strategy.sharding = True
strategy.sharding_configs = {
    "fuse_broadcast_MB": 32,
    "hybrid_dp": True,
    "sharding_group_size": 2,
}

上述例子的完整代码存放在:train_fleet_sharding.py下面。假设要运行4卡的任务,那么只需在命令行中执行:

fleetrun --gpus=4,5,6,7 train_fleet_sharding.py

您将看到显示如下日志信息:

-----------  Configuration Arguments -----------
gpus: 4,5,6,7
heter_worker_num: None
heter_workers:
http_port: None
ips: 127.0.0.1
log_dir: log
...
------------------------------------------------
...
+=======================================================================================+
|                        Distributed Envs                      Value                    |
+---------------------------------------------------------------------------------------+
|                       PADDLE_TRAINER_ID                        0                      |
|                 PADDLE_CURRENT_ENDPOINT                 127.0.0.1:18362               |
|                     PADDLE_TRAINERS_NUM                        4                      |
|                PADDLE_TRAINER_ENDPOINTS  ... 0.1:23911,127.0.0.1:35135,127.0.0.1:38263|
|                     FLAGS_selected_gpus                        4                      |
+=======================================================================================+
...
INFO:root:Using Sharing&DP mode !
INFO:root:global word size: 4
INFO:root:global rank: 0
INFO:root:sharding group_size: 2
INFO:root:sharding rank: 0
INFO:root:dp group size: 2
INFO:root:dp rank: 0
INFO:root:current endpoint: 127.0.0.1:18362
INFO:root:sharding group endpoints: ['127.0.0.1:18362', '127.0.0.1:23911']
INFO:root:dp group endpoints: ['127.0.0.1:18362', '127.0.0.1:35135']
INFO:root:global word endpoints: ['127.0.0.1:18362', '127.0.0.1:23911', '127.0.0.1:35135', '127.0.0.1:38263']
server not ready, wait 3 sec to retry...
not ready endpoints:['127.0.0.1:23911']
...
+==============================================================================+
|                      sharding=True <-> sharding_configs                      |
+------------------------------------------------------------------------------+
|                     fuse_broadcast_MB                   32.0                 |
|                             hybrid_dp                   True                 |
|                   sharding_group_size                    2                   |
+==============================================================================+
...
W0114 18:07:51.588716 16234 device_context.cc:346] Please NOTE: device: 4, GPU Compute Capability: 7.0, Driver API Version: 11.0, Runtime API Version: 10.0
W0114 18:07:51.593963 16234 device_context.cc:356] device: 4, cuDNN Version: 7.6.
[Epoch 0, batch 0] loss: 0.14651, acc1: 0.00000, acc5: 0.00000
[Epoch 0, batch 5] loss: 1.82926, acc1: 0.00000, acc5: 0.00000
[Epoch 0, batch 10] loss: 0.00000, acc1: 0.00000, acc5: 0.00000
[Epoch 0, batch 15] loss: 0.13787, acc1: 0.03125, acc5: 0.03125
[Epoch 0, batch 20] loss: 0.12400, acc1: 0.03125, acc5: 0.06250
[Epoch 0, batch 25] loss: 0.17749, acc1: 0.00000, acc5: 0.00000
...

完整4卡的日志信息也可在./log/目录下查看。了解更多fleetrun的用法可参考左侧文档fleetrun 启动分布式任务

流水线并行

简介

通常来讲,训练更大规模的网络模型可以在多种任务上取得更好的效果,如提升图像分类任务的准确率。然而,训练更大规模的网络模型会消耗更多的显存资源,甚至是超过单个设备的显存容量,从而导致模型无法训练。流水线并行通过将网络模型不同的层放置到不同的设备,从而降低单个设备的显存消耗,使得超大规模模型训练成为可能。本文主要介绍飞桨流水线并行的基本原理和使用方法。

原理介绍
pipeline

与数据并行不同,流水线并行将模型的不同层放置到不同的计算设备,降低单个计算设备的显存消耗,从而实现超大规模模型训练。以上图为例,示例模型包含四个模型层。该模型被切分为三个部分,并分别放置到三个不同的计算设备。即,第1层放置到设备0,第2层和第三3层放置到设备1,第4层放置到设备2。相邻设备间通过通信链路传输数据。具体地讲,前向计算过程中,输入数据首先在设备0上通过第1层的计算得到中间结果,并将中间结果传输到设备1,然后在设备1上计算得到第2层和第3层的输出,并将模型第3层的输出结果传输到设备2,在设备2上经由最后一层的计算得到前向计算结果。反向传播过程类似。最后,各个设备上的网络层会使用反向传播过程计算得到的梯度更新参数。由于各个设备间传输的仅是相邻设备间的输出张量,而不是梯度信息,因此通信量较小。

下图给出流水线并行的时序图。最简配置流水线并行模型下,任意时刻只有单个计算设备处于计算状态,其它计算设备则处于空闲状态,因此设备利用率和计算效率较差。

pipeline_timeline1

为了优化流水线并行中设备的计算效率,可以进一步将mini-batch切分成若干更小粒度的micro-batch,以提升流水线并行的并发度,进而达到提升设备利用率和计算效率的目的。如下图所示,一个mini-batch被切分为4个micro-batch;前向阶段,每个设备依次计算单个micro-batch的结果;从而增加了设备间的并发度,降低了流水线并行bubble空间比例,提高了计算效率。

pipeline_timeline2
功能效果

使用流水线并行,可以实现超大规模模型训练。例如,使用多个计算设备,可以实现单个计算设备显存无法容纳的模型训练。

使用方法

在使用流水线并行的训练策略时,我们通过device_guard接口将不同的计算层放置在不同的设备上,如device_guard("gpu:0")。需要注意的是,当前流水线并行仅支持GPU设备。并且,模型中每个层都需要指定放置设备。

# device_guard 使用示例
def build_network():
    with paddle.fluid.device_guard("gpu:0"):
        data = paddle.static.data(name='sequence', shape=[1], dtype='int64')
        data_loader = paddle.io.DataLoader.from_generator(
            feed_list=[data],
            capacity=64,
            use_double_buffer=True,
            iterable=False)
        emb = nn.embedding(input=data, size=[128, 64])
    with paddle.fluid.device_guard("gpu:1"):
        fc = nn.fc(emb, size=10)
        loss = paddle.mean(fc)
    return data_loader, loss

通过设定dist_strategy.pipeline 为True,将流水线并行的策略激活。

fleet.init(is_collective=True)
dist_strategy = paddle.distributed.fleet.DistributedStrategy()
dist_strategy.pipeline = True

进一步地,可以通过dist_strategy.pipeline_configs 配置流水线并行中mini-batch的切分粒度。假设mini-batch的大小为128,可以通过下述代码将mini-batch切为4份更小粒度的micro-batch,每个micro-batch的大小为32。需要注意地是,用户需要保证mini-batch大小是micro-batch大小的整数倍。

fleet.init(is_collective=True)
dist_strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.pipeline_configs = {"micro_batch": 4}

基于ResNet50网络的流水线并行代码:example/resnet

使用下述命令行运行示例代码:

python -m paddle.distributed.launch \
       --gpus="0,1,2,3,4" \
       train_fleet_pipeline.py

控制台输出信息如下:

WARNING 2021-01-08 15:53:27,677 launch.py:314] Not found distinct arguments and compiled with cuda. Default use collective mode
launch train in GPU mode
INFO 2021-01-08 15:53:27,679 launch_utils.py:471] Local start 5 processes. First process distributed environment info (Only For Debug):
 +=======================================================================================+
 |                        Distributed Envs                      Value                    |
 +---------------------------------------------------------------------------------------+
 |                       PADDLE_TRAINER_ID                        0                      |
 |                 PADDLE_CURRENT_ENDPOINT                 127.0.0.1:52033               |
 |                     PADDLE_TRAINERS_NUM                        5                      |
 |                PADDLE_TRAINER_ENDPOINTS  ... 0.1:12178,127.0.0.1:28915,127.0.0.1:32114|
 |                     FLAGS_selected_gpus                        0                      |
 +=======================================================================================+
 INFO 2021-01-08 15:53:27,679 launch_utils.py:475] details abouts PADDLE_TRAINER_ENDPOINTS can be found in log/endpoints.log.
 grep: warning: GREP_OPTIONS is deprecated; please use an alias or script
 server not ready, wait 3 sec to retry...
 not ready endpoints:['127.0.0.1:40388', '127.0.0.1:12178', '127.0.0.1:28915', '127.0.0.1:32114']
 server not ready, wait 3 sec to retry...
 not ready endpoints:['127.0.0.1:12178']
 W0108 15:53:37.673019 103703 device_context.cc:342] Please NOTE: device: 0, GPU Compute Capability: 7.0, Driver API Version: 11.0, Runtime API Version: 10.1
 W0108 15:53:37.678391 103703 device_context.cc:352] device: 0, cuDNN Version: 7.6.

日志信息位于log目录下,log/workerlog.4日志文件的内容如下:

grep: warning: GREP_OPTIONS is deprecated; please use an alias or script
W0108 15:52:27.723405 103188 device_context.cc:342] Please NOTE: device: 4, GPU Compute Capability: 7.0, Driver API Version: 11.0, Runtime API Version: 10.1
W0108 15:52:27.728278 103188 device_context.cc:352] device: 4, cuDNN Version: 7.6.
I0108 15:52:32.665313 103188 gen_nccl_id_op_helper.cc:176] Server listening on: 127.0.0.1:32347 successful.
W0108 15:52:36.874132 103188 operator.cc:1194] Device index is only supported under pipeline parallelism, so it will be ignored.
grep: warning: GREP_OPTIONS is deprecated; please use an alias or script
W0108 15:53:31.393914 103723 device_context.cc:342] Please NOTE: device: 4, GPU Compute Capability: 7.0, Driver API Version: 11.0, Runtime API Version: 10.1
W0108 15:53:31.398906 103723 device_context.cc:352] device: 4, cuDNN Version: 7.6.
I0108 15:53:34.465754 103723 gen_nccl_id_op_helper.cc:176] Server listening on: 127.0.0.1:32114 successful.
W0108 15:53:40.784844 103723 operator.cc:1194] Device index is only supported under pipeline parallelism, so it will be ignored.
[Epoch 0, batch 5] loss: 0.37770, acc1: 0.03125, acc5: 0.03125
[Epoch 0, batch 10] loss: 0.06200, acc1: 0.00000, acc5: 0.03125
[Epoch 0, batch 15] loss: 0.26105, acc1: 0.00000, acc5: 0.00000
[Epoch 0, batch 20] loss: 0.00000, acc1: 0.00000, acc5: 0.00000
[Epoch 0, batch 25] loss: 0.37330, acc1: 0.00000, acc5: 0.06250
[Epoch 0, batch 30] loss: 0.00000, acc1: 0.00000, acc5: 0.00000
[Epoch 0, batch 35] loss: 0.07487, acc1: 0.00000, acc5: 0.00000
[Epoch 0, batch 40] loss: 0.12932, acc1: 0.03125, acc5: 0.06250
[Epoch 0, batch 45] loss: 0.19604, acc1: 0.00000, acc5: 0.03125
[Epoch 0, batch 50] loss: 0.07977, acc1: 0.00000, acc5: 0.00000
[Epoch 0, batch 55] loss: 0.00000, acc1: 0.00000, acc5: 0.00000
[Epoch 0, batch 60] loss: 0.13464, acc1: 0.00000, acc5: 0.06250
[Epoch 0, batch 65] loss: 0.13940, acc1: 0.00000, acc5: 0.03125
[Epoch 0, batch 70] loss: 0.00000, acc1: 0.00000, acc5: 0.00000
[Epoch 0, batch 75] loss: 0.00000, acc1: 0.00000, acc5: 0.00000
注意事项

由于流水线并行将模型的层放置到不同的计算设备,因此在fetch信息时,只有所fetch的数据所在设备进程对应的日志信息中输出数据信息,其它设备进程对应的日志输出None。以上面的示例说明,由于获取的损失值和精度值只在最后一个设备上,因此只有log/workerlog.4日志文件中会输出对应的数据,其它日志文件不会输出对应的数据。

二次开发

  • TBA

整体示例

  • TBA

ParameterServer训练

快速开始

在大数据浪潮的推动下,有标签训练数据的规模取得了飞速的增长。现在人们通常用数百万甚至上千万的有标签图像来训练图像分类器(如,ImageNet包含1400万幅图像,涵盖两万多个种类),用成千上万小时的语音数据来训练语音模型(如,Deep Speech 2系统使用了11940小时的语音数据以及超过200万句表述来训练语音识别模型)。在真实的业务场景中,训练数据的规模可以达到上述数据集的数十倍甚至数百倍,如此庞大的数据需要消耗大量的计算资源和训练时间使模型达到收敛状态(数天时间)。

为了提高模型的训练效率,分布式训练应运而生,其中基于参数服务器的分布式训练为一种常见的中心化共享参数的同步方式。与单机训练不同的是在参数服务器分布式训练中,各个节点充当着不同的角色:

  • 训练节点:该节点负责完成数据读取、前向计算、反向梯度计算等过程,并将计算出的梯度上传至服务节点。

  • 服务节点:在收到所有训练节点传来的梯度后,该节点会将梯度聚合并更新参数。最后将参数发送给训练节点,开始新一轮的训练。

根据参数更新的方式不同,可以分为同步/异步/Geo异步三种:

  • 同步训练:所有Worker的进度保持一致,即每训练完一个Batch后,所有Worker会上传梯度给Server,然后开始等待Server返回更新后的参数。Server在拿到所有Worker上传的梯度后,才会开始计算更新后的参数。因此在任何一个时间点,所有Worker都处于相同的训练阶段。同步训练的优势在于Loss可以比较稳定的下降,缺点是整个训练速度较慢,这是典型的木桶原理,速度的快慢取决于最慢的那个Worker的训练计算时间,因此在训练较为复杂的模型时,即模型训练过程中神经网络训练耗时远大于节点间通信耗时的场景下,推荐使用同步训练模式。

  • 异步训练:与同步训练不同,在异步训练中任何两个Worker之间的参数更新都互不影响。每一个Worker完成训练、上传梯度后,Server都会立即更新参数并将结果返回至相应的训练节点。拿到最新的参数后,该训练节点会立即开始新一轮的训练。异步训练去除了训练过程中的等待机制,训练速度得到了极大的提升,但是缺点也很明显,那就是Loss下降不稳定,容易发生抖动。建议在个性化推荐(召回、排序)、语义匹配等数据量大的场景使用。

  • GEO异步训练:GEO异步训练是飞桨独有的一种异步训练模式,训练过程中任何两个训练节点之间的参数更新同样都互不影响,但是每个训练节点本地都会拥有完整的训练流程,即前向计算、反向计算和参数优化,而且每训练到一定的批次(Batch) 训练节点都会将本地的参数计算一次差值(Step间隔带来的参数差值),将差值发送给服务端累计更新,并拿到最新的参数后,该训练节点会立即开始新一轮的训练。所以显而易见,在GEO异步训练模式下,Worker不用再等待Server发来新的参数即可执行训练,在训练效果和训练速度上有了极大的提升。但是此模式比较适合可以在单机内能完整保存的模型,在搜索、NLP等类型的业务上应用广泛,比较推荐在词向量、语义匹配等场景中使用。

本节将采用推荐领域非常经典的模型wide_and_deep为例,介绍如何使用Fleet API(paddle.distributed.fleet)完成参数服务器训练任务,本次快速开始的示例代码位于https://github.com/PaddlePaddle/FleetX/tree/develop/examples/wide_and_deep。

版本要求

在编写分布式训练程序之前,用户需要确保已经安装paddlepaddle-2.0.0-cpu或paddlepaddle-2.0.0-gpu及以上版本的飞桨开源框架。

操作方法

参数服务器训练的基本代码主要包括如下几个部分: 1. 导入分布式训练需要的依赖包。 2. 定义分布式模式并初始化分布式训练环境。 3. 加载模型及数据。 4. 定义参数更新策略及优化器。 5. 开始训练。

下面将逐一进行讲解。

导入依赖
import paddle
import os
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
定义分布式模式并初始化分布式训练环境

通过fleet.init()接口,用户可以定义训练相关的环境,注意此环境是用户预先在环境变量中配置好的,包括:训练节点个数,服务节点个数,当前节点的序号,服务节点完整的IP:PORT列表等。

# 当前参数服务器模式只支持静态图模式, 因此训练前必须指定`paddle.enable_static()`
paddle.enable_static()
role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
加载模型及数据
# 模型定义参考examples/wide_and_deep中model.py
from model import net
from reader import data_reader

feeds, predict, avg_cost = net()

train_reader = paddle.batch(data_reader(), batch_size=4)
reader.decorate_sample_list_generator(train_reader)
定义同步训练 Strategy 及 Optimizer

在Fleet API中,用户可以使用fleet.DistributedStrategy()接口定义自己想要使用的分布式策略。

其中a_sync选项用于定义参数服务器相关的策略,当其被设定为False时,分布式训练将在同步的模式下进行。反之,当其被设定成True时,分布式训练将在异步的模式下进行。

# 定义异步训练
dist_strategy = fleet.DistributedStrategy()
dist_strategy.a_sync = True

# 定义同步训练
dist_strategy = fleet.DistributedStrategy()
dist_strategy.a_sync = False

# 定义Geo异步训练, Geo异步目前只支持SGD优化算法
dist_strategy = fleet.DistributedStrategy()
dist_strategy.a_sync = True
dist_strategy.a_sync_configs = {"k_steps": 100}

optimizer = paddle.optimizer.SGD(learning_rate=0.0001)
optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.loss)
开始训练

完成模型及训练策略以后,我们就可以开始训练模型了。因为在参数服务器模式下会有不同的角色,所以根据不同节点分配不同的任务。

对于服务器节点,首先用init_server()接口对其进行初始化,然后启动服务并开始监听由训练节点传来的梯度。

同样对于训练节点,用init_worker()接口进行初始化后, 开始执行训练任务。运行exe.run()接口开始训练,并得到训练中每一步的损失值。

if fleet.is_server():
    fleet.init_server()
    fleet.run_server()
else:
    exe = paddle.static.Executor(paddle.CPUPlace())
    exe.run(paddle.static.default_startup_program())

    fleet.init_worker()

    for epoch_id in range(1):
        reader.start()
        try:
            while True:
                loss_val = exe.run(program=paddle.static.default_main_program(),
                                   fetch_list=[avg_cost.name])
                loss_val = np.mean(loss_val)
                print("TRAIN ---> pass: {} loss: {}\n".format(epoch_id,
                                                              loss_val))
        except paddle.core.EOFException:
            reader.reset()

    fleet.stop_worker()
运行训练脚本

定义完训练脚本后,我们就可以用fleetrun指令运行分布式任务了。其中server_num, worker_num分别为服务节点和训练节点的数量。在本例中,服务节点有1个,训练节点有两个。

fleetrun --server_num=1 --worker_num=2 train.py

性能基准

请访问飞桨 Perf Repo 获取飞桨性能基准数据。

设计思想

综述

参数服务器概述

参数服务器是个编程框架,用于方便分布式并行程序的编写,其中重点是对大规模参数的分布式存储和协同的支持。

工业界需要训练大型的机器学习模型,这些模型参数往往超大,达到了百GB甚至TB级别,超过了单台服务器的容纳能力,同时训练这些模型的数据量巨大,一次参与训练的数据可能达到上百TB,需要多台服务器共同分担,加速整个训练任务。在这种情况下,采用参数服务器架构可以很好的利用分布式计算的能力来提升训练任务中模型规模的上限和训练效率。

一般参数服务器架构如图(原图论文地址):

ps

即将整个训练节点划分为计算节点(Worker)和参数更新节点(PServer)两种,其中计算节点负责将分配到此节点的数据进行计算,将算得的梯度发送给对应的参数更新节点,后从参数更新节点获取最新的参数更新到本地。参数更新节点采用一定的参数划分方式,将模型参数均匀的划分在多个节点上,每个节点在收到梯度后根据优化策略进行参数更新,同时参数更新节点作为服务方,会对计算节点提供参数查询功能。

飞桨参数服务器概述

飞桨参数服务器的基本架构源自 Parameter Server for Distributed Machine LearningLarge Scale Distributed Deep Networks,并在其基础上做了大量创新来满足百度和其他公司对于参数服务器性能和功能的需求。

飞桨参数服务器拥有以下特性:

  1. 支持同步训练、异步训练和GEO异步训练三种模式,能够多方面满足用户需求,保障模型的性能和稳定性。

  2. 支持千亿级别大规模稀疏模式,支持准入、遗忘策略,完备支持流式训练。

  3. 采用BRPC作为多节点之间通信的主要方法,极大提升了网络可用性(BRPC在百度内部久经考验,值得信赖)。

  4. 超高的吞吐及多机加速比,能够有效利用计算资源,提升训练速度。

飞桨参数服务器架构

架构图如下所示:

ps_store

基本组件描述

  1. FleetAPI: 贯穿整个分布式训练的API, 分布式所有对外暴露的API均由Fleet暴露,不允许其他任何组件暴露API。

  2. DistributedOptimizer: 编译期,结合配置将单机训练网络转换为分布式训练网络,生成适配于各个节点的Program及配置文件。

  3. Reader: 包含Dataset/DataLoader/Feeder, Reader与训练解耦,训练可以与任意Reader适配。

  4. Executor: 每个训练节点(Worker)的主方法,适配各种Reader, 分布式中只通过send/recv和外部进行交互。

  5. Communicator: Worker端负责梯度/参数聚合、拆分、收发的核心模块,独立运行,通过初始化配置及编译期间生成的Worker参数收发配置文件的内容进行工作。

  6. RPC/GLOO: 负责参数传递、节点控制等,通信核心模块。 RPC逻辑会从收发Tensor更新为收发二进制, GLOO负责控制训练过程中对于训练流程的控制,包括Barrier,以及通过GLOO API实现分布式Auc/分布式Acc等逻辑。

  7. ParameterServer: 参数服务器模块,独立运行于PServer端,包含Dense/Sparse参数存储及更新、Decay/Clip/Show、Click等处理逻辑。

参数服务器整体架构分编译期和运行期两个阶段。

编译阶段,框架需在FleetAPI的配合下,将用户定义的单机组网计算图拆分为两部分内容:

  1. 计算节点(Worker)端计算图, Worker端的计算图主要由基础训练网络构成,包含数据读取,前向,反向及与通信组件(Communicator)通信的算子。

  2. 配置文件,PServer端需据此启动RPC Server服务,以及生成参数的存储格式。Worker端需据此完成通信组件Communicator的配置。

运行阶段,PServer端需启动RPC服务,监听并处理Worker的参数拉取、更新等请求。运行阶段,Worker端的训练线程需基于自己划分的训练数据,进行学习,将梯度(参数)发送给Communicator后,根据配置(同步、异步、GEO异步)来确定是等待通信完成,还是直接进行下一轮训练,以此来完成整个参数服务器的分布式训练。Worker端的Communicator通信组件也需在运行阶段初就完成启动,并不断将当前Worker各个训练线程产出的梯度聚合后发送给PServer,然后从PServer上拉取最新参数以供训练线程使用。

分布式训练模式

当前飞桨共支持三种分布式训练模式,同步、异步、GEO异步,每种模式的特点及适用场景为:

  • 同步训练:训练一个minibatch后,每个节点会合并所有线程的梯度发给PServer, PServer端收到所有节点的梯度后,进行梯度聚合及参数更新。因同步训练的过程中有诸多的等待或同步机制,导致整个训练速度较慢,推荐在复杂模型(神经网络训练耗时远大于节点间通信耗时)使用。

  • 异步训练:训练一个minibatch后,每个节点的每个线程会发送梯度给PServer,PServer端不再等待收到所有节点的梯度,而是直接基于已收到的梯度进行参数更新。异步训练去除了训练过程中的等待机制,训练速度得到了极大的提升, 但是因为引入了异步更新的机制会导致训练效果有所波动,建议在召回、排序、语义匹配等数据量大的场景使用。

  • GEO(Geometric Stochastic Gradient Descent)异步训练:GEO是飞桨自研的异步训练框架,在训练效果和训练速度上有了极大的提升,目前只支持SGD优化算法。 每个节点在本地训练若干个minibatch后(具体训练多少个minibatch由配置决定),发送参数更新给PServer端,PServer端接收后通过加和方式更新参数。GEO速度极快,并在搜索、NLP等业务上广泛应用, 推荐在词向量、语义匹配等领域进行使用。

存储设计

本节主要介绍大规模稀疏参数服务器的存储设计。 神经网络训练中,参数共分为稠密参数和稀疏参数两种, 其中稠密参数指每次训练都需要全部更新的参数,例如全连接层(fc, fully-connected)的权重(weight)和偏置(bias)等。 稀疏参数指每次训练仅需部分更新的参数,例如Embedding表,每次训练仅需更新输入对应的Embedding行即可。

原理

参数服务器中,参数的存储设计应该分为两部分,分配和存储。前者介绍每个参数应该存储在哪个PServer上,后者介绍每个PServer上的参数应该如何存储。

PServer参数分配

稠密参数会全部展平成一维数组,拼接在一起变成一个大的一维数组,然后均匀分配在所有PServer上。

稀疏参数的分配方式为取余,每个id应该被分配到哪个PServer上,可通过公式 id % PServer_num 计算得到。

PServer参数存储

每个PServer上的参数存储格式如下图所示:

ps_store

可以发现,无论是稠密参数(DenseTable)还是稀疏参数(SparseTable),最终存储的内容(ValueData)格式都一样,除参数本身外,还需额外存储参数优化计算公式中除梯度外所有的其他中间状态值,下面以sgd,adagrad,adam三种常见优化算法为例进行说明。

若优化算法为Sgd,随机梯度下降,参数更新公式为:

\[param = param - lr * grad\]

需存储参数(param)和学习率(lr, 维度为1)。

若优化器为Adagrad,参数更新公式为:

\[ \begin{align}\begin{aligned}moment &= moment + grad * grad\\param &= param - \frac{lr * grad}{\sqrt{moment} + \epsilon}\end{aligned}\end{align} \]

需存储参数(param)、梯度的二阶矩估计(moment,维度和参数一致)以及学习率(lr,维度为1)。

若优化器为Adam,参数更新公式为:

\[ \begin{align}\begin{aligned}moment\_1 &= \beta_1 * moment\_1 + (1 - \beta_1) * grad\\moment\_2 &= \beta_2 * moment\_2 + (1 - \beta_2) * grad * grad\\{\beta_1}^t &= {\beta_1}^{t-1} * \beta_1\\{\beta_2}^t &= {\beta_2}^{t-1} * \beta_2\\lr &= lr * \frac{\sqrt{1 - {\beta_1}^t}}{1 - {\beta_2}^t}\\param &= param - lr * \frac{moment\_1}{\sqrt{moment\_2} + \epsilon}\end{aligned}\end{align} \]

需存储参数(param),梯度的一阶、二阶矩估计(moment_1, moment_2,维度和参数一致),一阶、二阶矩估计的指数衰减率的累积值(beta1pow, beta2pow, 维度均为1)以及学习率(lr,维度为1)。

稠密参数的存储格式为一个二维Vector数组,第一维大小为分配到当前PServer上的所有稠密参数元素个数和,第二维大小为ValueData的存储长度,如上文所讲,和优化算法相关,其中param大小为1。例如分配到当前PServer上的稠密参数元素个数为dense_numels_i,优化算法为sgd,则Vector的维度为[dense_numels_i,2]。

为了能提高并发处理能力,每个PServer上稀疏参数一般会进行分shard存储,每个id被分到哪个shard可直接通过 id % shard_num 计算得到。每个shard的存储格式为字典(Map),字典关键字(key)为id,值(value)为ValueData,如上文所讲,和优化算法相关,其中param的大小为emb_size。

通信设计

简介

神经网络的训练过程一般由三部分组成,前向计算、反向传播、参数更新。前向计算获取损失值 Loss,然后根据链式法则,反向计算得到每个参数的梯度,最后根据指定的优化算法,完成参数更新。

在多机参数服务器分布式训练中,存在两种不同角色的节点,WorkerPServer

  • Worker:负责完成数据读取、前向计算、反向梯度计算等过程,并将计算出的梯度上传至 PServer

  • PServer:在收到 Worker 传来的梯度后,根据指定的优化方法,更新参数并将参数发送给 Worker ,开始新一轮的训练。

为了减少通信请求、提高整体训练速度,Worker在将梯度发送给PServer前,往往会将本地的参数梯度进行批量融合(Merge),然后才发送给PServer。一般来说,梯度融合的数量为训练线程数。

本节主要对分布式训练任务中,Worker和PServer间的通信流程进行介绍。

原理

梯度融合及发送流程

上图展示了分布式训练过程中,Worker 任务的整个流程。可以发现,分布式训练中 Worker 任务相较于单机训练任务而言,主要有两点区别:

  1. 分布式训练网络相较于单机组网而言,删除了参数更新算子,增加了通信算子(Send OP)。

  2. 增加了 Communicator 通信组件,用以完成梯度融合、发送、接收等操作。

Communicator是分布式参数服务器框架中的通信组件,由多个参数的梯度队列和一个运行主线程组成。Worker通过前向、反向算子得到参数的梯度后,通过通信算子(Send OP)将梯度发送给Communicator中每个参数对应的梯度队列里。

Communictor在框架中以单例形式存在,在 fleet.init_worker() 中完成初始化和启动。启动后,Communicator的主线程会从每个参数的梯度队列中不停取出梯度直至满足融合条件后进行融合,发送至PServer,从PServer端获取最新的参数用于新的训练,并且重复上述一系列操作直至任务结束。梯度融合的条件有以下两个,满足任意一个即可:

  • 数量等于最多可允许的梯度融合数阈值。该阈值通过环境变量 FLAGS_communicator_max_merge_var_num 配置,默认为 CPU_NUM 环境变量指定的线程数,若 CPU_NUM 环境变量未定义,则默认为1。

  • 连续多次均未从参数对应的梯度队列里取出值,且尝试次数达到等待阈值。该阈值通过环境变量 FLAGS_communicator_send_wait_times 配置,默认为5。

在满足梯度融合条件后,只要待融合的梯度数量大于0,无论其是否等于 FLAGS_communicator_max_merge_var_num ,都会进行融合、发送及接收。需要注意的是,Communicator只会从PServer端接收稠密参数,稀疏参数的获取是通过前向算子 distributed_lookup_table 完成的。

性能优化

计算图拆分与优化

简介

计算图拆分目前仅支持Paddle静态图的参数服务器模式

参数服务器的分布式训练为一种常见的中心化共享参数的同步方式。与单机训练不同的是在参数服务器分布式训练中,各个节点充当着不同的角色:

  • 训练节点:该节点负责完成数据读取、前向计算、反向梯度计算等过程,并将计算出的梯度上传至服务节点。训练节点又可进一步分为同构的计算节点 Worker ,与异构的计算节点 Heter-Worker

  • 服务节点:在收到所有训练节点传来的梯度后,该节点会将梯度聚合并更新参数。最后将参数发送给训练节点,开始新一轮的训练。

根据参数更新的方式不同,可以分为同步/异步/Geo异步三种:

  • 同步训练:在同步参数服务器分布式训练中,所有训练节点的进度保持一致。每训练完一个Batch后,训练节点会上传梯度,然后开始等待服务节点返回更新后的参数。服务节点拿到所有训练节点上传的梯度后,才会对参数进行更新。因此,在任何一个时间点,所有训练节点都处于相同的训练阶段。

  • 异步训练:与同步训练不同,在异步训练中任何两个训练节点之间的参数更新都互不影响。每一个训练节点完成训练、上传梯度后,服务节点都会立即更新参数并将结果返回至相应的训练节点。拿到最新的参数后,该训练节点会立即开始新一轮的训练。

  • GEO异步训练:与前两种训练不同,GEO异步训练也是一种异步训练模式,每个训练节点本地都会拥有完整的训练流程(前向-反向-参数优化),训练过程中任何两个训练节点之间的参数更新都互不影响。每训练到一定的轮次(Step) 训练节点会将本地的参数计算一次差值(Step间隔带来的参数差值),将差值发送给服务端累计更新,并拿到最新的参数后,该训练节点会立即开始新一轮的训练。

根据训练节点所使用的设备,与整体架构的不同,可以分为 PS-CPU/PS-GPU/PS-Heter三种:

  • PS-CPUPServer 使用CPU集群机器,Worker 使用同构的CPU集群机器,组成训练拓扑

  • PS-GPUPServer 使用CPU集群机器,Worker 使用同构的GPU集群机器,组成训练拓扑

  • PS-HeterPServer 使用CPU集群机器,Worker 使用同构的CPU集群机器,Heter-Worker 使用异构的AI算力集群机器(GPU/Kunlun等),三者组成训练拓扑

本文将具体展开, 详细介绍各个角色的计算图拆分原理,及如何在此基础上进行性能/效果优化。

原理

参数服务器的计算图拆分,按照角色不同,计算图也有所不同。我们首先从单机的深度学习计算图开始上手:

深度学习网络中有两个基本元素:

  • Operator:op,组成计算的最小操作,比如加减/FC/Embedding查表

  • Variable:var,网络学习的参数,又会分为稠密参数和稀疏参数。

    • 稠密参数(Dense_Var)是指每个step都会更新的参数,比如FC的Weight参数。

    • 稀疏参数(Sparse_Var)是指每个step不是必须更新的参数,如Embedding参数,只会更新当前step中涉及到的key的相应value

单机的计算图如下所示:

  • 计算图拿到参数的值(Var)之后,会首先执行前向OP(FF OP),OP可能会执行在不同的设备上,如CPU/GPU/Kunlun 或其他AI芯片,我们用XPU代指。

  • 前向OP计算完成后,得到loss,会继续计算反向OP(BP OP),得到各个参数的梯度(Var_Grad)

  • 指定SGD/Adam等优化器,利用参数的梯度(Var_Grad)更新原始参数(Var)

  • 重复以上流程,迭代参数的更新,实现深度学习网络的训练

single_program

那么单机的计算图如何转换为参数服务器不同角色的计算图呢?代码又具体怎样实现呢?下面具体介绍。

计算图拆分——PServer

参数服务器模式下,所有参数的全局真值都被分片保存在各个Pserver上,PServer执行Optimizer OP,执行参数的更新。

不失一般性,以PS-CPU异步模式的计算图介绍PServer的计算图,如下图所示

cpu_program
  • Worker(Trainer)在计算得到参数的梯度(Var_Grad)后,会通过RPC发送给PServer

  • PServer监听通信端口,将收到的不同参数分别通过不同的Oprimizer OP完成更新

  • Worker在下一个迭代时,请求PServer上最新的参数

  • 重复以上流程,迭代参数的更新,实现分布式参数服务器的训练

通过上述流程,Pserver的计算图实现的功能主要分为三类:

  1. 执行Optimizer,进行参数更新的功能

  2. 接收Worker发送的梯度,触发Optimzer的功能

  3. 接收Worker发送的请求,发送指定参数的功能

功能2、3通过RPC Server即可实现,本节不再赘述

功能1有两种实现途径:a、使用Paddle组网,构成以optimizer OP; b、使用定制的数据结构及配套的优化算法实现,存储并更新参数,通常该方法用于大规模稀疏的场景。

在同步/异步模式的情况下

PServer将计算图按照上述规则进行生成,并根据训练需要,添加LearningRate Decay等操作组件。

在GeoSGD的情况下

参数的更新OP被放置在Worker上,PServer负责统筹全局参数:没有优化器OP,仅使用Sum等OP,利用各节点发送的参数diff,更新全局参数。更多详细介绍,可以参考文档 低频通信参数服务器训练算法

代码实现

PServer的计算图生成源代码位于 build_pserver_program

使用Fleet API时,参考以下python代码:server_demo.py

# server_demo.py
import random
import paddle
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker

paddle.enable_static()

input_data = paddle.static.data(name="sparse_input", shape=[
    None, 1], dtype="int64")
input_label = paddle.static.data(
    name="label", shape=[None, 1], dtype="int64")
label = paddle.cast(input_label, dtype="int64")

embedding = paddle.static.nn.embedding(
    input_data, is_sparse=True, size=[1000, 128])

fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
cost = paddle.nn.functional.cross_entropy(input=predict, label=label)

role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
strategy = fleet.DistributedStrategy()
strategy.a_sync = True
strategy.a_sync_configs = {"launch_barrier": False}

optimizer = paddle.optimizer.Adam(1e-4)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(cost)

if fleet.is_server():
    fleet.init_server()
export PSERVER_DEBUG=1
fleetrun --worker_num=1 --server_num=1 server_demo.py
cat log/serverlog.0

通过以上命令运行 server_demo.py 后,日志应包含以下的输出

server:
 server_param {
     downpour_server_param {
     service_param {server_class: "BrpcPsServer" client_class: "BrpcPsClient" service_class: "BrpcPsService" start_server_port: 0 server_thread_num: 12
     }
     downpour_table_param {table_id: 1 table_class: "CommonSparseTable" shard_num: 256 type: PS_SPARSE_TABLE
     accessor {accessor_class: "CommMergeAccessor" fea_dim: 1000 embedx_dim: 128

     }
     common {name: "adam" table_name: "embedding_0.w_0" trainer_num: 1 sync: false params: "Param" params: "Moment1" params: "Moment2" params: "Beta1Pow" params: "Beta2Pow" params: "LearningRate" dims: 128 dims: 128 dims: 128 dims: 1 dims: 1 dims: 1 initializers: "uniform_random&0&-0.0729324966669&0.0729324966669" initializers: "fill_constant&0.0" initializers: "fill_constant&0.0" initializers: "fill_constant&0.899999976158" initializers: "fill_constant&0.999000012875" initializers: "fill_constant&9.99999974738e-05"

     }

     }
     downpour_table_param {table_id: 0 table_class: "CommonDenseTable" shard_num: 256 type: PS_DENSE_TABLE
     accessor {accessor_class: "CommMergeAccessor" fea_dim: 788738 embedx_dim: 1

     }
     common {name: "adam" table_name: "MergedDense" trainer_num: 1 sync: false params: "Param" params: "Moment1" params: "Moment2" params: "Beta1Pow" params: "Beta2Pow" params: "LearningRate" dims: 788738 dims: 788738 dims: 788738 dims: 1 dims: 1 dims: 1 initializers: "fill_constant&0.0" initializers: "fill_constant&0.0" initializers: "fill_constant&0.0" initializers: "fill_constant&0.899999976158" initializers: "fill_constant&0.999000012875" initializers: "fill_constant&9.99999974738e-05"

     }

     }
     downpour_table_param {table_id: 2 table_class: "BarrierTable" shard_num: 256 type: PS_OTHER_TABLE
     accessor {accessor_class: "CommMergeAccessor" fea_dim: 0 embedx_dim: 0

     }
     common {name: "" table_name: "barrier_table" trainer_num: 1 sync: false

     }

     }
     }
 }

以上是server计算图的配置信息,可以看到一共有3个数据表:

  • 0号表存储了Dense参数,维度是组网中所有FC层weight和b参数的累和,更新方式是adam

  • 1号表存储了Sparse参数,保存的参数是embedding_0.w_0, 维度是[1000, 128],更新方式是adam

  • 2号表是控制各个节点间初始化的同步

计算图拆分——Worker

参数服务器模式下,训练过程的数据读取,前向计算,反向计算在Worker上执行。

不失一般性,以PS-CPU异步模式的计算图介绍Worker的计算图,如下图所示

cpu_program
  • Worker读取当前批次的训练数据

  • 进行前向OP的计算,得到Loss

  • 基于Loss,进行反向OP的计算,得到各个参数的梯度

  • 发送(Send)参数的梯度给PServer

  • 接收(Recv)更新后的参数

  • 重复以上流程,迭代训练数据,实现分布式参数服务器的训练

通过上述流程,Wokre的计算图与单机的计算图区别为:

  • 去除了Optimzier OP

  • 在Optimizer OP原来的位置前, 添加了Send OP

  • 在Optimizer OP原来的位置后, 添加了Recv OP

在同步/异步模式的情况下

Worker的计算图按照上述规则进行生成,并根据训练需要,添加Clip等操作组件。

目前Paddle的实现中,通信流程使用单例 Communicator 实现,全异步进行训练与通信,因此计算图中仅在最后有Send OP,作用是触发Communicator

在GeoSGD的情况下

Worker实现参数更新的全流程,通过Send OP 触发 GeoCommunicator,计算并发送本地与全局参数的diff,更多详细介绍,可以参考文档 低频通信参数服务器训练算法

代码实现

Worker的计算图生成源代码位于 build_trainer_program

使用Fleet API时,参考以下python代码:worker_demo.py

# worker_demo.py
import random
import paddle
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker

paddle.enable_static()

input_data = paddle.static.data(name="sparse_input", shape=[
    None, 1], dtype="int64")
input_label = paddle.static.data(
    name="label", shape=[None, 1], dtype="int64")
label = paddle.cast(input_label, dtype="int64")

embedding = paddle.static.nn.embedding(
    input_data, is_sparse=True, size=[1000, 128])

fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
cost = paddle.nn.functional.cross_entropy(input=predict, label=label)

role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
strategy = fleet.DistributedStrategy()
strategy.a_sync = True
strategy.a_sync_configs = {"launch_barrier": False}

optimizer = paddle.optimizer.Adam(1e-4)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(cost)

if fleet.is_worker():
    print("worker_main_program: {}".format(
        paddle.static.default_main_program()))
fleetrun --worker_num=1 --server_num=1 worker_demo.py
cat log/workerlog.0

通过以上命令运行 worker_demo.py 后,可以打印worker的全部计算图,发现其中并没有Adam相关OP,并且计算图最后是Send OP

{Out=[]} = send(inputs={X=[u'embedding_0.w_0@GRAD']}, is_sparse = 1, op_device = , op_namescope = /, op_role = 4, op_role_var = [], send_varnames = [u'embedding_0.w_0@GRAD'], table_id = 1)
{Out=[]} = send(inputs={X=[u'fc_0.b_0@GRAD', u'fc_0.w_0@GRAD', u'fc_1.b_0@GRAD', u'fc_1.w_0@GRAD', u'fc_2.b_0@GRAD', u'fc_2.w_0@GRAD', u'fc_3.b_0@GRAD', u'fc_3.w_0@GRAD']}, is_sparse = 0, op_device = , op_namescope = /, op_role = 4, op_role_var = [], send_varnames = [u'Dense@Grad'], table_id = 0)
计算图拆分——Heter-Worker

异构参数服务器模式下,训练过程的前向计算,反向计算中的一部分在Heter-Worker上执行。

heter_program
  • Worker(Trainer)读取当前批次的训练数据

  • Worker计算前置的在CPU上的前向OP

  • Woker将前向OP计算结果发给Heter-Woker

  • Heter-Worker计算在xPU上的前向OP,得到loss

  • Heter-Worker计算在xPU上的反向OP,得到相关参数梯度

  • Heter-Worker将部分梯度发送回Worker

  • Woker计算在CPU上的反向OP,得到相关参数梯度

  • Woker与Heter-Worker发送(Send)持有的参数的梯度给PServer

  • Worker与Heter-Woker接收(Recv)更新后的参数

  • 重复以上流程,迭代训练数据,实现分布式参数服务器的训练

通过上述流程,Heter-Worker实现的主要功能是:

  • 与Worker通信,接收并发送指定参数

  • 与PServer通信,发送梯度,接收更新

  • 执行前向/反向 OP的运行

Heter-Woker的计算图由一个或多个异构Block构成,每个Block为一段连续的OP的组合,对应着全局计算图中的一部分。

一个异构block的运行,必然需要依赖前置Variable的产出,同时向后传递本Block生成的,在后续计算所需要的Variable

heter_block

在Heter-PS模式中,Worker使用send_and_recv OP来触发Hetet-Worker上的异构block的运行,Worker向Heter-Worker发送Entrance Variable,同时等待接收Exit Varibale,实现整个计算图流程的通路。

Heter-PS目前仅支持Async模式

代码实现

Heter-Worker的计算图生成源代码位于 build_trainer_program

使用Fleet API时,参考以下python代码:heter_demo.py(需要安装GPU版本的PaddlePaddle):

# heter_demo.py
import random
import paddle
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker

paddle.enable_static()

with paddle.static.device_guard("cpu"):
    input_data = paddle.static.data(name="sparse_input", shape=[
        None, 1], dtype="int64")
    input_label = paddle.static.data(
        name="label", shape=[None, 1], dtype="int64")
    label = paddle.cast(input_label, dtype="int64")
    embedding = paddle.static.nn.embedding(
        input_data, is_sparse=True, size=[1000, 128])

with paddle.static.device_guard("gpu"):
    fc1 = paddle.static.nn.fc(embedding, size=1024, activation="relu")
    fc2 = paddle.static.nn.fc(fc1, size=512, activation="relu")
    fc3 = paddle.static.nn.fc(fc2, size=256, activation="relu")
    predict = paddle.static.nn.fc(fc3, size=2, activation="softmax")
    cost = paddle.nn.functional.cross_entropy(input=predict, label=label)

role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
strategy = fleet.DistributedStrategy()
strategy.a_sync = True
strategy.a_sync_configs = {"heter_worker_device_guard": "gpu", "launch_barrier": False}

optimizer = paddle.optimizer.Adam(1e-4)
optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(cost)

if fleet.is_server():
    if role._is_heter_worker():
        print("heter_main_program: {}".format(
            paddle.static.default_main_program()))
fleetrun --worker_num=1 --server_num=1 --heter_worker_num=1 heter_demo.py
cat log/heterlog.0

通过以上命令运行 heter_demo.py 后,可以打印heter-worker的全部计算图,可以发现计算图中包含了一个异构block,该异构Block的起始OP是从FC的mul操作开始的,在算出embedding的梯度后,以Send OP结束。若同时打印worker的计算图,会观察到原来的FC层,被send_and_recv OP替代。

优化
触发稀疏参数更新

稠密参数会默认打包为一个大参数后,分片放到各个PServer上

稀疏参数会被均分到各个PServer上

稀疏参数的保存和更新目前可以通过以下OP触发,它们都实现了远程查寻embedding稀疏表的功能,区别在于输入与输出的维度,功能是一致的,在PS模式时,经过图编译,以下OP都会被等价替换成 distributed_lookup_table OP:

  • paddle.nn.Embedding

    import paddle
    paddle.enable_static()
    
    # sparse=True, 触发参数的稀疏化,加快训练和通信速度
    embedding = paddle.nn.Embedding(
                input=x,
                size=[id_num, id_value_shape],
                sparse=True)
    
  • paddle.static.nn.embedding

    import paddle
    paddle.enable_static()
    
    # is_sparse=True, 触发参数的稀疏化,加快训练和通信速度
    embedding = paddle.static.nn.embedding(
                input=x,
                size=[id_num, id_value_shape],
                is_sparse=True)
    
  • paddle.fluid.layers.embedding

    import paddle
    paddle.enable_static()
    
    # is_sparse=True, 触发参数的稀疏化,加快训练和通信速度
    embedding = paddle.fluid.layers.embedding(
                input=x,
                size=[id_num, id_value_shape],
                is_sparse=True)
    
  • paddle.fluid.contrib.sparse_embedding

    import paddle
    paddle.enable_static()
    
    # sparse_embedding 触发emb的大规模稀疏
    embedding = paddle.fluid.contrib.sparse_embedding(
                input=x,
                size=[id_num, id_value_shape])
    

使用InMemoryDataset/QueueDataset进行训练

注意

本教程目前不支持动态图,仅支持在paddle静态图模式下使用,paddle开启静态图模式

paddle.enable_static()
简介

为了能高速运行模型的训练,我们使用InMemoryDataset/QueueDatasetAPI进行高性能的IO,具体介绍可以参考文档InMemoryDatasetQueueDataset, 以下简称Dataset。Dataset是为多线程及全异步方式量身打造的数据读取方式,每个数据读取线程会与一个训练线程耦合,形成了多生产者-多消费者的模式,会极大的加速我们的模型训练。

本文以训练wide&deep模型为例,在训练中引入基于Dataset 以下是使用Dataset接口一个比较完整的流程:

引入dataset
  1. 通过dataset = paddle.distributed.InMemoryDataset() 或者 dataset = paddle.distributed.QueueDataset()创建一个Dataset对象

  2. 指定dataset读取的训练文件的列表, 通过set_filelist配置。

  3. 通过dataset.init() api 进行Dataset的初始化配置,init()接口接收**kwargs参数, 详见api文档,列举几个配置的初始化

    1. 将我们定义好的数据输入格式传给Dataset, 通过use_var配置。

    2. 指定我们的数据读取方式,由my_data_generator.py实现数据读取的规则,后面将会介绍读取规则的实现, 通过pipe_command配置。pipe_command是Dataset特有的通过管道来读取训练样本的方式,通过set_filelist设置的训练样本文件将被作为管道的输入cat到管道中经过用户自定义的pipe_command最终输出。

    3. 指定数据读取的batch_size,通过batch_size配置。

    4. 指定数据读取的线程数,一般该线程数和训练线程应保持一致,两者为耦合的关系,通过thread_num配置。

dataset = paddle.distributed.InMemoryDataset()
batch_size = config.config["batch_size"]
thread_num = config.config["thread_num"]
dataset.init(use_var=model.inputs, pipe_command="python reader.py", batch_size=batch_size, thread_num=thread_num)
dataset.set_filelist([config.config["train_files_path"]])
如何指定数据读取规则

在上文我们提到了由my_data_generator.py实现具体的数据管道读取规则,那么,怎样为dataset创建数据读取的规则呢? 以下是reader.py的全部代码,具体流程如下: 1. 首先我们需要引入data_generator的类,位于paddle.distributed.fleet.data_generator。 2. 声明一些在数据读取中会用到的类和库。 3. 创建一个子类WideDeepDatasetReader,继承fleet.data_generator的基类,基类有多种选择,如果是多种数据类型混合,并且需要转化为数值进行预处理的,建议使用MultiSlotDataGenerator;若已经完成了预处理并保存为数据文件,可以直接以string的方式进行读取,使用MultiSlotStringDataGenerator,能够进一步加速。在示例代码,我们继承并实现了名为Word2VecReader的data_generator子类,使用MultiSlotDataGenerator方法。 4. 继承并实现基类中的generate_sample函数,逐行读取数据。该函数应返回一个可以迭代的reader方法(带有yield的函数不再是一个普通的函数,而是一个生成器generator,成为了可以迭代的对象,等价于一个数组、链表、文件、字符串etc.) 5. 在这个可以迭代的函数中,如示例代码中的def wd_reader(),我们定义数据读取的逻辑。例如对以行为单位的数据进行截取,转换及预处理。

  1. 最后,我们需要将数据整理为特定的batch的格式,才能够被dataset正确读取,并灌入的训练的网络中。使用基类中的generate_batch函数, 我们无需再做声明 根据设定的’batch_size’, 该函数会在generator_sample函数产生样本数达到batch_size时,调用该函数内队逐条样本的处理逻辑,如示例代码中的def local_iter()

  2. 简单来说,数据的输出顺序与我们在网络中创建的inputs必须是严格一一对应的,并转换为类似字典的形式。在示例代码中,我们将参数名与数值构成的元组组成了一个list,并将其yield输出。如果展开来看,我们输出的数据形如[('dense_input',[value]),('C1',[value]),......('label',[value])]

import paddle
import paddle.distributed.fleet as fleet
import os
import sys

cont_min_ = [0, -3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
cont_max_ = [20, 600, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
cont_diff_ = [20, 603, 100, 50, 64000, 500, 100, 50, 500, 10, 10, 10, 50]
hash_dim_ = 1000001
continuous_range_ = range(1, 14)
categorical_range_ = range(14, 40)


class WideDeepDatasetReader(fleet.MultiSlotDataGenerator):

    def line_process(self, line):
        features = line.rstrip('\n').split('\t')
        dense_feature = []
        sparse_feature = []
        for idx in continuous_range_:
            if features[idx] == "":
                dense_feature.append(0.0)
            else:
                dense_feature.append(
                    (float(features[idx]) - cont_min_[idx - 1]) / cont_diff_[idx - 1])
        for idx in categorical_range_:
            sparse_feature.append(
                [hash(str(idx) + features[idx]) % hash_dim_])
        label = [int(features[0])]
        return [dense_feature]+sparse_feature+[label]

    def generate_sample(self, line):
        def wd_reader():
            input_data = self.line_process(line)
            feature_name = ["dense_input"]
            for idx in categorical_range_:
                feature_name.append("C" + str(idx - 13))
            feature_name.append("label")
            yield zip(feature_name, input_data)

        return wd_reader

if __name__ == "__main__":
    my_data_generator = WideDeepDatasetReader()
    my_data_generator.set_batch(16)

    my_data_generator.run_from_stdin()
快速调试Dataset

我们可以脱离组网架构,单独验证Dataset的输出是否符合我们预期。使用命令 cat 数据文件 | python dataset读取python文件进行dataset代码的调试:

cat data/part-0 | python reader.py

输出的数据格式如下: 13 0.0 0.00663349917081 0.01 0.0 0.0423125 0.054 0.12 0.0 0.074 0.0 0.4 0.0 0.0 1 371155 1 846239 1 204942 1 600511 1 515218 1 906818 1 369888 1 507110 1 27346 1 698085 1 348211 1 170408 1 597913 1 255651 1 415979 1 186815 1 342789 1 994402 1 880474 1 984402 1 208306 1 26235 1 410878 1 701750 1 934391 1 552857 1 1

理想的输出为(截取了一个片段):

...
13 0.0 0.00663349917081 0.01 0.0 0.0423125 0.054 0.12 0.0 0.074 0.0 0.4 0.0 0.0 1 371155 1 846239 1 204942 1 600511 1 515218 1 906818 1 369888 1 507110 1 27346 1 698085 1 348211 1 170408 1 597913 1 255651 1 415979 1 186815 1 342789 1 994402 1 880474 1 984402 1 208306 1 26235 1 410878 1 701750 1 934391 1 552857 1 1
...

使用Dataset的一些注意事项 - Dataset的基本原理:将数据print到缓存,再由C++端的代码实现读取,因此,我们不能在dataset的读取代码中,加入与数据读取无关的print信息,会导致C++端拿到错误的数据信息。 - dataset目前只支持在unbuntuCentOS等标准Linux环境下使用,在WindowsMac下使用时,会产生预料之外的错误,请知悉。

数据准备

完整数据下载以及预处理之后可以选取一个part的文件作为demo数据保存在data目录下

训练
import paddle
import paddle.distributed.fleet as fleet
import config
# 开启paddle静态图模式
paddle.enable_static()

fleet.init()

model = X.applications.Word2vec()

"""
need config loader correctly.
"""

loader = model.load_dataset_from_file(train_files_path=[config.config["train_files_path"]], dict_path=config.config["dict_path"])

strategy = fleet.DistributedStrategy()
strategy.a_sync = True
optimizer = fleet.distributed_optimizer(optimizer, strategy)

optimizer.minimize(model.cost)

if fleet.is_server():
    fleet.init_server()
    fleet.run_server()

if fleet.is_worker():
    place = paddle.CPUPlace()
    exe = paddle.static.Executor(place)

    exe.run(paddle.static.default_startup_program())

    fleet.init_worker()

    distributed_training(exe, model)
    clear_metric_state(model, place)

    fleet.stop_worker()

完整示例代码可以参考 FleetX/examples/wide_and_deep_dataset 目录

通过以上简洁的代码,即可以实现wide&deep模型的多线程并发训练

低频通信参数服务器训练算法

简介

众所周知,在同步/异步参数服务器分布式训练中Worker每训练完一个周期,都会将梯度上传至PServer,等待PServer分发最新的参数后才开始新一轮的训练。在这种训练方式中,节点间的通信会消耗大量的时间成本,进而影响训练的效率。

为了降低节点见通信对训练速度的影响,Fleet提供了一种更高效的参数更新策略:GeoSGD

原理
geosgd

在GeoSGD更新策略中,Worker的参数更新也是在全异步的条件下进行的。但与异步参数服务器有以下不同:

  • 与普通的参数服务器不同,在GEO策略中,每个Worker负责在本地维护自己的参数更新,在训练一定数量的步数后将本轮训练出的参数与上一轮结束后的参数做差。并除以Worker的个数,将结果上传至PServer。PServer则负责为每个Worker计算其参数与全局参数的diff。

  • GEO更新策略会在训练过程中启动多个进程,负责参数更新及节点通信。在Worker与PServer的整个交互过程中,主进程会保持模型的训练,由子进程负责与PServer进行交互,在拿到与全局参数的diff后将其更新至主进程。

GEO策略通过模型训练与节点通信同步进行的方式,在保证模型效果的前提下,大大提升了训练的速度。在Word2Vec模型上测试发现,GEO策略相比异步参数服务器,训练速度提高了3倍多。

使用方法
添加依赖

首先我们需要添加训练中所用到的python模块paddlepaddle.distributed.fleet,后者主要提供分布式相关的接口和策略配置。

目前Paddle默认为动态图运行模式,分布式参数服务器训练当前仅支持在静态图模式下运行,所以需要自行打开静态图开关。

import paddle
import paddle.distributed.fleet as fleet
paddle.enable_static()
定义模型组网

在这个例子中我们使用Wide&Deep模型。

model = WideDeepModel()
model.net(is_train=True)
初始化分布式训练环境

多机参数服务器均是通过fleet.init()接口初始化分布式训练环境,用户可通过传入 role_maker 进行相关配置,若为None,则框架会自动根据用户在环境变量中的配置进行分布式训练环境的初始化。

fleet.init(role_maker=None)
配置GEO策略及优化算法

在Fleet API中,用户可以使用fleet.DistributedStrategy()接口定义自己想要使用的分布式策略。

想要使用GEO策略,用户首先需要打开异步参数服务器开关,即设置a_sync为True。

然后用户需要通过dist_strategy.a_sync_configs设置Worker上传参数的频率,下面的代码中我们设置Worker每训练400个Batch后与PServer进行交互。

dist_strategy = fleet.DistributedStrategy()
dist_strategy.a_sync = True
dist_strategy.a_sync_configs = {"k_steps": 400}

optimizer = paddle.optimizer.SGD(learning_rate=0.0001)

optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
optimizer.minimize(model.cost)
开始训练

GEO策略的训练代码沿用了参数服务器分布式训练的形式。

对于PServer节点,首先用init_server()接口对其进行初始化,然后启动服务并开始监听由训练节点传来的参数变化值。

同样对于训练节点,用init_worker()接口进行初始化后,开始执行训练任务。

if fleet.is_server():
    fleet.init_server()
    fleet.run_server()
else:
    exe.run(paddle.static.default_startup_program())
    fleet.init_worker()

    # do training
    distributed_training(exe, model)
运行方法

完整运行示例见 examples/wide_and_deep, 需注意,该示例指定的分布式训练模式为异步,可参考GEO模式策略配置方法,将任务运行模式变为GEO模式。

配置完成后,通过fleetrun指令运行分布式任务。命令示例如下,其中server_num, worker_num分别为服务节点和训练节点的数量。

fleetrun --server_num=2 --worker_num=2 train.py

增量训练

简介

增量训练是一种常见的机器学习方法,在深度学习领域内有广泛的应用,它代表的是一种动态学习的训练方式,即训练数据随着时间的推移源源不断的加入到当前训练流程中,扩展当前模型的知识和能力。

飞桨的参数服务器训练支持增量训练,支持训练在某一时间点进行训练模型参数(含部分优化器的状态)的全量保存,在重启训练时将之前保存的全量参数进行加载,结合新的训练数据继续训练,从而学习到新的有用信息。

原理介绍

飞桨参数服务器增量训练包含两部分内容,即模型保存和模型加载。训练节点分为PServer和Worker两种,每个Worker上都有完整的稠密参数,没有稀疏参数。稀疏参数和稠密参数分布于全部的PServer节点上。

飞桨模型参数在参数服务器下分为稠密参数和稀疏参数两种, 在调用模型保存的接口后,会分别在PServer端和0号Worker端进行参数的保存,其中0号Worker端将保存全部的稠密参数及相关的状态,每个PServer将以分片的形式保存位于该PServer端的稀疏参数。

飞桨模型参数在参数服务器下分为稠密参数和稀疏参数两种, 需要分别在PServer端和0号Worker端进行加载才能完成对参数的加载。

训练启动时每个PServer的基本初始流程如下:

  • 每个节点执行 fleet.init_server(dirname=None, var_names=None, **kwargs) 进行PServer端初始化,分配到此节点的稠密参数会按照定义的形状和初始化方法进行初始化, 稀疏参数则只预定义出初始化方法,稀疏参数会在训练过程中根据前向通信算子发送过来的ID进行实时初始化。 init_server用有两个选配参数,分别是 dirname`和`var_names,`dirname`表示需要增量加载的模型路径,两个选配参数相互配合实现稀疏参数的加载,注意, 如果只指定 dirname, 则表示会从指定的目录中加载全部的稀疏参数, 如果还指定了`var_names`,则表示加载指定参数名的稀疏参数。 注意,init_server 只会加载稀疏参数,稠密参数的加载在Worker端进行。

  • 每个节点执行 fleet.run_server() 表明当前节点已经初始化成功,可以支持Worker端的连接和通信。

训练启动时每个Worker的基本初始流程如下:

  • 每个节点执行 exe.run(paddle.static.default_startup_program()) 进行参数初始化。

  • 0号节点执行 paddle.fluid.io.load_vars() 指定要加载的稠密参数的名字列表和模型目录,将稠密参数通过此方式进行加载。

  • 每个节点执行 fleet.init_worker() , 其中0号节点的稠密参数将同步给相应的PServer,其他节点(非0号)会从PServer端将稠密参数取回本地赋值给本地的稠密参数。

至此,完成了整个训练开始前,PServer和Worker中稠密参数和稀疏参数的加载和同步。

功能效果

  • 训练开始时,使用上述方法,可实现模型参数的全量加载。

  • 训练结束是,使用上述方法,可实现模型参数的全量保存。

使用方法

模型保存:

# 在需要保存模型的地方,执行下面的命令,即可完成模型中全量参数的保存
# 其中, 稠密参数会被保存在0号Worker上, 稀疏参数会被保存在每个PServer上的同名路径下

dirname = "/you/path/to/model"

if fleet.is_first_worker():
    fleet.save_persistables(dirname)

模型加载:

# 模型加载需要区分是PServer还是Worker
dirname = "/you/path/to/model"

if fleet.is_server():
    var_names = None
    fleet.init_server(dirname, var_names)
    fleet.run_server()

if fleet.is_worker():
    place = paddle.CPUPlace()
    exe = paddle.static.Executor(place)

    exe.run(paddle.static.default_startup_program())
    var_names = ["w", "b"]
    fluid.io.load_vars(executor=exe, dirname=path, vars=var_names)
    fleet.init_worker()

运行成功提示

  1. 模型加载当前并没有提示

  2. 模型保存成功,会在相应的目录保存下模型文件, 稀疏参数会被保存在每个PServer上的同名路径下。

常见问题与注意事项

  • 节点动态调整

  • 训练节点在发生变化的情况下, 稀疏参数需要做一次重新分布分布以满足新的加载需求。

  • 当前框架并没有提供此稀疏参数重分布脚本,目前需要用户自行编写。

  • 加载指定稠密参数

  • 用户可以选择性的加载所需的稠密参数,具体是在 0号 Worker 执行 `fluid.io.load_vars`时 ,指定的 vars的列表来控制。

  • 加载指定稀疏参数

  • 用户可以选择性的加载指定的稀疏参数,具体是在PServer执行`init_server`时,指定`var_names`的列表,通过此列表来控制加载的参数名单。

论文/引用

[略]

流式训练

简介

飞桨参数服务器训练支持流式训练模式,支持配置千亿级大规模稀疏及[0, INT64]范围内的ID映射,支持模型自增长及配置特征准入(不存在的特征可以以适当的条件创建)、淘汰(够以一定的策略进行过期的特征的清理)等策略,支持模型增量保存,通过多种优化来保证流式训练的流程及效果。

原理介绍

流式训练(OnlineLearning), 即训练数据不是一次性放入训练系统,而是随着时间流式的加入到训练过程中去。 整个训练服务不停止,数据经过预处理后进入训练系统参与训练并产出线上所需的预测模型参数。通过流式数据的生产、实时训练及快速部署上线来提升推荐系统的性能和效果。流式训练是按照一定顺序进行数据的接收和处理,每接收一个数据,模型会对它进行预测并对当前模型进行更新,然后处理下一个数据。 像信息流、小视频、电商等场景,每天都会新增大量的数据, 让每天(每一刻)新增的数据基于上一天(上一刻)的模型进行新的预测和模型更新。

功能效果

通过合理配置,可实现大规模流式训练,提升推荐系统的性能和效果。

本文中涉及到的相关功能和使用示例: 1. 使用大规模稀疏的算子进行组网 2. 配置准入策略 3. 配置模型保存及增量保存

使用方法

流式训练是个上下游牵涉众多的训练方法,本文只贴出训练相关的配置给用户做一个讲解,具体使用需要结合实际情况进行代码的伪代码:

# 初始化分布式环境
fleet.init()

# your real net function
model = net()

# 使用参数服务器异步训练模式
strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True

# 分布式训练图优化
adam = paddle.fluid.optimizer.Adam(learning_rate=5e-06)
adam = fleet.distributed_optimizer(adam, strategy=strategy)
adam.minimize(model.avg_cost)

# 启动PServer
if fleet.is_server():
    fleet.init_server()
    fleet.run_server()

if fleet.is_worker():
    # 初始化Worker
    exe.run(paddle.static.default_startup_program())
    fleet.init_worker()

    while True:

        # 持续不断的从`get_ready_training_set`获取可训练的书记集和相关的配置
        # 下面是一个按小时训练的例子
        dataset, hour, day = get_ready_training_dataset()

        if dataset is None:
            break

        # 使用`dataset`中的数据进行训练和模型保存
        exe.train_from_dataset(program=paddle.static.default_main_program(),
                               dataset=dataset,
                               fetch_list=[model.auc],
                               fetch_info=["avg_auc"],
                               print_period=10)

        # 0号保存模型即可,每天第0个小时进行全量保存, 剩余时间进行增量保存
        if fleet.is_first_worker():
            mode = 1 if hour == 0 else 2
            fleet.save_persistables(exe, "output/epoch_{}".format(day), mode)

    fleet.stop_worker()

运行成功提示

[略]

常见问题与注意事项

  1. 训练过程中,如需使用分布式指标,请参考<分布式指标章节>。

  2. 如果训练中途中断,需要加载模型后继续训练,请参考<增量训练章节>

论文/引用

[略]

分布式指标

简介

分布式指标是指在分布式训练任务中用以评测模型效果的指标。它和单机模型评测指标的区别在于,单机指标仅评测当前节点的测试数据,而分布式指标需评测所有节点的全量测试数据。

原理

分布式指标的计算一般包含三步,下面我们以分布式准确率为例介绍整个过程。

  1. 初始化分布式训练环境

    import paddle.distributed.fleet as fleet
    fleet.init()
    
  2. 定义指标计算需要的所有中间状态统计值,每个训练节点统计各自的状态值。准确率计算需要样本总数和正确分类的样本数两个统计值。

    ...
    pred, label = model()
    
    # 1. 定义中间状态统计值,样本总数和正确分类的样本数
    correct_cnt = paddle.static.create_global_var(name="right_cnt", persistable=True, dtype='float32', shape=[1], value=0)
    total_cnt = paddle.static.create_global_var(name="total_cnt", persistable=True, dtype='float32', shape=[1], value=0)
    
    # 2. 训练节点自己的状态统计
    batch_cnt = paddle.sum(
        paddle.full(shape=[paddle.shape(label)[0], 1], fill_value=1.0))
    batch_accuracy = paddle.static.accuracy(input=pred, label=label)
    batch_correct = batch_cnt * batch_accuracy
    
    paddle.assign(correct_cnt + batch_correct, correct_cnt)
    paddle.assign(total_cnt + batch_cnt, total_cnt)
    accuracy = correct_cnt / total_cnt
    
  3. 所有训练节点间进行 all_reduce 操作,获取全局统计值,然后根据指标计算公式,计算全局指标。

    global_cnt = fleet.metrics.sum(total_cnt)
    global_correct = fleet.metrics.sum(corrent_cnt)
    global_accuracy = float(global_correct) / float(global_cnt)
    

分布式指标

为方便使用,Paddle在 paddle.distributed.metrics 下将常见的一些指标计算进行了封装,下面对这些API的功能及参数进行说明,并提供用法示例。

分布式AUC
paddle.distributed.fleet.metrics.auc(stat_pos, stat_neg, scope=None, util=None)

分布式AUC(Area Under the Curve)。AUC 是一个二分类任务中常用的效果评价指标,指ROC曲线和横坐标轴之间的面积,该值介于0~1之间,越大代表分类器效果越好。

参数:

  • stat_pos, (numpy.array|Tensor|string, required): 单机正样例中间统计结果,即单机 paddle.static.aucstat_pos 输出。

  • stat_neg, (numpy.array|Tensor|string, required): 单机负样例中间统计结果,即单机 paddle.static.aucstat_neg 输出。

  • scope, (Scope, optional),作用域,若为None,则使用全局/默认作用域,默认为None。

  • util, (UtilBase, optinal),分布式训练工具类,若为None,则使用默认工具类 fleet.util, 默认为None。

用法示例:

...
pred, label = model()

# 1. 单机组网阶段,计算正负样例中间统计结果。
auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos, stat_neg] = \
    paddle.static.auc(input=pred, label=label)

# 2. 分布式训练阶段,计算全局AUC。
global_auc = fleet.metrics.auc(stat_pos, stat_neg)
分布式Accuracy
paddle.distributed.fleet.metrics.acc(correct, total, scope=None, util=None)

分布式准确率。准确率(Accuracy)是分类任务中常用的一个效果评价指标。通过比对预测标签和实际标签是否一致,从而计算模型的分类效果,公式如下:

\[accuracy = \frac{correct}{total}\]

其中,correct 是预测标签等于真实标签的样本总数,total 是全部样本总数。

参数:

  • correct, (numpy.array|Tensor|string, required): 单机预测标签等于真实标签的样本总数。

  • total, (numpy.array|Tensor|string, required): 单机样本总数。

  • scope, (Scope, optional),作用域,若为None,则使用全局/默认作用域,默认为None。

  • util, (UtilBase, optinal),分布式训练工具类,若为None,则使用默认工具类 fleet.util, 默认为None。

用法示例:

...
pred, label = model()

# 1. 单机组网阶段,计算样本总数和预测正确的样本数
correct_cnt = paddle.static.create_global_var(name="right_cnt", persistable=True, dtype='float32', shape=[1], value=0)
total_cnt = paddle.static.create_global_var(name="total_cnt", persistable=True, dtype='float32', shape=[1], value=0)

batch_cnt = paddle.sum(
    paddle.full(shape=[paddle.shape(label)[0], 1], fill_value=1.0))
batch_accuracy = paddle.static.accuracy(input=pred, label=label)
batch_correct = batch_cnt * batch_accuracy

paddle.assign(correct_cnt + batch_correct, correct_cnt)
paddle.assign(total_cnt + batch_cnt, total_cnt)
accuracy = correct_cnt / total_cnt

# 2. 分布式训练阶段,计算全局准确率。
global_accuracy = fleet.metrics.acc(correct_cnt, total_cnt)
分布式MAE
paddle.distributed.fleet.metrics.mae(abserr, total_ins_num, scope=None, util=None)

分布式平均绝对误差(Mean Absolute Error)。平均绝对误差是绝对误差的平均值,一般用于计算 loss 损失值。

\[ \begin{align}\begin{aligned}abserr &= \sum |input - label|\\mae &= \frac{abserr}{total\_ins\_num}\end{aligned}\end{align} \]

其中,input 是样本预测结果, label 是样本真实标签,abserr 为绝对误差和,total_ins_num 是样本总数。

参数:

  • abserr, (numpy.array|Tensor|string, required): 单机绝对误差和统计值。

  • total_ins_num, (numpy.array|Tensor|string, required): 单机样本总数。

  • scope, (Scope, optional),作用域,若为None,则使用全局/默认作用域,默认为None。

  • util, (UtilBase, optinal),分布式训练工具类,若为None,则使用默认工具类 fleet.util, 默认为None。

用法示例:

...
pred, label = model()

# 1. 单机组网阶段,计算绝对误差和样本总数
abserr = paddle.static.create_global_var(name="abserr", persistable=True, dtype='float32', shape=[1], value=0)
total_cnt = paddle.static.create_global_var(name="total_cnt", persistable=True, dtype='float32', shape=[1], value=0)

batch_cnt = paddle.sum(
    paddle.full(shape=[paddle.shape(label)[0], 1], fill_value=1.0))
batch_abserr = paddle.nn.functional.l1_loss(pred, label, reduction='sum')

paddle.assign(abserr + batch_abserr, abserr)
paddle.assign(total_cnt + batch_cnt, total_cnt)
mae = abserr / total_cnt

# 2. 分布式训练阶段,计算全局准确率。
global_mae = fleet.metrics.mae(abserr, total_cnt)
分布式MSE
paddle.distributed.fleet.metrics.mse(sqrerr, ins_num, scope=None, util=None)

分布式均方误差(Mean Squared Error)。均方误差是误差平方和的平均值,一般用于计算 loss 损失值。

\[ \begin{align}\begin{aligned}sqrerr &= \sum (input - label)^2\\mse &= \frac{sqrerr}{total\_ins\_num}\end{aligned}\end{align} \]

其中,input 是样本预测结果, label 是样本真实标签,sqrerr 为平方误差和,total_ins_num 是样本总数。

参数:

  • sqrerr, (numpy.array|Tensor|string, required): 单机平方误差和统计值。

  • total_ins_num, (numpy.array|Tensor|string, required): 单机样本总数。

  • scope, (Scope, optional),作用域,若为None,则使用全局/默认作用域,默认为None。

  • util, (UtilBase, optinal),分布式训练工具类,若为None,则使用默认工具类 fleet.util, 默认为None。

用法示例:

...
pred, label = model()

# 1. 单机组网阶段,计算平方误差和样本总数
sqrerr = paddle.static.create_global_var(name="sqrerr", persistable=True, dtype='float32', shape=[1], value=0)
total_cnt = paddle.static.create_global_var(name="total_cnt", persistable=True, dtype='float32', shape=[1], value=0)

batch_cnt = paddle.sum(
    paddle.full(shape=[paddle.shape(label)[0], 1], fill_value=1.0))
batch_sqrerr = paddle.nn.functional.mse_loss(pred, label, reduction='sum')

paddle.assign(sqrerr + batch_sqrerr, sqrerr)
paddle.assign(total_cnt + batch_cnt, total_cnt)
mse =  sqrerr / total_cnt

# 2. 分布式训练阶段,计算全局准确率。
global_mse = fleet.metrics.mse(sqrerr, total_cnt)
分布式RMSE
paddle.distributed.fleet.metrics.rmse(sqrerr, total_ins_num, scope=None, util=None)

分布式均方根误差(Root Mean Squared Error)。均方根误差是均方误差的算术平方根,亦称标准误差,一般用于计算 loss 损失值。

\[ \begin{align}\begin{aligned}sqrerr &= \sum (input - label)^2\\rmse &= \sqrt{\frac{sqrerr}{total\_ins\_num}}\end{aligned}\end{align} \]

其中,input 是样本预测结果, label 是样本真实标签,sqrerr 为平方误差和,total_ins_num 是样本总数。

参数:

  • sqrerr, (numpy.array|Tensor|string, required): 单机平方误差和统计值。

  • total_ins_num, (numpy.array|Tensor|string, required): 单机样本总数。

  • scope, (Scope, optional),作用域,若为None,则使用全局/默认作用域,默认为None。

  • util, (UtilBase, optinal),分布式训练工具类,若为None,则使用默认工具类 fleet.util, 默认为None。

用法示例:

...
pred, label = model()

# 1. 单机组网阶段,计算平方误差和样本总数
sqrerr = paddle.static.create_global_var(name="sqrerr", persistable=True, dtype='float32', shape=[1], value=0)
total_cnt = paddle.static.create_global_var(name="total_cnt", persistable=True, dtype='float32', shape=[1], value=0)

batch_cnt = paddle.sum(
    paddle.full(shape=[paddle.shape(label)[0], 1], fill_value=1.0))
batch_sqrerr = paddle.nn.functional.mse_loss(pred, label, reduction='sum')

paddle.assign(sqrerr + batch_sqrerr, sqrerr)
paddle.assign(total_cnt + batch_cnt, total_cnt)
mse =  sqrerr / total_cnt
rmse = paddle.sqrt(mse)

# 2. 分布式训练阶段,计算全局准确率。
global_rmse = fleet.metrics.rmse(sqrerr, total_cnt)
分布式Sum
paddle.distributed.fleet.metrics.sum(input, scope=None, util=None)

分布式求和。一般用于自定义指标计算。

参数:

  • input, (numpy.array|Tensor|string, required),需要分布式求和的输入参数。

  • scope, (Scope, optional),作用域,若为None,则使用全局/默认作用域,默认为None。

  • util, (UtilBase, optinal),分布式训练工具类,若为None,则使用默认工具类 fleet.util, 默认为None。

用法示例:

...
# 1. 单机组网阶段,计算Loss
loss = model()

# 2. 分布式训练阶段,计算全局Loss和
loss_val, = exe.run(paddle.static.default_main_program(),
                    fetch_list=[loss.name])
loss_sum = fleet.metrics.sum(loss_val)
分布式Max
paddle.distributed.fleet.metrics.max(input, scope=None, util=None)

分布式求最大值。一般用于自定义指标计算。

参数:

  • input, (numpy.array|Tensor|string, required),需要分布式求最大值的输入参数。

  • scope, (Scope, optional),作用域,若为None,则使用全局/默认作用域,默认为None。

  • util, (UtilBase, optinal),分布式训练工具类,若为None,则使用默认工具类 fleet.util, 默认为None。

用法示例:

...
# 1. 单机组网阶段,计算Loss
loss = model()

# 2. 分布式训练阶段,计算全局最大Loss
loss_val, = exe.run(paddle.static.default_main_program(),
                    fetch_list=[loss.name])
max_loss = paddle.metrics.max(loss_val)
分布式Min
paddle.distributed.fleet.metrics.min(input, scope=None, util=None)

分布式求最小值。一般用于自定义指标计算。

参数:

  • input, (numpy.array|Tensor|string, required),需要分布式求最大值的输入参数。

  • scope, (Scope, optional),作用域,若为None,则使用全局/默认作用域,默认为None。

  • util, (UtilBase, optinal),分布式训练工具类,若为None,则使用默认工具类 fleet.util, 默认为None。

用法示例:

...
# 1. 单机组网阶段
loss = model()

# 2. 分布式训练阶段,计算全局最小Loss
loss_val, = exe.run(paddle.static.default_main_program(),
                    fetch_list=[loss.name])
min_loss = fleet.metrics.min(loss_val)

使用方法

完整运行示例见 examples/wide_and_deep

通过fleetrun指令运行分布式任务。命令示例如下,其中server_num, worker_num分别为服务节点和训练节点的数量。

fleetrun --server_num=2 --worker_num=2 train.py

分布式预测

简介

分布式预测任务将预测数据均匀分布式在多台机器上,每台机器仅预测整个数据集的一部分,节点之间通过 all_reduce 等集合通信操作完成各自预测结果的同步,从而获取整个数据集的预测结果。

为什么要做分布式预测,除了通过数据并行的方式节省预测时间外,另一个很重要的原因是,在某些场景,例如推荐系统或者搜索引擎中, 稀疏参数(embedding)的 feature id 可能会非常多,当 feature id 达到一定数量时,稀疏参数会变得很大以至于单机内存无法存放,从而导致无法预测。

原理

分布式预测的原理基本和分布式训练一致,都将节点分为 WorkerPServer 两类,这两类节点在训练任务和预测任务中的分工如下:

  • Worker:在训练时,Worker负责完成训练数据读取、从PServer上拉取稀疏参数然后进行前向网络计算、反向梯度计算等过程,并将计算出的梯度上传至PServer。在预测时,Worker负责完成预测数据读取、从PServer上拉取稀疏参数然后进行前向计算。所有Worker间可进行集合通信,从而获取全局的预测结果。

  • PServer:在训练时,PServer在收到训练Worker传来的梯度后,会根据指定的优化器完成更新参数,并将参数发送给训练Worker。在预测时,PServer仅作为稀疏参数存储器,响应预测Worker拉取稀疏参数的请求。

分布式预测任务的流程主要有以下三步:

  1. 自定义预测组网

  2. 初始化分布式集群环境,加载模型参数。

  3. 生成分布式预测组网,自定义reader,开始预测。

分布式预测功能主要通过 DistributedInfer 工具类完成,下面对相关API的功能和参数进行介绍。

class paddle.distributed.fleet.utils.ps_util.DistributedInfer(main_program=None, startup_program=None)

PaddlePaddle的分布式预测工具类。

参数:
  • main_program(paddle.static.Program, optional),单机预测组网,若为None,则认为 paddle.static.default_main_program() 为单机预测组网。默认为None。

  • startup_program(paddle.static.Program, optional),单机预测初始化组网,若为None,则认为 paddle.static.default_startup_program() 为单机预测初始化组网。默认为None。

方法:

init_distributed_infer_env(exe, loss, role_maker=None, dirname=None)

初始化分布式集群环境,加载模型参数。需要注意,该接口仅在纯分布式预测的任务中才需要被调用,在先训练后预测的分布式一体任务里,此接口无需调用,且不会生效。

参数:
  • exe, (paddle.static.Executor, required),初始化分布式集群环境时需要用到的网络执行器。

  • loss, (Tensor, required), 预测网络 loss 变量。

  • role_maker, (RoleMakerBase, optional), 分布式训练(预测)任务环境配置,若为None,则框架会自动根据用户在环境变量中的配置进行分布式训练(预测)环境的初始化。默认为None。

  • dirname, (String, optional), 参数路径。若为None,则不加载参数。默认为None。

get_dist_infer_program():

生成分布式预测组网。相较于单机预测组网,两者区别仅在于:将稀疏参数查询操作替换为分布式稀疏参数查询操作,即将 lookup_table 算子替换为 distributed_lookup_table

返回:

Program,分布式预测组网。

使用方法

分布式预测常见的应用场景有以下两种,分布式训练+预测一体任务,及独立的分布式预测任务,两种任务的特点分别为:

  • 分布式训练 + 预测一体任务:指分布式训练结束后,Worker节点不向PServer发送任务结束通知,而是继续开始预测。这类任务在进行预测时,分布式集群环境已经初始化好,且不需要进行参数加载。

  • 分布式预测任务:指纯预测的分布式任务。这类任务在进行预测时,分布式集群环境还未初始化好,且往往需要进行参数加载。

下面分别介绍对这两种分布式预测任务的使用方法:

分布式训练 + 预测一体任务
...
model = WideDeepModel()
model.net(is_train=True)

if fleet.is_server():
    fleet.init_server()
    fleet.run_server()
else:
    exe.run(paddle.default_startup_program())
    fleet.init_worker()

    # 分布式训练
    distributed_training(exe, model)

    # 1. 生成单机预测组网
    test_main_program = paddle.static.Program()
    test_startup_program = paddle.static.Program()
    with paddle.static.program_guard(main_program=test_main_program, startup_program=test_startup_program):
        with paddle.utils.unique_name.guard():
            model.net(is_train=False)

    # 2. 生成分布式预测组网,定义reader,进行预测
    dist_infer = DistributedInfer(main_program=test_main_program, startup_program=test_startup_program)
    dist_infer_program = dist_infer.get_dist_infer_program()

    test_data = WideDeepDataset(data_path="./data")
    reader = model.loader.set_sample_generator(test_data, batch_size=batch_size, drop_last=True, places=place)

    reader.start()
    batch_idx = 0
    try:
        while True:
            loss_val = exe.run(program=dist_infer_program,
                                fetch_list=[model.cost.name])
            if batch_idx % 10 == 0:
                loss_val = np.mean(loss_val)
                message = "TEST ---> batch_idx: {} loss: {}\n".format(batch_idx, loss_val)
    except fluid.core.EOFException:
        reader.reset()

    fleet.stop_worker()
分布式预测任务
...

# 1. 定义单机预测组网
model = WideDeepModel()
model.net(is_train=False)

# 2. 初始化分布式预测环境,加载模型参数
dist_infer = DistributedInfer(main_program=test_main_program, startup_program=test_startup_program)
exe = paddle.static.Executor()
dirname = "./init_params/"
dist_infer.init_distributed_infer_env(exe, model.cost, dirname=dirname)

# 3.生成分布式预测组网,定义reader,进行预测
if fleet.is_worker():
    dist_infer_program = dist_infer.get_dist_infer_program()

    test_data = WideDeepDataset(data_path="./data")
    reader = model.loader.set_sample_generator(test_data, batch_size=batch_size, drop_last=True, places=place)

    reader.start()
    batch_idx = 0
    try:
        while True:
            loss_val = exe.run(program=dist_infer_program,
                                fetch_list=[model.cost.name])
            if batch_idx % 10 == 0:
                loss_val = np.mean(loss_val)
                message = "TEST ---> batch_idx: {} loss: {}\n".format(batch_idx, loss_val)
                print(message)
    except fluid.core.EOFException:
        reader.reset()

    fleet.stop_worker()
运行方法

完整运行示例见 examples/wide_and_deep。该示例为分布式训练 + 预测一体任务。

配置完成后,通过fleetrun指令运行分布式任务。命令示例如下,其中server_num, worker_num分别为服务节点和训练节点的数量。

fleetrun --server_num=2 --worker_num=2 train.py

二次开发

  • TBA

整体示例

  • TBA

大规模蒸馏

  • TBA

自监督训练

  • TBA

弹性训练

  • TBA

FleetX扩展工具包

  • TBA

用户FAQ

  • TBA


Fleet使用Apache License 2.0开源协议