如何使 PyTorch 模型训练加快

Feb 23, 2023
by Sebastian Raschka

这篇文章介绍了在不影响其准确性的情况下提高 PyTorch 模型训练性能的技术。我们将在 LightningModule 中包装一个 PyTorch 模型,并使用 Trainer 类来启用各种训练优化。 通过动动小手指改几行代码,便可在单个 GPU 上的训练时间从 22.53 分钟压榨到 2.75 分钟,关键是保持模型精度不垮。

这是 8 倍的性能提升!真香!

这篇博文于 03/17/2023 更新,现在使用 PyTorch 2.0 和 Lightning 2.0!

介绍 在本教程中,我们将微调 DistilBERT 模型,它是 BERT 的精炼版本,在几乎相同的预测性能下缩小了 40%。 我们可以通过多种方式微调预训练语言模型。 下图描述了三种最常见的方法。

上述所有三种方法 (a-c) 都假设我们已经使用自监督学习在未标记的数据集上对模型进行了预训练。 然后,在第 2步中,当我们将模型转移到目标任务时,我们要么

a) 提取嵌入并在其上训练分类器(例如,这可以是来自 scikit-learn 的支持向量机); b) 替换/添加输出层并微调transformer的最后一层; c) 替换/添加输出层并微调所有层。 方法 a-c 按计算效率排序,其中 a) 通常是最快的。 根据经验,这种排序顺序也反映了模型的预测性能,其中 c) 通常会产生最高的预测精度。

在本文中,我们将使用 c) 训练一个模型来预测 IMDB 电影评论数据集中的电影评论情绪,该数据集总共包含 50,000 条电影评论。

1)普通 PyTorch 基线

我们先从简单的 PyTorch 基线开始,在 IMDB 电影评论数据集上训练 DistilBERT 模型。 如果你想自己运行代码,你可以conda一个虚拟环境,如下所示:

conda create -n faster-blog python=3.9
conda activate faster-blog

pip install watermark transformers datasets torchmetrics lightning

相关软件版本如下:

Python version: 3.9.15
torch         : 2.0.0+cu118
lightning     : 2.0.0
transformers  : 4.26.1

载数据集代码的 local_dataset_utilities.py 文件。

import os
import sys
import tarfile
import time

import numpy as np
import pandas as pd
from packaging import version
from torch.utils.data import Dataset
from tqdm import tqdm
import urllib


def reporthook(count, block_size, total_size):
    global start_time
    if count == 0:
        start_time = time.time()
        return
    duration = time.time() - start_time
    progress_size = int(count * block_size)
    speed = progress_size / (1024.0**2 * duration)
    percent = count * block_size * 100.0 / total_size

    sys.stdout.write(
        f"r{int(percent)}% | {progress_size / (1024.**2):.2f} MB "
        f"| {speed:.2f} MB/s | {duration:.2f} sec elapsed"
    )
    sys.stdout.flush()


def download_dataset():
    source = "http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz"
    target = "aclImdb_v1.tar.gz"

    if os.path.exists(target):
        os.remove(target)

    if not os.path.isdir("aclImdb") and not os.path.isfile("aclImdb_v1.tar.gz"):
        urllib.request.urlretrieve(source, target, reporthook)

    if not os.path.isdir("aclImdb"):

        with tarfile.open(target, "r:gz") as tar:
            tar.extractall()


def load_dataset_into_to_dataframe():
    basepath = "aclImdb"

    labels = {"pos": 1, "neg": 0}

    df = pd.DataFrame()

    with tqdm(total=50000) as pbar:
        for s in ("test", "train"):
            for l in ("pos", "neg"):
                path = os.path.join(basepath, s, l)
                for file in sorted(os.listdir(path)):
                    with open(os.path.join(path, file), "r", encoding="utf-8") as infile:
                        txt = infile.read()

                    if version.parse(pd.__version__) >= version.parse("1.3.2"):
                        x = pd.DataFrame(
                            [[txt, labels[l]]], columns=["review", "sentiment"]
                        )
                        df = pd.concat([df, x], ignore_index=False)

                    else:
                        df = df.append([[txt, labels[l]]], ignore_index=True)
                    pbar.update()
    df.columns = ["text", "label"]

    np.random.seed(0)
    df = df.reindex(np.random.permutation(df.index))

    print("Class distribution:")
    np.bincount(df["label"].values)

    return df


