Tensor Model + ZeRO Data Parallelism Tutorial

  • If you haven’t read the Tensor model parallelism tutorial, please read that first.

  • The concepts of tensor model parallelism and ZeRO data parallelism can be found here.

  • The source code of this tutorial can be found here.

0. Distributed Launcher

This tutorial must be launched using a distributed launcher.

If you have 4 GPUs:

python -m torch.distributed.launch --nproc_per_node=4 YOUR_SCRIPT.py

If you have installed DeepSpeed in your environment, the following command works the same way.

deepspeed --num_gpus=4 YOUR_SCRIPT.py

For more information on the distributed launchers, refer to:

1. Multi-dimensional Parallel Training

The Multi-dimensional Parallel Training section is the same as in the Tensor Model + Data Parallel Training tutorial, so if you have already read that tutorial, you can skip it.

1.1. The concept of multi-dimensional parallel training

Let’s suppose we have 4 GPUs.

Model parallelism splits the model into multiple pieces that work together on a single batch of data. If we use all four GPUs for model parallelism, it looks like the following.

[Figure: model parallelism across 4 GPUs]

Data parallelism copies the model to each device, splits the data into multiple pieces, and trains on multiple batches of data in parallel. If we use all four GPUs for data parallelism, it looks like the following.

[Figure: data parallelism across 4 GPUs]

Now, we’ll mix these two parallelization methods. We first split the model into multiple pieces.

[Figure: the model split into multiple pieces]

Next, we replicate the parallelized model to other GPUs to apply data parallelism. Each GPU then has a coordinate such as (0, 0), (0, 1), (1, 0), or (1, 1) with respect to its (data, model) rank. Because each GPU is identified by ranks along more than one dimension, we call this training mechanism ‘multi-dimensional parallel training’.

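[Figure: the split model replicated for data parallelism, with (data, model) coordinates (0, 0), (0, 1), (1, 0), (1, 1)]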

To make communication easy, we can organize the GPUs into ‘groups’ for model parallelism and for data parallelism.

Model parallel communication sends and receives the results of the model pieces. Data parallel communication sends and receives the results computed on the data pieces (for example, gradients). Therefore, each kind of communication must take place only inside its own group.

[Figure: model parallel groups and data parallel groups]
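The 4-GPU example can be sketched in plain Python, without any communication. This is only an illustration. It assumes a Megatron-style layout in which consecutive global ranks form one model (tensor) parallel group; OSLO's actual rank ordering may differ. The helpers `coords` and `build_groups` are hypothetical. In a real program, each rank list would be handed to something like `torch.distributed.new_group(ranks=...)`.

```python
def coords(global_rank, tensor_parallel_size):
    """Return the (data, model) coordinate of a global rank.

    Assumption: consecutive ranks share a model parallel group (Megatron-style).
    """
    return global_rank // tensor_parallel_size, global_rank % tensor_parallel_size


def build_groups(world_size, tensor_parallel_size):
    """Return (model_parallel_groups, data_parallel_groups) as lists of global ranks."""
    if world_size % tensor_parallel_size != 0:
        raise ValueError("world_size must be divisible by tensor_parallel_size")
    data_parallel_size = world_size // tensor_parallel_size
    # Same data coordinate: different pieces of one model replica
    model_groups = [
        list(range(d * tensor_parallel_size, (d + 1) * tensor_parallel_size))
        for d in range(data_parallel_size)
    ]
    # Same model coordinate: the same model piece working on different data
    data_groups = [
        list(range(m, world_size, tensor_parallel_size))
        for m in range(tensor_parallel_size)
    ]
    return model_groups, data_groups


if __name__ == "__main__":
    for rank in range(4):
        print(rank, coords(rank, tensor_parallel_size=2))
    model_groups, data_groups = build_groups(world_size=4, tensor_parallel_size=2)
    print("model parallel groups:", model_groups)
    print("data parallel groups:", data_groups)
```

Under this layout, ranks 0 and 1 exchange model-piece results. Ranks 0 and 2 exchange data-parallel results.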

1.2. The concept of MPU (Model Parallel Unit)

It is therefore convenient to manage these communications with an MPU (Model Parallel Unit). The MPU concept was introduced by NVIDIA’s Megatron-LM, and OSLO borrowed it. OSLO’s MPU provides the following methods to facilitate 3D parallel communication, including pipeline model parallelism, a concept introduced later.

from oslo.pytorch.model_parallelism.network.mpu import MPU

mpu = MPU(tensor_parallel_size=4, pipeline_parallel_size=2)

# Data parallel group, rank and world size
mpu.get_data_parallel_group()
mpu.get_data_parallel_rank()
mpu.get_data_parallel_world_size()

# Tensor model parallel group, rank and world size
mpu.get_tensor_parallel_group()
mpu.get_tensor_parallel_rank()
mpu.get_tensor_parallel_world_size()

# Pipeline model parallel group, rank and world size
mpu.get_pipeline_parallel_group()
mpu.get_pipeline_parallel_rank()
mpu.get_pipeline_parallel_world_size()
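The `ToyMPU` class below is hypothetical and communication-free. It only does the rank arithmetic behind the method names above, and it is not OSLO's implementation. It makes two assumptions:

- world_size = tensor × pipeline × data parallel sizes.
- Ranks follow a Megatron-style ordering: the tensor rank varies fastest, then the data rank, then the pipeline rank.

The real MPU also creates and returns `torch.distributed` process groups, which this sketch omits.

```python
class ToyMPU:
    """Hypothetical rank bookkeeping for 3D parallelism (no process groups)."""

    def __init__(self, global_rank, world_size, tensor_parallel_size=1, pipeline_parallel_size=1):
        model_parallel_size = tensor_parallel_size * pipeline_parallel_size
        if world_size % model_parallel_size != 0:
            raise ValueError("world_size must be divisible by tensor * pipeline parallel size")
        self.rank = global_rank
        self.tp = tensor_parallel_size
        self.pp = pipeline_parallel_size
        # Whatever is left over becomes the data parallel dimension
        self.dp = world_size // model_parallel_size

    def get_tensor_parallel_world_size(self):
        return self.tp

    def get_tensor_parallel_rank(self):
        return self.rank % self.tp

    def get_data_parallel_world_size(self):
        return self.dp

    def get_data_parallel_rank(self):
        return (self.rank // self.tp) % self.dp

    def get_pipeline_parallel_world_size(self):
        return self.pp

    def get_pipeline_parallel_rank(self):
        return self.rank // (self.tp * self.dp)


if __name__ == "__main__":
    # Same sizes as the MPU example above, on an assumed 8-process job
    mpu = ToyMPU(global_rank=5, world_size=8, tensor_parallel_size=4, pipeline_parallel_size=2)
    print(
        mpu.get_tensor_parallel_rank(),
        mpu.get_data_parallel_rank(),
        mpu.get_pipeline_parallel_rank(),
        mpu.get_data_parallel_world_size(),
    )
```

Under these assumptions, `tensor_parallel_size=4` and `pipeline_parallel_size=2` need a multiple of 8 processes. With exactly 8 processes, no GPUs are left over for data parallelism (data parallel size 1).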

When you use the oslo.initialize(...) function, the MPU is created automatically, and the model has its own mpu object.

import oslo

model = oslo.initialize(...)

# Data parallel group, rank and world size
model.mpu.get_data_parallel_group()
model.mpu.get_data_parallel_rank()
model.mpu.get_data_parallel_world_size()
...

That concludes the explanation of multi-dimensional parallelism. Now let’s actually use this mechanism to train a model.

2. Training

How do we use tensor model + ZeRO data parallelism for training?

2.1. Initialize some variables

BATCH_SIZE = 4
SEQ_LEN = 64
SAVE_INTERVAL = 50
TRAIN_STEP = 100

2.2. Create the model, optimizer, and tokenizer

from transformers import AutoModelForCausalLM, AutoTokenizer
from torch.optim import Adam

model = AutoModelForCausalLM.from_pretrained("gpt2")
optimizer = Adam(model.parameters(), lr=3e-5)
tokenizer = AutoTokenizer.from_pretrained("gpt2")
# Add pad token for batch training (GPT2 tokenizer doesn't have pad token)
tokenizer.pad_token = tokenizer.eos_token

2.3. Parallelize the model

Note that the tensor parallel (TP) size × the ZeRO data parallel (DP) size must be equal to or smaller than the total number of GPUs. For example, with 4 GPUs you can set TP size = 2 and ZeRO DP size = 2. You only need to specify the TP size; the ZeRO DP size is determined automatically.

import oslo

model = oslo.initialize(
    model, config={"model_parallelism": {"enable": True, "tensor_parallel_size": YOUR_TENSOR_PARALLEL_SIZE}}
)

2.4. Make the model ZeRO data parallelizable

You can use deepspeed.initialize(...) together with OSLO model parallelism, but you must pass model.mpu to deepspeed.initialize(...) as the mpu argument. The DeepSpeed engine uses this MPU object to access the process groups, world sizes, and ranks. If you are not familiar with the concept of MPU, please refer to here.

In addition to ZeRO DP, many other DeepSpeed features can be used with OSLO. DeepSpeed is constantly adding features, so we have not tested all of them, but most of them should work with OSLO. If you encounter errors while using DeepSpeed with OSLO, please report them through issues.

import deepspeed

engine, _, _, _ = deepspeed.initialize(
    model=model,
    model_parameters=model.parameters(),
    mpu=model.mpu,
    config={
        "train_batch_size": BATCH_SIZE,
        "fp16": {"enabled": True},
        "optimizer": {
            "type": "Adam",
            "params": {
                "lr": 3e-5,
                "weight_decay": 3e-7,
            },
        },
        "scheduler": {
            "type": "WarmupDecayLR",
            "params": {
                "warmup_min_lr": 0,
                "warmup_max_lr": 3e-5,
                "warmup_num_steps": TRAIN_STEP // 10,
                "total_num_steps": TRAIN_STEP,
            },
        },
        "zero_optimization": {
            "stage": 3,
            "allgather_partitions": True,
            "allgather_bucket_size": 5e8,
            "overlap_comm": True,
            "reduce_scatter": True,
            "reduce_bucket_size": 5e8,
            "contiguous_gradients": True,
            "offload_param": {"device": "cpu"},
            "offload_optimizer": {"device": "cpu"},
        },
    },
)
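The scheduler section asks DeepSpeed's WarmupDecayLR to do two things. First, it raises the learning rate from warmup_min_lr to warmup_max_lr over warmup_num_steps (here TRAIN_STEP // 10 = 10 steps). Then it decays the rate linearly until total_num_steps.

The function below sketches that shape and is not DeepSpeed's code. It assumes a linear warmup. DeepSpeed also offers a logarithmic warmup through its warmup_type option, so check the documentation of your DeepSpeed version for the default.

```python
def warmup_decay_lr(step, warmup_min_lr, warmup_max_lr, warmup_num_steps, total_num_steps):
    """Learning rate at `step` for a linear-warmup, linear-decay schedule (sketch)."""
    if step < warmup_num_steps:
        # Linear warmup from warmup_min_lr towards warmup_max_lr
        gamma = step / warmup_num_steps
    else:
        # Linear decay that brings the rate back to warmup_min_lr at total_num_steps
        gamma = max(0.0, (total_num_steps - step) / max(1, total_num_steps - warmup_num_steps))
    return warmup_min_lr + (warmup_max_lr - warmup_min_lr) * gamma


if __name__ == "__main__":
    TRAIN_STEP = 100
    for step in (0, 5, 10, 55, 100):
        print(step, warmup_decay_lr(step, 0.0, 3e-5, TRAIN_STEP // 10, TRAIN_STEP))
```

With the values above, the rate peaks at 3e-5 at step 10 and falls back to 0 by step 100.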

2.5. Load dataset

I used the Hugging Face datasets library in this tutorial.

from datasets import load_dataset

# Load the SQuAD training split and keep only the "context" texts we need
datasets = load_dataset("squad", split="train")
datasets = [str(text) for text in datasets[: TRAIN_STEP * BATCH_SIZE]["context"]]

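2.6. Create a DistributedSampler to parallelize the dataset

You must specify num_replicas and rank using model.mpu when you create the sampler. These must be the data parallel world size and rank, not the global ones. GPUs in the same tensor parallel group hold different pieces of the same model, so they must all receive the same batch. If you are unfamiliar with DistributedSampler, please refer to here.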

from torch.utils.data import DistributedSampler

sampler = DistributedSampler(
    dataset=datasets,
    shuffle=True,
    num_replicas=model.mpu.get_data_parallel_world_size(),
    rank=model.mpu.get_data_parallel_rank(),
)
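This plain-Python sketch mimics how DistributedSampler splits the data, assuming shuffle=False and drop_last=False. It first pads the index list by repeating indices from the start until the list divides evenly by num_replicas. It then gives rank r every num_replicas-th index, starting at r. `shard_indices` is a hypothetical helper, not a PyTorch API. The usage part also assumes the Megatron-style layout in which global ranks 0 and 1 form one tensor parallel group.

```python
import math


def shard_indices(dataset_len, num_replicas, rank):
    """Indices one data parallel rank would see (no shuffling, drop_last=False)."""
    indices = list(range(dataset_len))
    num_samples = math.ceil(dataset_len / num_replicas)
    total_size = num_samples * num_replicas
    padding = total_size - len(indices)
    # Repeat indices from the start so every rank gets the same number of samples
    indices += (indices * math.ceil(padding / len(indices)))[:padding]
    # Rank r takes every num_replicas-th index, starting at r
    return indices[rank:total_size:num_replicas]


if __name__ == "__main__":
    # 4 GPUs with tensor parallel size 2 -> data parallel size 2
    for global_rank in range(4):
        data_parallel_rank = global_rank // 2
        print(global_rank, shard_indices(10, num_replicas=2, rank=data_parallel_rank))
```

Global ranks 0 and 1 get the same indices because they share data parallel rank 0. Passing the global rank and world size instead would give the two halves of one model different batches.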

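2.7. Create the data loader with the sampler

Note that you should turn off the data loader's shuffle option, because the sampler already shuffles the data. PyTorch's DataLoader raises an error if a sampler is combined with shuffle=True.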

from torch.utils.data import DataLoader

dataloader = DataLoader(
    datasets,
    batch_size=BATCH_SIZE,
    shuffle=False,
    sampler=sampler,
)
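The sampler divides the TRAIN_STEP * BATCH_SIZE samples among the data parallel ranks. So it is worth checking how many batches each rank actually gets with the settings from section 2.1. The sketch below does that arithmetic and replays the save and break checks of the training loop in section 2.8. `batches_per_rank` and `save_steps` are hypothetical helpers. The sketch assumes drop_last=False for both the sampler and the data loader, which are the defaults.

```python
import math


def batches_per_rank(num_samples, data_parallel_size, batch_size):
    # DistributedSampler (drop_last=False) pads so each rank gets ceil(N / dp) samples
    samples_per_rank = math.ceil(num_samples / data_parallel_size)
    # DataLoader (drop_last=False) keeps the final partial batch
    return math.ceil(samples_per_rank / batch_size)


def save_steps(num_batches, save_interval, train_step):
    """Replay the save/break checks of the training loop and return the saving steps."""
    steps = []
    for step in range(num_batches):
        if step % save_interval == 0:
            steps.append(step)
        if step > train_step:
            break
    return steps


if __name__ == "__main__":
    BATCH_SIZE, SAVE_INTERVAL, TRAIN_STEP = 4, 50, 100
    num_samples = TRAIN_STEP * BATCH_SIZE
    for dp_size in (1, 2):
        n = batches_per_rank(num_samples, dp_size, BATCH_SIZE)
        print(dp_size, n, save_steps(n, SAVE_INTERVAL, TRAIN_STEP))
```

With a data parallel size of 2 (4 GPUs, TP size 2), each rank sees 50 batches. The loop therefore ends after step 49, and only the step-0 checkpoint is written. With a data parallel size of 1, each rank sees 100 batches and checkpoints are saved at steps 0 and 50.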

2.8. Do training as usual

Now that everything is ready, it’s time to begin training. The training code is the same as in the previous Tensor Model Parallelism tutorial, with two differences. The input batch is forwarded through the DeepSpeed engine object instead of the model object. The backward pass and the optimizer step are done by engine.backward(loss) and engine.step().

for step, batch in enumerate(dataloader):
    # No optimizer.zero_grad() here: engine.step() also zeroes the gradients

    # Make batch
    input_batch = tokenizer(
        batch,
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=SEQ_LEN,
    ).to("cuda")

    # Forward-Backward-Step with the DeepSpeed engine
    # YOU MUST USE ``DEEPSPEED ENGINE``, NOT ``MODEL`` WHEN YOU ARE FORWARDING INPUT.
    loss = engine(**input_batch, labels=input_batch["input_ids"]).loss
    engine.backward(loss)
    engine.step()

    # Save parallelized model
    # This is same with 
    if step % SAVE_INTERVAL == 0:
        model.save_parallelized(save_directory="./parallel_ckpt")

    if step > TRAIN_STEP:
        break

3. Merging Checkpoints

How do we merge the parallelized checkpoints?

The Merging Checkpoints section is the same as in the Tensor Model Parallelism tutorial, so if you have already read that tutorial, you can skip it.

3.1. Create model

Usually we create a GPT2 model like this:

from transformers import AutoModelForCausalLM

model = AutoModelForCausalLM.from_pretrained("gpt2")

However, it is fine to create a randomly initialized model here, because we will load the local checkpoints right after creating it. Here’s how to create a randomly initialized model:

from transformers import GPT2Config, GPT2LMHeadModel

config = GPT2Config.from_pretrained("gpt2")
model = GPT2LMHeadModel(config)

3.2. Parallelize the model

import oslo

model = oslo.initialize(
    model, config={"model_parallelism": {"tensor_parallel_size": NUM_YOUR_GPUS}}
)

3.3. Load parallelized checkpoints

OSLO provides the from_parallelized method to load parallelized checkpoints. Just pass it the directory where the parallelized checkpoints were saved.

model = model.from_parallelized("./parallel_ckpt")

3.4. Merge parallelized checkpoints

The save_parallelized method has a special argument named merge_checkpoints. If you set this argument to True, the parallelized checkpoints of the model are saved in merged form. We recommend merging them after training because this process is a bit slow.

model.save_parallelized("./merged_ckpt", merge_checkpoints=True)

The merged checkpoint directory (./merged_ckpt) then contains:

pytorch_model.bin    config.json
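Conceptually, merging undoes the split made by tensor parallelism: the shards of each parallelized weight are joined back along the dimension they were split on. The toy sketch below shows this for a weight split column-wise across two ranks, using nested lists instead of tensors. `merge_column_shards` is a hypothetical helper. Which dimension OSLO splits for each layer is not covered here.

```python
def merge_column_shards(shards):
    """Concatenate row-major matrix shards along the column dimension."""
    num_rows = len(shards[0])
    if any(len(shard) != num_rows for shard in shards):
        raise ValueError("all shards must have the same number of rows")
    # Row i of the merged matrix is row i of every shard, joined left to right
    return [sum((shard[i] for shard in shards), []) for i in range(num_rows)]


if __name__ == "__main__":
    full = [[1, 2, 3, 4], [5, 6, 7, 8]]
    # Split the columns across 2 tensor parallel ranks
    shards = [[row[0:2] for row in full], [row[2:4] for row in full]]
    print(merge_column_shards(shards))
```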

This concludes the tensor model + ZeRO data parallelism tutorial. Thank you.