我是 Prefect 的新手,正在努力在 Prefect Flow 中使用 Dask 的持久化(persist)功能。我不确定这是我的代码问题,还是 Prefect 本身的限制。
我做了以下简单的例子来说明我遇到的问题。我的真实用例要复杂得多,但我希望这个示例能够简明地说明我的问题的核心。我想将 Dask DataFrame 持久保存到 Prefect DaskTaskRunner 提供的 Dask 客户端。然后我想将该持久数据帧传递给多个下游任务。
"""
Example of persisting a Dask DataFrame with Prefect. DOES NOT work.
"""
import dask.dataframe as dd
import pandas as pd
from prefect import task, flow
from prefect_dask import get_dask_client, DaskTaskRunner
from dask.distributed import wait
@task
def load_ddf_A() -> dd.DataFrame:
    """
    Load sample data into a Dask DataFrame and attempt to persist it on the
    Dask client provided by Prefect's DaskTaskRunner. This does not work.
    """
    with get_dask_client():
        sample_rows = [
            [1, 2, 3], [4, 5, 6], [7, 8, 9],
            [11, 12, 13], [14, 15, 16], [17, 18, 19],
            [21, 22, 23], [24, 25, 26], [27, 28, 29],
            [31, 32, 33], [34, 35, 36], [37, 38, 39],
            [41, 42, 43], [44, 45, 46], [47, 48, 49],
            [51, 52, 53], [54, 55, 56], [57, 58, 59],
            [61, 62, 63], [64, 65, 66], [67, 68, 69],
            [71, 72, 23], [74, 75, 76], [77, 78, 79],
            [81, 82, 33], [84, 85, 86], [87, 88, 89],
            [91, 92, 93], [94, 95, 96], [97, 98, 99],
        ]
        pdf = pd.DataFrame(data=sample_rows, columns=['A', 'B', 'C'])
        ddf = dd.from_pandas(pdf, npartitions=1)
        # Pin the collection on the cluster and block until it is in memory.
        ddf = ddf.persist()
        wait(ddf)
        return ddf