def partition_dataset(df):
    df_shuffled = df.sample(frac=1, random_state=1).reset_index()

    df_train = df_shuffled.iloc[:35_000]
    df_val = df_shuffled.iloc[35_000:40_000]
    df_test = df_shuffled.iloc[40_000:]

    df_train.to_csv("train.csv", index=False, encoding="utf-8")
    df_val.to_csv("val.csv", index=False, encoding="utf-8")
    df_test.to_csv("test.csv", index=False, encoding="utf-8")


class IMDBDataset(Dataset):
    def __init__(self, dataset_dict, partition_key="train"):
        self.partition = dataset_dict[partition_key]

    def __getitem__(self, index):
        return self.partition[index]

    def __len__(self):
        return self.partition.

在我们下面讨论之前先看看 主要的 PyTorch 代码:

import os
import os.path as op
import time

from datasets import load_dataset
import torch
from torch.utils.data import DataLoader
import torchmetrics
from transformers import AutoTokenizer
from transformers import AutoModelForSequenceClassification
from watermark import watermark

from local_dataset_utilities import (
   download_dataset,
   load_dataset_into_to_dataframe,
   partition_dataset,
)
from local_dataset_utilities import IMDBDataset


def tokenize_text(batch):
   return tokenizer(batch["text"], truncation=True, padding=True)


def train(num_epochs, model, optimizer, train_loader, val_loader, device):
   for epoch in range(num_epochs):
       train_acc = torchmetrics.Accuracy(task="multiclass", num_classes=2).to(device)

       for batch_idx, batch in enumerate(train_loader):
           model.train()
           for s in ["input_ids", "attention_mask", "label"]:
               batch[s] = batch[s].to(device)

           ### FORWARD AND BACK PROP
           outputs = model(
               batch["input_ids"],
               attention_mask=batch["attention_mask"],
               labels=batch["label"],
           )
           optimizer.zero_grad()
           outputs["loss"].backward()

           ### UPDATE MODEL PARAMETERS
           optimizer.step()

           ### LOGGING
           if not batch_idx % 300:
               print(
                   f"Epoch: {epoch+1:04d}/{num_epochs:04d} | Batch {batch_idx:04d}/{len(train_loader):04d} | Loss: {outputs['loss']:.4f}"
               )

           model.eval()
           with torch.no_grad():
               predicted_labels = torch.argmax(outputs["logits"], 1)
               train_acc.update(predicted_labels, batch["label"])

       ### MORE LOGGING
       with torch.no_grad():
           model.eval()
           val_acc = torchmetrics.Accuracy(task="multiclass", num_classes=2).to(device)
           for batch in val_loader:
               for s in ["input_ids", "attention_mask", "label"]:
                   batch[s] = batch[s].to(device)
               outputs = model(
                   batch["input_ids"],
                   attention_mask=batch["attention_mask"],
                   labels=batch["label"],
               )
               predicted_labels = torch.argmax(outputs["logits"], 1)
               val_acc.update(predicted_labels, batch["label"])

           print(
               f"Epoch: {epoch+1:04d}/{num_epochs:04d} | Train acc.: {train_acc.compute()*100:.2f}% | Val acc.: {val_acc.compute()*100:.2f}%"
           )


   print(watermark(packages="torch,lightning,transformers", python=True))
   print("Torch CUDA available?", torch.cuda.is_available())
   device = "cuda:0" if torch.cuda.is_available() else "cpu"

   torch.manual_seed(123)

   ##########################
   ### 1 Loading the Dataset
   ##########################
   download_dataset()
   df = load_dataset_into_to_dataframe()
   if not (op.exists("train.csv") and op.exists("val.csv") and op.exists("test.csv")):
       partition_dataset(df)

   imdb_dataset = load_dataset(
       "csv",
       data_files={
           "train": "train.csv",
           "validation": "val.csv",
           "test": "test.csv",
       },
   )

   #########################################
   ### 2 Tokenization and Numericalization
   #########################################

   tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
   print("Tokenizer input max length:", tokenizer.model_max_length, flush=True)
   print("Tokenizer vocabulary size:", tokenizer.vocab_size, flush=True)

   print("Tokenizing ...", flush=True)
   imdb_tokenized = imdb_dataset.map(tokenize_text, batched=True, batch_size=None)
   del imdb_dataset
   imdb_tokenized.set_format("torch", columns=["input_ids", "attention_mask", "label"])
   os.environ["TOKENIZERS_PARALLELISM"] = "false"

   #########################################
   ### 3 Set Up DataLoaders
   #########################################

   train_dataset = IMDBDataset(imdb_tokenized, partition_key="train")
   val_dataset = IMDBDataset(imdb_tokenized, partition_key="validation")
   test_dataset = IMDBDataset(imdb_tokenized, partition_key="test")

   train_loader = DataLoader(
       dataset=train_dataset,
       batch_size=12,
       shuffle=True,
       num_workers=1,
       drop_last=True,
   )

   val_loader = DataLoader(
       dataset=val_dataset,
       batch_size=12,
       num_workers=1,
       drop_last=True,
   )

   test_loader = DataLoader(
       dataset=test_dataset,
       batch_size=12,
       num_workers=1,
       drop_last=True,
   )

   #########################################
   ### 4 Initializing the Model
   #########################################

   model = AutoModelForSequenceClassification.from_pretrained(
       "distilbert-base-uncased", num_labels=2
   )

   model.to(device)
   optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)

   #########################################
   ### 5 Finetuning
   #########################################

   start = time.time()
   train(
       num_epochs=3,
       model=model,
       optimizer=optimizer,
       train_loader=train_loader,
       val_loader=val_loader,
       device=device,
   )

   end = time.time()
   elapsed = end - start
   print(f"Time elapsed {elapsed/60:.2f} min")

   with torch.no_grad():
       model.eval()
       test_acc = torchmetrics.Accuracy(task="multiclass", num_classes=2).to(device)
       for batch in test_loader:
           for s in ["input_ids", "attention_mask", "label"]:
               batch[s] = batch[s].to(device)
           outputs = model(
               batch["input_ids"],
               attention_mask=batch["attention_mask"],
               labels=batch["label"],
           )
           predicted_labels = torch.argmax(outputs["logits"], 1)
           test_acc.update(predicted_labels, batch["label"])

   print(f"Test accuracy {test_acc.compute()*100:.2f}%")

