본문 바로가기

Pytorch/가속화

[Pytorch] DDP-Distributed Data Parallel 구현

반응형

도입

ImageNet과 같이 큰 모델을 학습할 때엔 multi-gpu를 활용하여 학습속도를 향상시킬 수 있습니다. DataParallel과 DistributedDataParallel이 대표적인 방법입니다. DataParallel방법은 딱 한 줄만 수정해줘도 동작하기 때문에, 코딩하는데에는 오랜시간이 걸리진 않지만, DDP에 비해 gpu의 memory를 많이 잡아먹게되고, 학습속도도 비교적 느리기 때문에 전체 학습시간은 더 오래걸리게 된다는 큰 단점이 있습니다.

 

 

이번 포스트에서는 학습 속도 향상을 위한 DistributedDataParallel(DDP)을 사용하는 방법에 대해 포스트해보도록 하겠습니다.

이번 포스트의 구성은 

# 도입

# 간단한 설명

# 상세한 설명

순으로 진행하였습니다.

 

데이터 셋은 CIFAR100을 사용했습니다.

간단한 설명

최대한 간단하게 작성하였습니다. 자세한 동작 방식이나, 함수의 역할이 궁금하다면 자세한 설명을 참고해주세요.

 

또한 args의 default값이나 사용한 값이 알고싶다면 github를 참고해주세요.
https://github.com/dbwp031/Accel_pytorch/tree/main/Amp-pytorch

 

GitHub - dbwp031/Accel_pytorch: Accelerating tools for Pytorch (AMP, DDP ...)

Accelerating tools for Pytorch (AMP, DDP ...). Contribute to dbwp031/Accel_pytorch development by creating an account on GitHub.

github.com

train-multi-ddp.py 혹은 train-multi-ddp-amp.py를 참고하시면 됩니다.


일반적인 학습 코드를 작성하는 순서는 아래와 같습니다.

1. 모델 정의

2. criterion / optimizer 정의

3. 데이터 로딩

4. 학습

 

DDP 학습 코드 작성 순서는 아래와 같아야 합니다.

0. 새로운 main함수 정의 / main_worker정의

1. gpu 설정

2. 모델 정의

3. multiprocess 설정

4. criterion / optimizer 정의

5. 데이터로딩

6. 학습

0. 새로운 main함수 정의 / main_worker정의

일반적인 학습 코드에서의 코드는 아래 순서대로 입니다.

 

#일반적인 학습 코드 main 함수
def main():
	args = parser.parse_args()
    # 데이터 로드
    # 모델 / criterion / optimizer 정의
    for i in range(epoch):
    	train()
        validate()

 

히지만 DDP에서는 기존 main함수의 역할을 main_worker함수가 대신해줍니다. 그 대신 새로운 main함수에서는 mp.spawn함수를 통해 하나의 프로세스에서 하나의 main_worker함수가 동작하게 합니다.

 

def main(): 
    args = parser.parse_args()
	# 간단한 동작 방법을 알고싶다면 생략해도 되는 부분.
    # if args.dist_url=='env://' and args.world_size==-1:
    #     args.world_size=int(os.environ['WORLD_SIZE'])
    
    args.distributed = args.world_size > 1 or args.multiprocessing_distributed
    ngpus_per_node = torch.cuda.device_count()

    if args.multiprocessing_distributed:
        args.world_size=ngpus_per_node*args.world_size
        mp.spawn(main_worker,nprocs=ngpus_per_node,args=(ngpus_per_node,args))
    else:
        main_worker(args.gpu,ngpus_per_node,args)

 

mp.spawn()함수가 main_worker함수를 ngpus_per_node 개수만큼 생성합니다.

1. gpu 설정

2~6번은 main_worker() 함수에 관한 내용입니다.

 

def main_worker(gpu,ngpus_per_node, args):
    # 내용1 :gpu 설정
    print(gpu,ngpus_per_node)
    args.gpu = gpu

    global best_err1, best_err5
    # 내용1-1: gpu!=0이면 print pass
    if args.multiprocessing_distributed and args.gpu !=0:
        def print_pass(*args):
            pass
        builtins.print=print_pass

    if args.gpu is not None:
        print("Use GPU: {} for training".format(args.gpu))
    
    if args.distributed:
        if args.dist_url=='env://' and args.rank==-1:
            args.rank=int(os.environ["RANK"])
        if args.multiprocessing_distributed:
            # gpu = 0,1,2,...,ngpus_per_node-1
            print("gpu는",gpu)
            args.rank=args.rank*ngpus_per_node + gpu
        # 내용1-2: init_process_group 선언
        torch.distributed.init_process_group(backend=args.dist_backend,init_method=args.dist_url,
                                            world_size=args.world_size,rank=args.rank)

 

 

