Troubleshooting multi-GPU training with a large memmapped OpenWebText dataset in PyTorch Lightning for nanoGPT


Description

I am currently working on an implementation of nanoGPT with PyTorch Lightning. My goal is to load the large memory-mapped OpenWebText dataset (16 GB) through a PyTorch Dataset and a PyTorch Lightning DataModule for training in a multi-GPU (8) setup. However, I am stuck: the loading process appears to hang and never completes. Oddly, loading works fine for single-GPU training, and also when the file is smaller.

I would appreciate guidance and suggestions on how to resolve this loading issue, especially when working with large memory-mapped datasets.

Environment

The training script runs in a SLURM-managed environment.

Setup

module load openmpi
module load cuda/12.1

export MASTER_ADDR=`hostname`
export MASTER_PORT=12802
export NCCL_PROTO=simple
export FI_EFA_FORK_SAFE=1
export FI_LOG_LEVEL=1
export FI_EFA_USE_DEVICE_RDMA=1
export NCCL_DEBUG=info

export PYTHONFAULTHANDLER=1

export CUDA_LAUNCH_BLOCKING=0
export OMPI_MCA_mtl_base_verbose=1
export FI_EFA_ENABLE_SHM_TRANSFER=0
export FI_PROVIDER=efa
export FI_EFA_TX_MIN_CREDITS=64
export NCCL_TREE_THRESHOLD=0

Python packages

datasets==2.13.1
einops==0.6.1
jsonargparse==4.21.1
lightning==2.0.2
numpy==1.24.3
setuptools==67.7.2
torch==2.0.1
torchinfo==1.8.0
tqdm==4.65.0
transformers==4.29.2
pydantic==1.10.8
tensorboard==2.13.0

Implementation

Data preparation script

import os
from dataclasses import dataclass
from typing import Optional, Union

import numpy as np
from datasets import load_dataset
from jsonargparse import CLI
from lightning import seed_everything
from tqdm.auto import tqdm
from transformers import AutoTokenizer


@dataclass
class Config:
    dataset_name_or_path: str
    tokenizer_name_or_path: str
    output_dir: str
    streaming: bool = False
    val_split: Optional[Union[int, float]] = None
    seed: int = 0
    num_proc: int = 4
    batch_size: int = 1_000


def cli_main(config: Config) -> None:
    # Setup: Reproducibility & Output directories
    seed_everything(config.seed)
    os.makedirs(config.output_dir, exist_ok=True)

    # DataLoading & Preparation
    dataset = load_dataset(config.dataset_name_or_path, streaming=config.streaming)

    # Split training dataset into training & validation sets
    if config.val_split and "val" not in dataset.keys():
        split_dataset = dataset["train"].train_test_split(
            test_size=config.val_split, seed=config.seed, shuffle=True
        )
        dataset["train"] = split_dataset.pop("train")
        dataset["val"] = split_dataset.pop("test")

    # Run tokenization
    tokenizer = AutoTokenizer.from_pretrained(config.tokenizer_name_or_path)

    # For packed datasets the eos token must be provided
    if tokenizer.eos_token is None:
        tokenizer.eos_token = tokenizer.pad_token

    def add_eos_batched(ids):
        return [
            x + [tokenizer.eos_token_id] if x[-1] != tokenizer.eos_token_id else x
            for x in ids
        ]

    def tokenize_examples(examples):
        ids = tokenizer(examples["text"]).input_ids

        # Optionally append eos token
        ids = add_eos_batched(ids)
        length = [len(x) for x in ids]

        return {"ids": ids, "len": length}

    dataset = dataset.map(
        tokenize_examples,
        remove_columns=dataset["train"].column_names,
        desc="Running Tokenization",
        batched=True,
        num_proc=config.num_proc,
    )

    # Save tokenized dataset
    for split_name, split in dataset.items():
        filepath = os.path.join(config.output_dir, f"{split_name}.bin")
        np_split = np.memmap(
            filepath, dtype=np.uint16, mode="w+", shape=(sum(split["len"]),)
        )

        num_samples = len(split["len"])
        num_batches = int(np.ceil(num_samples / config.batch_size))

        desc = f"Saving {split_name} dataset. Num samples: {num_samples}. Num batches: {num_batches}"
        offset = 0
        for batch_idx in tqdm(range(num_batches), desc=desc):
            batch = split.shard(
                num_shards=num_batches, index=batch_idx, contiguous=True
            ).with_format("numpy")
            batch = np.concatenate(batch["ids"])

            np_split[offset : offset + len(batch)] = batch
            offset += len(batch)

        np_split.flush()


if __name__ == "__main__":
    config = CLI(Config, as_positional=False)
    cli_main(config)
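
For reference, a minimal invocation of this script; the dataset id, tokenizer name, output directory, and script filename below are illustrative assumptions, not taken from the original post:

# Run the preparation step programmatically with assumed values.
config = Config(
    dataset_name_or_path="openwebtext",  # assumed Hugging Face dataset id
    tokenizer_name_or_path="gpt2",       # assumed tokenizer
    output_dir="data/openwebtext",       # assumed output location
    val_split=0.0005,                    # small held-out validation split
    num_proc=8,
)
cli_main(config)

# Equivalent CLI invocation (flag names are generated by jsonargparse
# from the Config dataclass; the script name is assumed):
#   python prepare_openwebtext.py --dataset_name_or_path openwebtext \
#       --tokenizer_name_or_path gpt2 --output_dir data/openwebtext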

Data loading

import os
from dataclasses import dataclass
from typing import Optional

import numpy as np
import torch
from lightning import LightningDataModule
from torch.utils.data import DataLoader, Dataset


class PackedDataset(Dataset):
    def __init__(
        self, filepath: str, block_size: int = 1024, filemode: str = "r"
    ) -> None:
        super().__init__()

        self.filepath = filepath
        self.block_size = block_size

        self.data = np.memmap(filepath, dtype=np.uint16, mode=filemode)

    def __len__(self) -> int:
        return len(self.data) - self.block_size

    def __getitem__(self, index: int) -> torch.Tensor:
        return torch.as_tensor(
            self.data[index : index + self.block_size].astype(np.int64)
        )


@dataclass
class PackedDataModuleConfig:
    data_dir: str
    block_size: int
    filemode: str = "r"
    train_batch_size: Optional[int] = None
    val_batch_size: Optional[int] = None
    test_batch_size: Optional[int] = None
    num_workers: int = 4
    pin_memory: bool = False
    persistent_workers: bool = False


class PackedDataModule(LightningDataModule):
    def __init__(self, config: PackedDataModuleConfig) -> None:
        super().__init__()

        self.config = config
        self.splits = os.listdir(self.config.data_dir)

    def setup(self, stage: Optional[str] = None) -> None:
        if self.config.train_batch_size and "train.bin" in self.splits:
            self.train_dataset = PackedDataset(
                os.path.join(self.config.data_dir, "train.bin"),
                self.config.block_size,
                self.config.filemode,
            )

        if self.config.val_batch_size and "val.bin" in self.splits:
            self.val_dataset = PackedDataset(
                os.path.join(self.config.data_dir, "val.bin"),
                self.config.block_size,
                self.config.filemode,
            )

        if self.config.test_batch_size and "test.bin" in self.splits:
            self.test_dataset = PackedDataset(
                os.path.join(self.config.data_dir, "test.bin"),
                self.config.block_size,
                self.config.filemode,
            )

    def train_dataloader(self) -> Optional[DataLoader]:
        if hasattr(self, "train_dataset"):
            return self._create_dataloader(
                self.train_dataset, self.config.train_batch_size
            )

    def val_dataloader(self) -> Optional[DataLoader]:
        if hasattr(self, "val_dataset"):
            return self._create_dataloader(self.val_dataset, self.config.val_batch_size)

    def test_dataloader(self) -> Optional[DataLoader]:
        if hasattr(self, "test_dataset"):
            return self._create_dataloader(
                self.test_dataset, self.config.test_batch_size
            )

    def _create_dataloader(self, dataset: Dataset, batch_size: int) -> DataLoader:
        return DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=False,  # shuffle=True causes the process to hang for large files
            num_workers=self.config.num_workers,
            pin_memory=self.config.pin_memory,
            persistent_workers=self.config.persistent_workers,
        )
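
For completeness, a minimal sketch of wiring the data module into a Lightning Trainer for the 8-GPU run; the data path, batch sizes, and the model variable are illustrative assumptions:

from lightning import Trainer

dm_config = PackedDataModuleConfig(
    data_dir="data/openwebtext",  # assumed directory holding train.bin / val.bin
    block_size=1024,
    train_batch_size=12,
    val_batch_size=12,
    num_workers=4,
)
datamodule = PackedDataModule(dm_config)

# DDP across 8 GPUs, matching the setup described above.
trainer = Trainer(accelerator="gpu", devices=8, strategy="ddp")
trainer.fit(model, datamodule=datamodule)  # model: any LightningModule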
1 Answer

Changing the DataLoader's multiprocessing_context from "spawn" to "fork" worked for me.
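
In code, that amounts to passing multiprocessing_context to the DataLoader in _create_dataloader; a minimal sketch (the explanatory comment reflects the usual explanation for this behavior, not something stated in the answer):

def _create_dataloader(self, dataset: Dataset, batch_size: int) -> DataLoader:
    return DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=self.config.num_workers,
        pin_memory=self.config.pin_memory,
        persistent_workers=self.config.persistent_workers,
        # With "fork", worker processes inherit the parent's already-open
        # np.memmap; with "spawn", each worker re-imports the module and
        # re-opens the 16 GB file, which can look like a hang.
        multiprocessing_context="fork",  # only used when num_workers > 0
    )

Note that multiprocessing_context only takes effect when num_workers > 0, and on Linux the default start method is already "fork", so this mainly matters when something in the surrounding stack has switched the workers to "spawn".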