上面的代码结构分为两部分,函数定义和在 if name == "main" 下执行的代码。 这个结构对于避免以后使用多个 GPU 时 Python 的多处理问题是必要的。

if name == "main" 中的的前三部分包含设置数据集加载器的代码。 第四部分是初始化模型的地方。 第五部分运行训练函数并在测试集上评估微调模型。

在 A100 GPU 上运行代码后,我得到了以下结果:

Epoch: 0001/0003 | Batch 0000/2916 | Loss: 0.6867
Epoch: 0001/0003 | Batch 0300/2916 | Loss: 0.3633
Epoch: 0001/0003 | Batch 0600/2916 | Loss: 0.4122
Epoch: 0001/0003 | Batch 0900/2916 | Loss: 0.3046
Epoch: 0001/0003 | Batch 1200/2916 | Loss: 0.3859
Epoch: 0001/0003 | Batch 1500/2916 | Loss: 0.4489
Epoch: 0001/0003 | Batch 1800/2916 | Loss: 0.5721
Epoch: 0001/0003 | Batch 2100/2916 | Loss: 0.6470
Epoch: 0001/0003 | Batch 2400/2916 | Loss: 0.3116
Epoch: 0001/0003 | Batch 2700/2916 | Loss: 0.2002
Epoch: 0001/0003 | Train acc.: 89.81% | Val acc.: 92.17%
Epoch: 0002/0003 | Batch 0000/2916 | Loss: 0.0935
Epoch: 0002/0003 | Batch 0300/2916 | Loss: 0.0674
Epoch: 0002/0003 | Batch 0600/2916 | Loss: 0.1279
Epoch: 0002/0003 | Batch 0900/2916 | Loss: 0.0686
Epoch: 0002/0003 | Batch 1200/2916 | Loss: 0.0104
Epoch: 0002/0003 | Batch 1500/2916 | Loss: 0.0888
Epoch: 0002/0003 | Batch 1800/2916 | Loss: 0.1151
Epoch: 0002/0003 | Batch 2100/2916 | Loss: 0.0648
Epoch: 0002/0003 | Batch 2400/2916 | Loss: 0.0656
Epoch: 0002/0003 | Batch 2700/2916 | Loss: 0.0354
Epoch: 0002/0003 | Train acc.: 95.02% | Val acc.: 92.09%
Epoch: 0003/0003 | Batch 0000/2916 | Loss: 0.0143
Epoch: 0003/0003 | Batch 0300/2916 | Loss: 0.0108
Epoch: 0003/0003 | Batch 0600/2916 | Loss: 0.0228
Epoch: 0003/0003 | Batch 0900/2916 | Loss: 0.0140
Epoch: 0003/0003 | Batch 1200/2916 | Loss: 0.0220
Epoch: 0003/0003 | Batch 1500/2916 | Loss: 0.0123
Epoch: 0003/0003 | Batch 1800/2916 | Loss: 0.0495
Epoch: 0003/0003 | Batch 2100/2916 | Loss: 0.0039
Epoch: 0003/0003 | Batch 2400/2916 | Loss: 0.0168
Epoch: 0003/0003 | Batch 2700/2916 | Loss: 0.1293
Epoch: 0003/0003 | Train acc.: 97.28% | Val acc.: 89.88%
Time elapsed 21.33 min
Test accuracy 89.92%