gpu 변수는 mp.spawn 함수에 의해 생성된 프로세스마다 0번부터 (프로세스 수)-1까지 하나씩 부여합니다.

 

프로세스마다 하나의 main_worker가 돌기 때문에 print함수가 있으면 같은 print함수를 프로세스 수만큼 출력하기 때문에 이를 방지하기 위해 print_pass함수를 정의, 선언해줍니다.

 

각 프로세스에서는 ddp함수를 사용하기 이전에 init_process_group을 선언해주어야 합니다.

backend = 'nccl'이고, init_method='tcp:127.0.0.1:10001'을 사용했습니다.

 

2. 모델 정의

    # 내용2: model 정의
    print("=> creating model '{}'".format(args.net_type))
    if args.dataset == 'cifar100':
        numberofclass = 100
    elif args.dataset == 'cifar10':
        numberofclass = 10
    if args.net_type == 'resnet':
        model = RN.ResNet(args.dataset, args.depth, numberofclass, args.bottleneck)  # for ResNet
    elif args.net_type == 'pyramidnet':
        model = PYRM.PyramidNet(args.dataset, args.depth, args.alpha, numberofclass,
                                args.bottleneck)
    else:
        raise Exception('unknown network architecture: {}'.format(args.net_type))

 

 

모델 정의는 ddp를 사용할때 따로 변경해줘야 할 부분은 없습니다. 자신의 코드의 모델 선언 부분을 사용해주시면 됩니다.

3. multiprocess 설정

    # 내용3: multiprocess 설정
    if args.distributed:
        if args.gpu is not None:
            torch.cuda.set_device(args.gpu)
            model.cuda(args.gpu)
            # when using a single GPU per process and per DDP, we need to divide tha batch size
            # ourselves based on the total number of GPUs we have
            args.batch_size = int(args.batch_size / ngpus_per_node)
            args.workers = int((args.workers+ngpus_per_node-1)/ngpus_per_node)
            # 내용3-1: model ddp설정
            model = torch.nn.parallel.DistributedDataParallel(model,device_ids=[args.gpu])# args.gpu가 무슨 값인지 알고 싶다.
        else:
            model.cuda()
            # DDP will divide and allocate batch_size to all available GPUs if device_ids are not set
            # 만약에 device_ids를 따로 설정해주지 않으면, 가능한 모든 gpu를 기준으로 ddp가 알아서 배치사이즈와 workers를 나눠준다는 뜻.
            model = torch.nn.parallel.DistributedDataParallel(model)
    elif args.gpu is not None:
        torch.cuda.set_device(args.gpu)
        model=model.cuda(args.gpu)
        raise NotImplementedError("Only DistributedDataParallel is supported.")
    else:
        raise NotImplementedError("Only DistributedDataparallel is supported.")

 

사용하는 gpu가 4개가 존재한다고 가정한다면 각 프로세스마다 args.gpu = 0,1,2,or3 입니다.

따라서 첫 조건문( if args.gpu is not None)의 조건에 부합하게 됩니다. 배치 사이즈와 workers 수를 재설정해줍니다. 이때 ngpus_per_node=4입니다.

4. criterion / optimizer 정의

    # 내용4: criterion / optimizer 정의
    criterion = nn.CrossEntropyLoss().cuda(args.gpu)

    optimizer = torch.optim.SGD(model.parameters(), args.lr,
                                momentum=args.momentum,
                                weight_decay=args.weight_decay, nesterov=True)

 

loss가 프로세스에 할당된 gpu를 사용하도록 해주기만 하면 된다.

