dagster op 函数的类类型<str>无效

问题描述 投票:0回答:1

我正在运行下面的代码。我有一份工作和一份工作,但是当我做 dagster dev 时,我遇到了以下问题。还有另一个操作从表中加载数据,在这里我想将此数据放入另一个表中。正如我通过 dagster 预览看到的那样,数据框正在加载。加载操作以数据帧的形式给出输出。

@op
def save_df_to_db(db: resource.sql_resource, dataframe, custom_table_name: str):
    log=get_dagster_logger()
    log.info("Connecting to db", custom_table_name)
    with db.get().begin() as con:
        dataframe.to_sql(custom_table_name, con, if_exists='append', index=False)
    ...

职位定义

@job
def start_day_job():
delivery_location=import_ops.get_location()
import_ops.save_df_to_db(dataframe=delivery_location, custom_table_name="Delivery_Location")

import_schedule=ScheduleDefinition(
    job=start_day_job, cron_schedule="0 7 * * 2-6",execution_timezone="Europe/Zurich"
)

但是我收到此错误。

```在 @job start_day_job 中,在 op 调用“save_df_to_db”中收到输入 (custom_table_name) 的无效类类型 str。在组合期间调用 ndoes 时,必须将先前节点调用的输出或输入传递给复合函数作为输入`


How do i resolve this error?

I have tried the following.

1. Passed as custom parameter.
2. Passed as config schema

Still same issue. 
nodes jobs invocation dagster
1个回答
0
投票

这是因为文字值(在本例中为

pd.DataFrame
str
custom_table_name)无法直接传递给运算符。相反,它们必须作为
Out
从另一个运算符返回,或者作为配置值返回。请参阅以下示例,了解如何指定
RunConfig
table_name

import pandas as pd
from dagster import op, job, Out, Definitions, Config, RunConfig
from dagster_duckdb import DuckDBResource


class SaveTableConfig(Config):
    custom_table_name: str


@op(out=Out(pd.DataFrame))
def create_dataframe():
    data = {
        "Column1": [1, 2, 3],
        "Column2": ["A", "B", "C"],
        "Column3": [True, False, True],
    }
    df = pd.DataFrame(data)
    return df


@op
def save_df_to_db(dataframe: pd.DataFrame, db: DuckDBResource, config: SaveTableConfig):
    with db.get_connection() as conn:
        dataframe.to_sql(
            config.custom_table_name, conn, if_exists="append", index=False
        )


@job(
    config=RunConfig(
        ops={"save_df_to_db": SaveTableConfig(custom_table_name="Delivery_Location")}
    )
)
def start_day_job():
    df = create_dataframe()
    save_df_to_db(dataframe=df)


defs = Definitions(
    jobs=[start_day_job],
    resources={
        "db": DuckDBResource(
            database="my.duckdb",
        )
    },
)

在此示例中,我选择使用 DuckDB 来确认一切按预期工作。干杯。

© www.soinside.com 2019 - 2024. All rights reserved.