正如所见,模型从第 2 轮到第 3 轮开始略微过度拟合,验证准确率从 92.09% 下降到 89.88%。 最终测试准确率为 89.92%,这是对模型进行 21.33 分钟微调后达到的。

2) 使用Trainer Class

现在,让我们将 PyTorch 模型包装在 LightningModule 中,以便我们可以使用来自 Lightning 的 Trainer 类:

import os
import os.path as op
import time

from datasets import load_dataset
import lightning as L
from lightning.pytorch.callbacks import ModelCheckpoint
from lightning.pytorch.loggers import CSVLogger
import matplotlib.pyplot as plt
import pandas as pd
import torch
from torch.utils.data import DataLoader
import torchmetrics
from transformers import AutoTokenizer
from transformers import AutoModelForSequenceClassification
from watermark import watermark

from local_dataset_utilities import (
    download_dataset,
    load_dataset_into_to_dataframe,
    partition_dataset,
)
from local_dataset_utilities import IMDBDataset


def tokenize_text(batch):
    return tokenizer(batch["text"], truncation=True, padding=True)

class LightningModel(L.LightningModule):
    def __init__(self, model, learning_rate=5e-5):
        super().__init__()

        self.learning_rate = learning_rate
        self.model = model

        self.train_acc = torchmetrics.Accuracy(task="multiclass", num_classes=2)
        self.val_acc = torchmetrics.Accuracy(task="multiclass", num_classes=2)
        self.test_acc = torchmetrics.Accuracy(task="multiclass", num_classes=2)

    def forward(self, input_ids, attention_mask, labels):
        return self.model(input_ids, attention_mask=attention_mask, labels=labels)

    def training_step(self, batch, batch_idx):
        outputs = self(
            batch["input_ids"],
            attention_mask=batch["attention_mask"],
            labels=batch["label"],
        )
        self.log("train_loss", outputs["loss"])
        with torch.no_grad():
            logits = outputs["logits"]
            predicted_labels = torch.argmax(logits, 1)
            self.train_acc(predicted_labels, batch["label"])
            self.log("train_acc", self.train_acc, on_epoch=True, on_step=False)
        return outputs["loss"]  # this is passed to the optimizer for training

    def validation_step(self, batch, batch_idx):
        outputs = self(
            batch["input_ids"],
            attention_mask=batch["attention_mask"],
            labels=batch["label"],
        )
        self.log("val_loss", outputs["loss"], prog_bar=True)

        logits = outputs["logits"]
        predicted_labels = torch.argmax(logits, 1)
        self.val_acc(predicted_labels, batch["label"])
        self.log("val_acc", self.val_acc, prog_bar=True)

    def test_step(self, batch, batch_idx):
        outputs = self(
            batch["input_ids"],
            attention_mask=batch["attention_mask"],
            labels=batch["label"],
        )

        logits = outputs["logits"]
        predicted_labels = torch.argmax(logits, 1)
        self.test_acc(predicted_labels, batch["label"])
        self.log("accuracy", self.test_acc, prog_bar=True)

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(
            self.trainer.model.parameters(), lr=self.learning_rate
        )
        return optimizer


