Parallelizing DeepFace across multiple GPUs

Votes: 0, Answers: 1

I am trying to use the DeepFace Python library to run face recognition and analysis on long videos: https://github.com/serengil/deepface

Using the library out of the box, I can get the results I need by selecting frames from the video and iterating over them in a for loop.

Single GPU

import decord
import tensorflow as tf
from deepface import DeepFace

video_path = 'myvideopath'
vr = decord.VideoReader(video_path)

# FRAME_STEP and other_parameters are placeholders defined elsewhere
for i in range(0, 100, FRAME_STEP):
    image_bgr = vr[i].asnumpy()[:, :, ::-1]  # decord returns RGB; flip channels to BGR for DeepFace
    results = DeepFace.find(img_path=image_bgr, **other_parameters)

This works, but it is far too slow for the number of videos and frames I need to process.

While running the model I noticed that it only uses about 600 MB of GPU memory for prediction, so I should be able to run multiple instances on the same physical GPU. I am only using DeepFace for inference, not training or fine-tuning any models.

gpus = tf.config.list_physical_devices('GPU')
for gpu in gpus:
    try:
        # Split each physical GPU into 12 logical devices of ~630 MB each
        tf.config.set_logical_device_configuration(
            gpu,
            [tf.config.LogicalDeviceConfiguration(memory_limit=630)] * 12,
        )
    except RuntimeError as e:
        # Virtual devices must be set before GPUs have been initialized
        print(e)
logical_gpus = tf.config.list_logical_devices('GPU')
print(len(gpus), "Physical GPU,", len(logical_gpus), "Logical GPUs")

2 Physical GPU, 24 Logical GPUs

I want to be able to parallelize the DeepFace.find and DeepFace.analyze functions.

The first thing I tried was keeping a queue of idle GPU devices and using concurrent.futures.ThreadPoolExecutor.

import concurrent.futures
import queue
import timeit

import tensorflow as tf


def multigpu_helper(index, device_name, image_bgr, fn, fn_dict, q):
    print(f'{index:5} {device_name}')
    start_timer = timeit.default_timer()
    with tf.device(device_name):
        results = fn(img_path=image_bgr, **fn_dict)
    q.put(device_name)  # hand the device back to the pool of idle devices
    end_timer = timeit.default_timer()
    print(f'MultiGPU Time: {end_timer - start_timer} sec.')
    return results


def multigpu_process(iterable, vr, fn, fn_dict):
    logical_devices = tf.config.list_logical_devices(device_type='GPU')
    print(logical_devices)

    # Seed the queue with every logical device name; a worker puts its device
    # back when it finishes, so each device serves one thread at a time.
    q = queue.Queue()
    for logical_device in logical_devices:
        q.put(logical_device.name)

    results_dict = dict()

    item_list = list(iterable)

    with concurrent.futures.ThreadPoolExecutor(max_workers=len(logical_devices)) as pool:
        future_jobs = dict()

        while item_list:
            device_name = q.get()  # blocks until a device is free
            index = item_list.pop(0)
            image_bgr = vr[index].asnumpy()[:, :, ::-1]
            future_jobs[pool.submit(multigpu_helper, index, device_name, image_bgr, fn, fn_dict, q)] = index

        for future in concurrent.futures.as_completed(future_jobs):
            index = future_jobs[future]
            results = future.result()
            results_dict[index] = results

    return results_dict

I was able to get this code to run and produce results, but it was no faster than a plain for loop on a single GPU.

[LogicalDevice(name='/device:GPU:0', device_type='GPU'), LogicalDevice(name='/device:GPU:1', device_type='GPU'), LogicalDevice(name='/device:GPU:2', device_type='GPU'), LogicalDevice(name='/device:GPU:3', device_type='GPU'), LogicalDevice(name='/device:GPU:4', device_type='GPU'), LogicalDevice(name='/device:GPU:5', device_type='GPU'), LogicalDevice(name='/device:GPU:6', device_type='GPU'), LogicalDevice(name='/device:GPU:7', device_type='GPU'), LogicalDevice(name='/device:GPU:8', device_type='GPU'), LogicalDevice(name='/device:GPU:9', device_type='GPU'), LogicalDevice(name='/device:GPU:10', device_type='GPU'), LogicalDevice(name='/device:GPU:11', device_type='GPU'), LogicalDevice(name='/device:GPU:12', device_type='GPU'), LogicalDevice(name='/device:GPU:13', device_type='GPU'), LogicalDevice(name='/device:GPU:14', device_type='GPU'), LogicalDevice(name='/device:GPU:15', device_type='GPU'), LogicalDevice(name='/device:GPU:16', device_type='GPU'), LogicalDevice(name='/device:GPU:17', device_type='GPU'), LogicalDevice(name='/device:GPU:18', device_type='GPU'), LogicalDevice(name='/device:GPU:19', device_type='GPU'), LogicalDevice(name='/device:GPU:20', device_type='GPU'), LogicalDevice(name='/device:GPU:21', device_type='GPU'), LogicalDevice(name='/device:GPU:22', device_type='GPU'), LogicalDevice(name='/device:GPU:23', device_type='GPU')]
    0 /device:GPU:0
   30 /device:GPU:1
   60 /device:GPU:2
   90 /device:GPU:3
  120 /device:GPU:4
  150 /device:GPU:5
  180 /device:GPU:6
  210 /device:GPU:7
  240 /device:GPU:8
  270 /device:GPU:9
  300 /device:GPU:10
  330 /device:GPU:11
  360 /device:GPU:12
  390 /device:GPU:13
  420 /device:GPU:14
  450 /device:GPU:15
  480 /device:GPU:16
  510 /device:GPU:17
  540 /device:GPU:18
  570 /device:GPU:19
  600 /device:GPU:20
  630 /device:GPU:21
  660 /device:GPU:22
  690 /device:GPU:23
MultiGPU Time: 16.968208671023604 sec.
  720 /device:GPU:2
MultiGPU Time: 17.829027735977434 sec.
  750 /device:GPU:1
MultiGPU Time: 17.852755011990666 sec.
  780 /device:GPU:8
MultiGPU Time: 19.71368485200219 sec.MultiGPU Time: 19.543589979992248 sec.

MultiGPU Time: 19.8676836140221 sec.
  810 /device:GPU:4
MultiGPU Time: 19.85990399698494 sec.
  840 /device:GPU:11
  870 /device:GPU:0
MultiGPU Time: 20.076353634009138 sec.
  900 /device:GPU:6
  930 /device:GPU:3
MultiGPU Time: 20.145404886978213 sec.
MultiGPU Time: 20.27192261395976 sec.
  960 /device:GPU:9
  990 /device:GPU:7
MultiGPU Time: 20.459441539016552 sec.
MultiGPU Time: 20.418532160052564 sec.
MultiGPU Time: 20.581610807043035 sec.
MultiGPU Time: 20.545571406022646 sec.
MultiGPU Time: 20.832303048984613 sec.
MultiGPU Time: 20.97456920897821 sec.
MultiGPU Time: 20.994418176996987 sec.
MultiGPU Time: 21.35945221298607 sec.
MultiGPU Time: 21.50979186099721 sec.
MultiGPU Time: 21.405662977020256 sec.
MultiGPU Time: 21.542257393943146 sec.
MultiGPU Time: 22.063301149988547 sec.
MultiGPU Time: 21.665760322008282 sec.
MultiGPU Time: 22.105394209967926 sec.
MultiGPU Time: 6.661869053030387 sec.
MultiGPU Time: 9.814038792042993 sec.
MultiGPU Time: 7.658941667003091 sec.
MultiGPU Time: 8.546573753003031 sec.
MultiGPU Time: 10.831304075953085 sec.
MultiGPU Time: 9.250181486015208 sec.
MultiGPU Time: 8.87483947101282 sec.
MultiGPU Time: 12.432360459002666 sec.
MultiGPU Time: 9.511910478991922 sec.
MultiGPU Time: 9.66243519296404 sec.
Face Recognition MultiGPU Total Time: 29.63435428502271 sec.

In fact, a single-GPU iteration of the DeepFace.find call inside the for loop takes roughly 0.5 sec. With multithreading, every thread seems to finish at around the cumulative time of all of them, which is slower and not what I want.
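A quick way to check whether the threaded calls are actually overlapping or just queuing behind one another is to time the same frames sequentially and through a thread pool and compare the totals. This is a hypothetical diagnostic, not part of the question; one_find is an illustrative helper, and vr, FRAME_STEP, and other_parameters are the placeholders from the snippets above.

import timeit
import concurrent.futures

def one_find(index):
    # the same single-frame call as in the single-GPU loop
    image_bgr = vr[index].asnumpy()[:, :, ::-1]
    return DeepFace.find(img_path=image_bgr, **other_parameters)

frames = list(range(0, 100, FRAME_STEP))

start = timeit.default_timer()
for i in frames:
    one_find(i)
sequential = timeit.default_timer() - start

start = timeit.default_timer()
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(one_find, frames))
threaded = timeit.default_timer() - start

# If the two totals are close, the threads are effectively serialized.
print(f'sequential: {sequential:.1f} s  threaded: {threaded:.1f} s')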

For my second attempt I dropped the queue and instead split the input indices into separate lists, one per device, and processed each list independently.

import concurrent.futures
import timeit
from itertools import cycle
from typing import Any, List

import decord
import tensorflow as tf


def cycle_baskets(items: List[Any], maxbaskets: int) -> List[List[Any]]:
    # Deal the items round-robin into at most maxbaskets lists
    baskets = [[] for _ in range(min(maxbaskets, len(items)))]
    for item, basket in zip(items, cycle(baskets)):
        basket.append(item)
    return baskets


def multigpu_helper_split(device_name, item_list, video_path, fn, fn_dict):
    print(device_name)
    start_timer = timeit.default_timer()

    results_dict = dict()
    
    vr = decord.VideoReader(str(video_path))  # each worker opens its own VideoReader rather than sharing one

    with tf.device(device_name):
        for index in item_list:
            start_index_timer = timeit.default_timer()

            image_bgr = vr[index].asnumpy()[:, :, ::-1]
            results_dict[index] = fn(img_path=image_bgr, **fn_dict)

            end_index_timer = timeit.default_timer()
            print(f'Device {device_name} Index {index:5} {end_index_timer - start_index_timer} sec.')

    end_timer = timeit.default_timer()
    print(f'MultiGPU Time: {end_timer - start_timer} sec.')
    return results_dict


def multigpu_process_split(iterable, video_path, fn, fn_dict):
    logical_devices = [device.name for device in tf.config.list_logical_devices(device_type='GPU')]
    print(logical_devices)

    results_dict = dict()

    item_lists = cycle_baskets(list(iterable), len(logical_devices))

    with concurrent.futures.ThreadPoolExecutor(max_workers=len(logical_devices)) as pool:
        future_jobs = {pool.submit(multigpu_helper_split, logical_devices[i], item_lists[i], video_path, fn, fn_dict) for i in range(len(logical_devices))}

        for future in concurrent.futures.as_completed(future_jobs):
            results_dict.update(future.result())

    return results_dict

This was also quite slow, and it also caused the kernel to crash.

Device /device:GPU:18 Index   540 305.03293917299015 sec.
MultiGPU Time: 311.7356750360341 sec.
Device /device:GPU:22 Index   660 305.6161605300149 sec.
MultiGPU Time: 312.3281374910148 sec.
Device /device:GPU:5 Index   150 309.5672924729879 sec.
Device /device:GPU:13 Index   390 311.9252848789911 sec.
MultiGPU Time: 318.34215058299014 sec.
Device /device:GPU:0 Index     0 312.96517166896956 sec.
Device /device:GPU:3 Index    90 312.41818467900157 sec.
Device /device:GPU:4 Index   120 312.507540087041 sec.
Device /device:GPU:10 Index   300 312.49839297297876 sec.
MultiGPU Time: 319.4717267890228 sec.
Device /device:GPU:23 Index   690 313.53694368101424 sec.
MultiGPU Time: 320.6566755659878 sec.

I then realized that with tf.device(device_name): wraps the entire DeepFace call. Looking at the DeepFace source code, there is a lot more going on than just TensorFlow, and what I actually want to parallelize is model.predict().

DeepFace.py

def represent():
    ...
    # represent
    if "keras" in str(type(model)):
        # new tf versions show progress bar and it is annoying
        embedding = model.predict(img, verbose=0)[0].tolist()
    else:
        # SFace and Dlib are not keras models and no verbose arguments
        embedding = model.predict(img)[0].tolist()

How can I parallelize the DeepFace.find and DeepFace.analyze functions so that they run across the 24 logical GPUs I have? Ideally I would get close to a 24x speedup when processing the selected frames.

It would be best if I could wrap something around the DeepFace functions themselves, but if that is not possible I can try parallelizing the DeepFace library's source code instead.

tensorflow gpu tensorflow2.0 ray deepface
1 Answer

0 votes

I was able to parallelize DeepFace by using ray to parallelize some of its internal functions.
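The answer gives no code, but a minimal sketch of what a ray-based approach could look like follows. It is an assumption-laden illustration, not the answer's actual implementation: the FrameWorker actor, the num_gpus=0.5 fraction, the four workers, and the empty find_kwargs (standing in for **other_parameters) are all illustrative choices.

import decord
import ray
import tensorflow as tf
from deepface import DeepFace

ray.init()

@ray.remote(num_gpus=0.5)  # a fractional GPU lets several actors share one physical card
class FrameWorker:
    def __init__(self, video_path, find_kwargs):
        # Each actor is its own process with its own TF runtime and model cache;
        # enable memory growth so several actors can coexist on one GPU.
        for gpu in tf.config.list_physical_devices('GPU'):
            tf.config.experimental.set_memory_growth(gpu, True)
        self.vr = decord.VideoReader(video_path)
        self.find_kwargs = find_kwargs

    def process(self, index):
        image_bgr = self.vr[index].asnumpy()[:, :, ::-1]
        return index, DeepFace.find(img_path=image_bgr, **self.find_kwargs)

video_path = 'myvideopath'
find_kwargs = {}  # stands in for **other_parameters from the question
workers = [FrameWorker.remote(video_path, find_kwargs) for _ in range(4)]

frame_indices = range(0, 1000, 30)  # the logs above suggest a frame step of 30
futures = [workers[i % len(workers)].process.remote(idx)
           for i, idx in enumerate(frame_indices)]
results_dict = dict(ray.get(futures))

Because every actor runs in a separate process, the GIL contention that limited the ThreadPoolExecutor attempts goes away; the trade-off is that each actor loads and keeps its own copy of the model weights.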