@task
def calc_ddf_B(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """Return the source Dask DataFrame multiplied by 2."""
    doubled = source_ddf * 2
    return doubled
@task
def calc_ddf_C(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """
    Multiplies a source Dask DataFrame by 3.
    """
    # Docstring previously claimed "by 2" — the factor here is 3.
    return source_ddf * 3
@flow(task_runner=DaskTaskRunner())
def example_flow():
    """
    Example Prefect flow: load sample data into Dask DataFrame A, then derive
    dataframes B and C by multiplying A by 2 and 3, respectively.
    """
    source = load_ddf_A.submit()
    doubled = calc_ddf_B.submit(source)
    tripled = calc_ddf_C.submit(source)
    print('ddf_B is: \n')
    print(doubled.result().head(10))
    print('\nddf_C is: \n')
    print(tripled.result().head(10))
if __name__ == '__main__':
    # Entry point: run the example flow (the DaskTaskRunner supplies the client).
    example_flow()
尝试持久化数据帧时,此方法会失败。我收到以下错误消息:
14:22:22.878 | ERROR | Task run 'load_ddf_A-0' - Crash detected! Execution was cancelled by the runtime environment.
2023-08-28 14:22:22,922 - distributed.worker - WARNING - Compute Failed
Key: load_ddf_A-0-72b3fab73f2c45c8a3bdc15afffee986-1
Function: begin_task_run
args: ()
kwargs: {'task': <prefect.tasks.Task object at 0x000001AFC5275310>, 'task_run': TaskRun(id=UUID('72b3fab7-3f2c-45c8-a3bd-c15afffee986'), name='load_ddf_A-0', flow_run_id=UUID('613a493a-d2cc-4ae1-ad5a-a339f8588ffd'), task_ke
y='__main__.load_ddf_A', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=0, retry_delay=0, retry_jitter_factor=None), ta
gs=[], state_id=UUID('38834eaa-1282-4121-8e8a-8a4b736d767f'), task_inputs={}, state_type=StateType.PENDING, state_name='Pending', run_count=0, flow_run_run_count=0, expected_start_time=DateTime(2023, 8, 28, 21, 22, 17, 768737,
tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microse
conds=13275), state=Pending(message=None, type=PENDING, result=None)), 'parameters': {}, 'wait_for': None,
Exception: 'CancelledError()'
14:22:22.922 | WARNING | distributed.worker - Compute Failed
Key: load_ddf_A-0-72b3fab73f2c45c8a3bdc15afffee986-1
Function: begin_task_run
args: ()
kwargs: {'task': <prefect.tasks.Task object at 0x000001AFC5275310>, 'task_run': TaskRun(id=UUID('72b3fab7-3f2c-45c8-a3bd-c15afffee986'), name='load_ddf_A-0', flow_run_id=UUID('613a493a-d2cc-4ae1-ad5a-a339f8588ffd'), task_ke
y='__main__.load_ddf_A', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=0, retry_delay=0, retry_jitter_factor=None), ta
gs=[], state_id=UUID('38834eaa-1282-4121-8e8a-8a4b736d767f'), task_inputs={}, state_type=StateType.PENDING, state_name='Pending', run_count=0, flow_run_run_count=0, expected_start_time=DateTime(2023, 8, 28, 21, 22, 17, 768737,
tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microse
conds=13275), state=Pending(message=None, type=PENDING, result=None)), 'parameters': {}, 'wait_for': None,
Exception: 'CancelledError()'
以下代码实现相同的逻辑,但使用纯 Dask。它按预期工作。
"""
Example of persisting a Dask DataFrame without Prefect. DOES work.
"""
import dask.dataframe as dd
import pandas as pd
from dask.distributed import wait, LocalCluster, Client
def load_ddf_A() -> dd.DataFrame:
    """
    Load sample data into a Dask DataFrame and persist it on the current
    Dask client. This does work.
    """
    sample_rows = [
        [1, 2, 3], [4, 5, 6], [7, 8, 9],
        [11, 12, 13], [14, 15, 16], [17, 18, 19],
        [21, 22, 23], [24, 25, 26], [27, 28, 29],
        [31, 32, 33], [34, 35, 36], [37, 38, 39],
        [41, 42, 43], [44, 45, 46], [47, 48, 49],
        [51, 52, 53], [54, 55, 56], [57, 58, 59],
        [61, 62, 63], [64, 65, 66], [67, 68, 69],
        [71, 72, 23], [74, 75, 76], [77, 78, 79],
        [81, 82, 33], [84, 85, 86], [87, 88, 89],
        [91, 92, 93], [94, 95, 96], [97, 98, 99],
    ]
    pdf = pd.DataFrame(data=sample_rows, columns=['A', 'B', 'C'])
    ddf = dd.from_pandas(pdf, npartitions=1)
    # Pin the collection on the cluster and block until it is in memory.
    ddf = ddf.persist()
    wait(ddf)
    return ddf
def calc_ddf_B(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """Return the source Dask DataFrame multiplied by 2."""
    doubled = source_ddf * 2
    return doubled
def calc_ddf_C(source_ddf: dd.DataFrame) -> dd.DataFrame:
    """Return the source Dask DataFrame multiplied by 3."""
    tripled = source_ddf * 3
    return tripled
def example_flow():
    """
    Example pure-Dask workflow: load sample data into Dask DataFrame A, then
    derive dataframes B and C by multiplying A by 2 and 3, respectively.
    """
    source = load_ddf_A()
    doubled = calc_ddf_B(source)
    tripled = calc_ddf_C(source)
    print('ddf_B is: \n')
    print(doubled.head(10))
    print('\nddf_C is: \n')
    print(tripled.head(10))
if __name__ == '__main__':
    # Use context managers so the cluster and client are shut down cleanly on
    # exit (the original leaked both). Creating the Client registers it as the
    # default scheduler that persist()/compute() inside example_flow() use.
    with LocalCluster() as cluster, Client(cluster) as client:
        example_flow()
任何关于我对 Prefect 的错误的建议将不胜感激!
您正在向 Dask 提交 `load_ddf_A` 任务,而该任务内部又对您的数据框执行了 Dask 的 `.persist()`。该任务的结果将是一个包含若干未完成的 Dask future 的数据框,我不确定 Prefect 在传递任务结果时是否正确处理了这些 future。
确实,如果您改为调用 `.compute()` 而不是 `.persist()`,您会看到流程正确完成——但我知道这并不理想。