if __name__ == "__main__":
    print(watermark(packages="torch,lightning,transformers", python=True), flush=True)
    print("Torch CUDA available?", torch.cuda.is_available(), flush=True)

    torch.manual_seed(123)

    ##########################
    ### 1 Loading the Dataset
    ##########################
    download_dataset()
    df = load_dataset_into_to_dataframe()
    if not (op.exists("train.csv") and op.exists("val.csv") and op.exists("test.csv")):
        partition_dataset(df)

    imdb_dataset = load_dataset(
        "csv",
        data_files={
            "train": "train.csv",
            "validation": "val.csv",
            "test": "test.csv",
        },
    )

    #########################################
    ### 2 Tokenization and Numericalization
    ########################################

    tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
    print("Tokenizer input max length:", tokenizer.model_max_length, flush=True)
    print("Tokenizer vocabulary size:", tokenizer.vocab_size, flush=True)

    print("Tokenizing ...", flush=True)
    imdb_tokenized = imdb_dataset.map(tokenize_text, batched=True, batch_size=None)
    del imdb_dataset
    imdb_tokenized.set_format("torch", columns=["input_ids", "attention_mask", "label"])
    os.environ["TOKENIZERS_PARALLELISM"] = "false"

    #########################################
    ### 3 Set Up DataLoaders
    #########################################

    train_dataset = IMDBDataset(imdb_tokenized, partition_key="train")
    val_dataset = IMDBDataset(imdb_tokenized, partition_key="validation")
    test_dataset = IMDBDataset(imdb_tokenized, partition_key="test")

    train_loader = DataLoader(
        dataset=train_dataset,
        batch_size=12,
        shuffle=True,
        num_workers=1,
        drop_last=True,
    )

    val_loader = DataLoader(
        dataset=val_dataset,
        batch_size=12,
        num_workers=1,
        drop_last=True,
    )

    test_loader = DataLoader(
        dataset=test_dataset,
        batch_size=12,
        num_workers=1,
        drop_last=True,
    )

    #########################################
    ### 4 Initializing the Model
    #########################################

    model = AutoModelForSequenceClassification.from_pretrained(
        "distilbert-base-uncased", num_labels=2
    )

    #########################################
    ### 5 Finetuning
    #########################################

    lightning_model = LightningModel(model)

    callbacks = [
        ModelCheckpoint(save_top_k=1, mode="max", monitor="val_acc")  # save top 1 model
    ]
    logger = CSVLogger(save_dir="logs/", name="my-model")

    trainer = L.Trainer(
        max_epochs=3,
        callbacks=callbacks,
        accelerator="gpu",
        devices=[1],
        logger=logger,
        log_every_n_steps=10,
        deterministic=True,
    )

    start = time.time()
    trainer.fit(
        model=lightning_model,
        train_dataloaders=train_loader,
        val_dataloaders=val_loader,
    )

    end = time.time()
    elapsed = end - start
    print(f"Time elapsed {elapsed/60:.2f} min")

    test_acc = trainer.test(lightning_model, dataloaders=test_loader, ckpt_path="best")
    print(test_acc)

    with open(op.join(trainer.logger.log_dir, "outputs.txt"), "w") as f:
        f.write((f"Time elapsed {elapsed/60:.2f} min
"))
        f.write(f"Test acc: {test_acc}")

本文关注性能方面 跳过 LightningModule 的细节。

简而言之,设置了一个 LightningModule,它定义了如何执行训练、验证和测试步骤。 然后,主要变化在代码第5部分,我们在其中微调模型。 将 PyTorch 模型包装在 LightningModel 类中,并使用 Trainer 类来拟合模型:

 #########################################
    ### 5 Finetuning
    #########################################

    lightning_model = LightningModel(model)

    callbacks = [
        ModelCheckpoint(save_top_k=1, mode="max", monitor="val_acc")  # save top 1 model
    ]
    logger = CSVLogger(save_dir="logs/", name="my-model")

    trainer = L.Trainer(
        max_epochs=3,
        callbacks=callbacks,
        accelerator="gpu",
        devices=1,
        logger=logger,
        log_every_n_steps=10,
        deterministic=True,
    )

    trainer.fit(
        model=lightning_model,
        train_dataloaders=train_loader,
        val_dataloaders=val_loader,
    )

之前注意到验证准确率从第 2 轮下降到第 3 轮,因此使用 ModelCheckpoint 回调加载最佳模型(基于最高验证准确率)以在测试集上进行模型评估。 此外,将性能记录到 CSV 文件并将 PyTorch 行为设置为确定性。

在同一台机器上,这个模型在 21.79 分钟内达到了 92.6% 的测试准确率:

注意,如果禁用检查点并允许 PyTorch 在非确定性模式下运行,将获得与普通 PyTorch 相同的运行时间。

3)自动混合精度训练

如果我们的 GPU 支持混合精度训练,启用它通常是提高计算效率的主要方法之一。 特别是在训练期间在 32 位和 16 位浮点表示之间切换,而不会牺牲准确性。

使用 Trainer 类,可以通过一行代码启用自动混合精度训练:

 trainer = L.Trainer(
        max_epochs=3,
        callbacks=callbacks,
        accelerator="gpu",
        precision="16",  # <-- NEW
        devices=[1],
        logger=logger,
        log_every_n_steps=10,
        deterministic=True,
    )

如下图所示,使用混合精度训练可将训练时间从 21.79 分钟提高到 8.25 分钟! 这几乎快了3倍!

测试集准确率为 93.2%——与之前的 92.6% 相比甚至略有提高(可能是由于在不同精度模式之间切换时舍入引起的差异。)

4) 使用 Torch.Compile 的静态图