5. 데이터 로딩

    # 내용5: 데이터 로딩
    # 내용5-1: transform 정의
    if args.dataset.startswith('cifar'):
        normalize = transforms.Normalize(mean=[x / 255.0 for x in [125.3, 123.0, 113.9]],
                                         std=[x / 255.0 for x in [63.0, 62.1, 66.7]])

        transform_train = transforms.Compose([
            transforms.RandomCrop(32, padding=4),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            normalize,
        ])

        transform_test = transforms.Compose([
            transforms.ToTensor(),
            normalize
        ])
        # 내용5-2: dataset 정의
        if args.dataset == 'cifar100':
            train_dataset = datasets.CIFAR100('../data', train=True, download=True, transform=transform_train)
            val_dataset = datasets.CIFAR100('../data', train=False, transform=transform_test)
            numberofclass = 100
        elif args.dataset == 'cifar10':
            train_dataset = datasets.CIFAR10('../data', train=True, download=True, transform=transform_train)
            val_dataset = datasets.CIFAR10('../data', train=False, transform=transform_test)
            numberofclass = 10
            
        # 내용5-3: sampler 정의 (참고: val_loader는 sampler를 사용하지 않는다.)
        if args.distributed:
            train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
        else:
            train_sampler = None

        train_loader = torch.utils.data.DataLoader(
            train_dataset,
            batch_size=args.batch_size, shuffle=(train_sampler is None), num_workers=args.workers, pin_memory=True,sampler=train_sampler)
        val_loader = torch.utils.data.DataLoader(
            val_dataset,
            batch_size=args.batch_size, shuffle=False, num_workers=args.workers, pin_memory=True)


    else:
        raise Exception('unknown dataset: {}'.format(args.dataset))

 

ddp를 사용하려면 내용5-3과 같이

train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)

정의하고, shuffle = False로 설정해주어야 한다.

이때 batch_size는 3번에서 줄어든 크기의 batch_size가 들어가게 된다.

 

6. 학습

# 내용 6: for문을 통한 training
    cudnn.benchmark = True
    stime = time.time()
    scaler = torch.cuda.amp.GradScaler()
    for epoch in range(args.start_epoch, args.epochs):
        # 내용 6-1: train_sampler.set_epoch
        # In distributed mode, calling the set_eopch() method at the beggining of 
        # each epoch before creating the "dataloader" iterator is necessary to make
        # suffling work properly across multiple epochs. Otherwise, the same ordering will be always used.
        if args.distributed:
            train_sampler.set_epoch(epoch)
        adjust_learning_rate(optimizer, epoch,args)

        # train for one epoch
        train_loss = train(train_loader, model, criterion, optimizer, epoch,args,scaler)

        # evaluate on validation set
        err1, err5, val_loss = validate(val_loader, model, criterion, epoch,args)

        # remember best prec@1 and save checkpoint
        is_best = err1 <= best_err1
        best_err1 = min(err1, best_err1)
        if is_best:
            best_err5 = err5

        print('Current best accuracy (top-1 and 5 error):', best_err1, best_err5)
        if not args.multiprocessing_distributed or (args.multiprocessing_distributed and args.rank% ngpus_per_node == 0):
            save_checkpoint({
                'epoch': epoch,
                'arch': args.net_type,
                'state_dict': model.state_dict(),
                'best_err1': best_err1,
                'best_err5': best_err5,
                'optimizer': optimizer.state_dict(),
            }, is_best,args=args)
    etime = time.time()
    if not args.multiprocessing_distributed or (args.multiprocessing_distributed and args.rank % ngpus_per_node == 0):
        print("총 걸린시간: ",etime-stime)
        print('Best accuracy (top-1 and 5 error):', best_err1, best_err5)

 

내용 6-1과 같이 매 에폭마다 train_sampler.set_epoch(epoch)를 해주어야 shuffle이 잘 사용된다고 한다.

자세한 설명

포스트의 글이 생각보다 길어져 이는 다음 포스트에 작성하도록 했습니다.

 

참고자료

facebook이 작성한 코드인 Moco코드, PYTORCH 공식 문서를 참고하였습니다.

- Moco

https://github.com/facebookresearch/moco

 

GitHub - facebookresearch/moco: PyTorch implementation of MoCo: https://arxiv.org/abs/1911.05722

PyTorch implementation of MoCo: https://arxiv.org/abs/1911.05722 - GitHub - facebookresearch/moco: PyTorch implementation of MoCo: https://arxiv.org/abs/1911.05722

github.com

 

반응형

'Pytorch > 가속화' 카테고리의 다른 글

[Pytorch] apex / amp 모델 학습 빠르게 시키는 법.  (0) 2022.03.20