Prefect - Unable to persist a Dask DataFrame to the Dask client

I'm fairly new to Prefect and am struggling to use Dask persistence within a Prefect flow. I'm not sure whether this is a problem with my code or a limitation within Prefect.

I put together the following simple example to illustrate the problem I'm running into. My real use case is much more complex, but I hope this example concisely demonstrates the core of my issue. I want to persist a Dask DataFrame to the Dask client provided by Prefect's DaskTaskRunner, and then pass that persisted DataFrame to multiple downstream tasks.

"""
Example of persisting a Dask DataFrame with Prefect. DOES NOT work.
"""

import dask.dataframe as dd
import pandas as pd
from prefect import task, flow
from prefect_dask import get_dask_client, DaskTaskRunner
from dask.distributed import wait

@task
def load_ddf_A() -> dd.DataFrame:
    """
    Loads sample data into a Dask DataFrame and attempts to persist it to the Dask client. This does not work.
    """

    with get_dask_client():
        data = [
            [1,2,3], [4,5,6], [7,8,9]
            ,[11,12,13], [14,15,16], [17,18,19]
            ,[21,22,23], [24,25,26], [27,28,29]
            ,[31,32,33], [34,35,36], [37,38,39]
            ,[41,42,43], [44,45,46], [47,48,49]
            ,[51,52,53], [54,55,56], [57,58,59]
            ,[61,62,63], [64,65,66], [67,68,69]
            ,[71,72,73], [74,75,76], [77,78,79]
            ,[81,82,83], [84,85,86], [87,88,89]
            ,[91,92,93], [94,95,96], [97,98,99]
        ]

        cols = ['A', 'B', 'C']

        df = pd.DataFrame(data=data, columns=cols)
        ddf = dd.from_pandas(df, npartitions=1)
        ddf = ddf.persist()
        wait(ddf)

    return ddf

@task
def calc_ddf_B(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """
    Multiplies a source Dask DataFrame by 2.
    """

    return source_ddf * 2

@task
def calc_ddf_C(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """
    Multiplies a source Dask DataFrame by 3.
    """

    return source_ddf * 3

@flow(task_runner=DaskTaskRunner())
def example_flow():
    """
    Example Prefect Flow that loads sample data into Dask DataFrame A and then creates dataframes B and C by multiplying A by 2 and 3, respectively.
    """

    ddf_A = load_ddf_A.submit()
    ddf_B = calc_ddf_B.submit(ddf_A)
    ddf_C = calc_ddf_C.submit(ddf_A)

    print('ddf_B is: \n')
    print(ddf_B.result().head(10))
    print('\nddf_C is: \n')
    print(ddf_C.result().head(10))

if __name__ == '__main__':
    example_flow()

This fails when it attempts to persist the DataFrame. I get the following error message:

14:22:22.878 | ERROR   | Task run 'load_ddf_A-0' - Crash detected! Execution was cancelled by the runtime environment.
2023-08-28 14:22:22,922 - distributed.worker - WARNING - Compute Failed
Key:       load_ddf_A-0-72b3fab73f2c45c8a3bdc15afffee986-1
Function:  begin_task_run
args:      ()
kwargs:    {'task': <prefect.tasks.Task object at 0x000001AFC5275310>, 'task_run': TaskRun(id=UUID('72b3fab7-3f2c-45c8-a3bd-c15afffee986'), name='load_ddf_A-0', flow_run_id=UUID('613a493a-d2cc-4ae1-ad5a-a339f8588ffd'), task_ke
y='__main__.load_ddf_A', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=0, retry_delay=0, retry_jitter_factor=None), ta
gs=[], state_id=UUID('38834eaa-1282-4121-8e8a-8a4b736d767f'), task_inputs={}, state_type=StateType.PENDING, state_name='Pending', run_count=0, flow_run_run_count=0, expected_start_time=DateTime(2023, 8, 28, 21, 22, 17, 768737,
 tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microse
conds=13275), state=Pending(message=None, type=PENDING, result=None)), 'parameters': {}, 'wait_for': None, 
Exception: 'CancelledError()'

14:22:22.922 | WARNING | distributed.worker - Compute Failed
Key:       load_ddf_A-0-72b3fab73f2c45c8a3bdc15afffee986-1
Function:  begin_task_run
args:      ()
kwargs:    {'task': <prefect.tasks.Task object at 0x000001AFC5275310>, 'task_run': TaskRun(id=UUID('72b3fab7-3f2c-45c8-a3bd-c15afffee986'), name='load_ddf_A-0', flow_run_id=UUID('613a493a-d2cc-4ae1-ad5a-a339f8588ffd'), task_ke
y='__main__.load_ddf_A', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=0, retry_delay=0, retry_jitter_factor=None), ta
gs=[], state_id=UUID('38834eaa-1282-4121-8e8a-8a4b736d767f'), task_inputs={}, state_type=StateType.PENDING, state_name='Pending', run_count=0, flow_run_run_count=0, expected_start_time=DateTime(2023, 8, 28, 21, 22, 17, 768737,
 tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microse
conds=13275), state=Pending(message=None, type=PENDING, result=None)), 'parameters': {}, 'wait_for': None,
Exception: 'CancelledError()'

The following code implements the same logic using plain Dask, and it works as expected.

"""
Example of persisting a Dask DataFrame without Prefect. DOES work.
"""

import dask.dataframe as dd
import pandas as pd
from dask.distributed import wait, LocalCluster, Client

def load_ddf_A() -> dd.DataFrame:
    """
    Loads sample data into a Dask DataFrame and persists it to the Dask client. This does work.
    """

    data = [
        [1,2,3], [4,5,6], [7,8,9]
        ,[11,12,13], [14,15,16], [17,18,19]
        ,[21,22,23], [24,25,26], [27,28,29]
        ,[31,32,33], [34,35,36], [37,38,39]
        ,[41,42,43], [44,45,46], [47,48,49]
        ,[51,52,53], [54,55,56], [57,58,59]
        ,[61,62,63], [64,65,66], [67,68,69]
        ,[71,72,73], [74,75,76], [77,78,79]
        ,[81,82,83], [84,85,86], [87,88,89]
        ,[91,92,93], [94,95,96], [97,98,99]
    ]

    cols = ['A', 'B', 'C']

    df = pd.DataFrame(data=data, columns=cols)
    ddf = dd.from_pandas(df, npartitions=1)
    ddf = ddf.persist()
    wait(ddf)

    return ddf

def calc_ddf_B(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """
    Multiplies a source Dask DataFrame by 2.
    """

    return source_ddf * 2

def calc_ddf_C(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """
    Multiplies a source Dask DataFrame by 3.
    """

    return source_ddf * 3

def example_flow():
    """
    Example Dask workflow that loads sample data into Dask DataFrame A and then creates dataframes B and C by multiplying A by 2 and 3, respectively.
    """

    ddf_A = load_ddf_A()
    ddf_B = calc_ddf_B(ddf_A)
    ddf_C = calc_ddf_C(ddf_A)

    print('ddf_B is: \n')
    print(ddf_B.head(10))
    print('\nddf_C is: \n')
    print(ddf_C.head(10))

if __name__ == '__main__':
    cluster = LocalCluster()
    client = Client(cluster)
    example_flow()

Any advice on what I'm getting wrong with Prefect would be greatly appreciated!

dask-distributed dask-dataframe prefect
1 Answer

You are submitting the load_ddf_A task to Dask, and that task in turn calls Dask's .persist() on your DataFrame. The task's result is therefore a DataFrame containing unresolved Dask futures, and I'm not sure Prefect handles those correctly when it passes the result along to downstream tasks.
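
To make that concrete: a persisted Dask collection is backed by distributed futures pointing at data held on the cluster's workers, which you can inspect with dask.distributed.futures_of. A quick standalone sketch (plain Dask, outside Prefect):

import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client, futures_of

client = Client()  # spins up a LocalCluster

df = pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6]})
ddf = dd.from_pandas(df, npartitions=2).persist()

# The persisted collection is backed by futures tied to this client;
# once the client is gone, those futures are no longer valid.
print(futures_of(ddf))

In the question's task, those futures are created inside the get_dask_client() context and then returned out of it, which would be consistent with the CancelledError in the traceback.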

Indeed, if you call .compute() instead of .persist(), you'll see the flow complete correctly, though I understand that isn't ideal.
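
For illustration, here is a minimal sketch of the question's load_ddf_A rewritten to use .compute() (my own variation, with the sample data shortened). The task then returns a concrete pandas DataFrame with no futures attached, and the downstream * 2 and * 3 tasks work on it unchanged:

import dask.dataframe as dd
import pandas as pd
from prefect import task
from prefect_dask import get_dask_client

@task
def load_ddf_A() -> pd.DataFrame:
    """
    Loads sample data and materializes it with .compute() instead of
    .persist(), so the task returns a plain pandas DataFrame.
    """
    with get_dask_client():
        df = pd.DataFrame(
            data=[[1, 2, 3], [4, 5, 6], [7, 8, 9]],
            columns=['A', 'B', 'C'],
        )
        ddf = dd.from_pandas(df, npartitions=1)
        # .compute() resolves everything before the task returns, so no
        # client-bound futures escape the get_dask_client() context.
        return ddf.compute()

The trade-off is that the materialized DataFrame now moves through Prefect's result handling instead of staying on the cluster, which is presumably why persisting was preferred in the first place.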