在最近发布的 PyTorch 2.0 中,PyTorch 团队引入了新的 toch.compile 函数,该函数可以通过生成优化的静态图来加速 PyTorch 代码执行。 这是一个 3 步过程,包括图形获取、图形结构降低和图形编译。

实现这一目标的背后很复杂,在PyTorch 2.0相关介绍中有更详细的解释。 作为用户,我们可以通过一个简单的命令 torch.compile 使用这一新功能。

要利用 torch.compile,可以通过添加以下一行代码来修改我们的代码:

# ...
model = AutoModelForSequenceClassification.from_pretrained(
        "distilbert-base-uncased", num_labels=2
    )

model = torch.compile(model) # NEW
lightning_model = LightningModel(model)
# ...

不幸的是,在这种混合精度上下文中,使用默认参数时,torch.compile 似乎不会提升 DistilBERT 模型的性能。 训练时间为 8.44 分钟,而之前为 8.25 分钟。 因此,本文中的后续基准测试不会使用 torch.compile。

备注:两个技巧:

1.将编译放在计时开始之前; 2。使用示例批次启动模型,如下所示

model.to(torch.device("cuda:0"))
  model = torch.compile(model)

  for batch_idx, batch in enumerate(train_loader):
      model.train()
      for s in ["input_ids", "attention_mask", "label"]:
          batch[s] = batch[s].to(torch.device("cuda:0"))
      break

  outputs = model(
      batch["input_ids"],
      attention_mask=batch["attention_mask"],
      labels=batch["label"],
  )

  lightning_model = LightningModel(model)
  # start timing and training below

运行时间提高到 5.6 分钟。 这表明初始优化编译步骤需要几分钟,但最终会加速模型训练。 在这种情况下,由于我们只训练三个时期的模型,因此由于额外的开销,编译的好处不明显。 但是,如果训练模型的时间更长或训练的模型更大,那么编译是值得的。

(注意:目前为分布式设置准备模型有难搞,因为每个单独的 GPU 设备都需要模型的副本。这将需要重新设计一些代码,所以下面不会使用 torch.compile。)

5) 在4个GPU上并行训练分布式数据

上面添加混合精度训练(并尝试添加图形编译)已在单个 GPU 上加速我们的代码,现在让尝试多 GPU 策略。 现在将在四个而不是一个 GPU 上运行相同的代码。

请注意,下图中总结了几种不同的多 GPU 训练技术。

从最简单的技术开始,通过 DistributedDataParallel 实现数据并行。 使用Trainer,只需要修改一行代码:

  trainer = L.Trainer(
        max_epochs=3,
        callbacks=callbacks,
        accelerator="gpu",
        devices=4,  # <-- NEW
        strategy="ddp",  # <-- NEW
        precision="16",
        logger=logger,
        log_every_n_steps=10,
        deterministic=True,
    )


在有四个 A100 GPU机器上,这段代码运行了 3.07 分钟,达到了 93.1% 的测试准确率。 同样,测试集的改进可能是由于使用数据并行时的梯度平均。

6)DeepSpeed

最后,尝试在 Trainer 中使用的 DeepSpeed 多 GPU 策略。

但在实际尝试之前,分享下多 GPU 使用建议。 使用哪种策略在很大程度上取决于模型、GPU 的数量和 GPU 的内存大小。 例如,当预训练模型不适合单个 GPU 的大型模型时,最好从简单的“ddp_sharded”策略开始,该策略将张量并行性添加到“ddp”。使用前面的代码,“ddp_sharded”采用 跑完 2.58 分钟。

或者也可以考虑更复杂的“deepspeed_stage_2”策略,它将优化器状态和梯度分片。 如果这不足以使模型适合 GPU 内存,请尝试“deepspeed_stage_2_offload”变体,它将优化器和梯度状态卸载到 CPU 内存(以性能为代价)。 如果你想微调一个模型,计算吞吐量通常比能够将模型放入较少数量的 GPU 的内存中更不重要。 在这种情况下,您可以探索 deepspeed 的“stage_3”变体,它对所有内容、优化器、梯度和参数进行分片,等等.

首先,我们必须安装 DeepSpeed Python 库:

pip install -U deepspeed

接下来,我们只需更改一行代码即可启用“deepspeed_stage_2”:

 trainer = L.Trainer(
        max_epochs=3,
        callbacks=callbacks,
        accelerator="gpu",
        devices=4,
        strategy="deepspeed_stage_2",  # <-- NEW
        precision="16",
        logger=logger,
        log_every_n_steps=10,
        deterministic=True,
    )

在机器上运行了 2.75 分钟,并达到了 92.6% 的测试准确率。

请注意,PyTorch 现在也有自己的 DeepSpeed 替代方案,称为完全分片 DataParallel,我们可以通过 strategy="fsdp" 使用它。

7) Fabric

随着最近的 Lightning 2.0 发布,Lightning AI 发布了用于 PyTorch 的新 Fabric 开源库。 Fabric 本质上是一种扩展 PyTorch 代码的替代方法,无需使用我在上面第 2 节)使用 Trainer 类中介绍的 LightningModule 和 Trainer。

Fabric只需要改几行代码,如下代码所示。 - 表示已删除的行,+ 是为将 Python 代码转换为使用 Fabric 而添加的行。

import os
import os.path as op
import time

+ from lightning import Fabric

from datasets import load_dataset
import matplotlib.pyplot as plt
import pandas as pd
import torch
from torch.utils.data import DataLoader
import torchmetrics
from transformers import AutoTokenizer
from transformers import AutoModelForSequenceClassification
from watermark import watermark

from local_dataset_utilities import download_dataset, load_dataset_into_to_dataframe, partition_dataset
from local_dataset_utilities import IMDBDataset


def tokenize_text(batch):
    return tokenizer(batch["text"], truncation=True, padding=True)


def plot_logs(log_dir):
    metrics = pd.read_csv(op.join(log_dir, "metrics.csv"))

    aggreg_metrics = []
    agg_col = "epoch"
    for i, dfg in metrics.groupby(agg_col):
        agg = dict(dfg.mean())
        agg[agg_col] = i
        aggreg_metrics.append(agg)

    df_metrics = pd.DataFrame(aggreg_metrics)
    df_metrics[["train_loss", "val_loss"]].plot(
        grid=True, legend=True, xlabel="Epoch", ylabel="Loss"
    )
    plt.savefig(op.join(log_dir, "loss.pdf"))

    df_metrics[["train_acc", "val_acc"]].plot(
        grid=True, legend=True, xlabel="Epoch", ylabel="Accuracy"
    )
    plt.savefig(op.join(log_dir, "acc.pdf"))


- def train(num_epochs, model, optimizer, train_loader, val_loader, device):
+ def train(num_epochs, model, optimizer, train_loader, val_loader, fabric):

      for epoch in range(num_epochs):
-         train_acc = torchmetrics.Accuracy(task="multiclass", num_classes=2).to(device)
+         train_acc = torchmetrics.Accuracy(task="multiclass", num_classes=2).to(fabric.device)

        model.train()
        for batch_idx, batch in enumerate(train_loader):

-             for s in ["input_ids", "attention_mask", "label"]:
-                 batch[s] = batch[s].to(device)

            outputs = model(batch["input_ids"], attention_mask=batch["attention_mask"], labels=batch["label"]) 
            optimizer.zero_grad()
-            outputs["loss"].backward()
+            fabric.backward(outputs["loss"])

            ### UPDATE MODEL PARAMETERS
            optimizer.step()

            ### LOGGING
            if not batch_idx % 300:
                print(f"Epoch: {epoch+1:04d}/{num_epochs:04d} | Batch {batch_idx:04d}/{len(train_loader):04d} | Loss: {outputs['loss']:.4f}")

            model.eval()
            with torch.no_grad():
                predicted_labels = torch.argmax(outputs["logits"], 1)
                train_acc.update(predicted_labels, batch["label"])

        ### MORE LOGGING
        model.eval()
        with torch.no_grad():
-            val_acc = torchmetrics.Accuracy(task="multiclass", num_classes=2).to(device)
+            val_acc = torchmetrics.Accuracy(task="multiclass", num_classes=2).to(fabric.device)
            for batch in val_loader:
-                for s in ["input_ids", "attention_mask", "label"]:
-                    batch[s] = batch[s].to(device)
                outputs = model(batch["input_ids"], attention_mask=batch["attention_mask"], labels=batch["label"])
                predicted_labels = torch.argmax(outputs["logits"], 1)
                val_acc.update(predicted_labels, batch["label"])

            print(f"Epoch: {epoch+1:04d}/{num_epochs:04d} | Train acc.: {train_acc.compute()*100:.2f}% | Val acc.: {val_acc.compute()*100:.2f}%")
            train_acc.reset(), val_acc.reset()


if __name__ == "__main__":

    print(watermark(packages="torch,lightning,transformers", python=True))
    print("Torch CUDA available?", torch.cuda.is_available())    
-   device = "cuda" if torch.cuda.is_available() else "cpu"
    torch.manual_seed(123)

    ##########################
    ### 1 Loading the Dataset
    ##########################
    download_dataset()
    df = load_dataset_into_to_dataframe()
    if not (op.exists("train.csv") and op.exists("val.csv") and op.exists("test.csv")):
        partition_dataset(df)

    imdb_dataset = load_dataset(
        "csv",
        data_files={
            "train": "train.csv",
            "validation": "val.csv",
            "test": "test.csv",
        },
    )

    #########################################
    ### 2 Tokenization and Numericalization
    #########################################

    tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")
    print("Tokenizer input max length:", tokenizer.model_max_length, flush=True)
    print("Tokenizer vocabulary size:", tokenizer.vocab_size, flush=True)

    print("Tokenizing ...", flush=True)
    imdb_tokenized = imdb_dataset.map(tokenize_text, batched=True, batch_size=None)
    del imdb_dataset
    imdb_tokenized.set_format("torch", columns=["input_ids", "attention_mask", "label"])
    os.environ["TOKENIZERS_PARALLELISM"] = "false"

    #########################################
    ### 3 Set Up DataLoaders
    #########################################

    train_dataset = IMDBDataset(imdb_tokenized, partition_key="train")
    val_dataset = IMDBDataset(imdb_tokenized, partition_key="validation")
    test_dataset = IMDBDataset(imdb_tokenized, partition_key="test")

    train_loader = DataLoader(
        dataset=train_dataset,
        batch_size=12,
        shuffle=True, 
        num_workers=2,
        drop_last=True,
    )

    val_loader = DataLoader(
        dataset=val_dataset,
        batch_size=12,
        num_workers=2,
        drop_last=True,
    )

    test_loader = DataLoader(
        dataset=test_dataset,
        batch_size=12,
        num_workers=2,
        drop_last=True,
    )


    #########################################
    ### 4 Initializing the Model
    #########################################

+    fabric = Fabric(accelerator="cuda", devices=4, 
+                    strategy="deepspeed_stage_2", precision="16-mixed")
+    fabric.launch()

    model = AutoModelForSequenceClassification.from_pretrained(
        "distilbert-base-uncased", num_labels=2)

-   model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)

+    model, optimizer = fabric.setup(model, optimizer)
+    train_loader, val_loader, test_loader = fabric.setup_dataloaders(
+        train_loader, val_loader, test_loader)

    #########################################
    ### 5 Finetuning
    #########################################

    start = time.time()
    train(
        num_epochs=3,
        model=model,
        optimizer=optimizer,
        train_loader=train_loader,
        val_loader=val_loader,
-       device=device
+       fabric=fabric
    )

    end = time.time()
    elapsed = end-start
    print(f"Time elapsed {elapsed/60:.2f} min")

    with torch.no_grad():
        model.eval()
-       test_acc = torchmetrics.Accuracy(task="multiclass", num_classes=2).to(device)
+       test_acc = torchmetrics.Accuracy(task="multiclass", num_classes=2).to(fabric.device)
        for batch in test_loader:
-           for s in ["input_ids", "attention_mask", "label"]:
-               batch[s] = batch[s].to(device)
            outputs = model(batch["input_ids"], attention_mask=batch["attention_mask"], labels=batch["label"])
            predicted_labels = torch.argmax(outputs["logits"], 1)
            test_acc.update(predicted_labels, batch["label"])

    print(f"Test accuracy {test_acc.compute()*100:.2f}%")

正如所看到的,修改真的很轻量级! 它运行得如何呢? Fabric 仅用了 1.8 分钟就完成了微调! Fabric 比 Trainer 更轻量级——虽然它也能够使用回调和日志记录,但我们没有在这里启用这些功能来用一个极简示例来演示 Fabric。 太快了.

何时使用 Lightning Trainer 或 Fabric 取决于个人喜好。 根据经验,如果您更喜欢对 PyTorch 代码进行轻量包装,请查看 Fabric。 另一方面,如果你转向更大的项目并且更喜欢 Lightning 提供的代码组织,推荐 Trainer。

结论

在本文中,探索了各种提高 PyTorch 模型训练速度的技术。 如果使用 Lightning Trainer,可以用一行代码在这些选项之间切换,非常方便—,尤其是当您在调试代码时在 CPU 和 GPU 机器之间切换时。

尚未探索的另一个方面是最大化批量大小,这可以进一步提高我们模型的吞吐量。 未完待续。

(代码地址github稍后见评论)

展开阅读全文

页面更新:2024-05-09

标签:模型   梯度   准确率   文中   精度   性能   策略   代码   测试   数据

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top