在 Airflow GrpcOperator 中模板化请求消息

问题描述 投票:0回答:1

Airflow 的

GrpcOperator
支持在
data
字段中传递模板变量。但是,需要进入
data
字段的 protobuf 生成的消息对象在解析时进行评估,这会破坏模板。

例如,在下面的任务中,我想向消息正文中包含 DAG 运行日期

ds
的 gRPC 服务器发送请求:

task_grpc = GrpcOperator(
    dag=dag,
    task_id="task_grpc",
    grpc_conn_id="grpc_default",
    stub_class=CustomGrpcServiceStub,
    call_func="CustomGrpcServiceFunction",
    response_callback=CustomGrpcService_callback,
    streaming=False,
    data = {"request": proto_pb2.CustomGrpcServiceRequest(request_date="{{ ds }}")}
)

这将会失败,因为 request_date 始终呈现为

"{ ds }"
。是否有解决方法可以在 protobuf 消息对象中实现模板化字段?

我尝试按照https://github.com/apache/airflow/discussions/36661中的建议编写用户定义的宏,但这不会改变结果。

airflow grpc airflow-2.x grpc-python
1个回答
0
投票

使用自定义运算符,可以在

data
时间(而不是
execute
时间)为 GrpcOperator 提供
init
。所有模板化变量(在本例中只是
request_date
)都可以放入自定义运算符的构造函数中,同时添加到
template_fields
的序列中。在
execute
时间,模板化变量可以用于组装
data
对象。

对于给定的示例,自定义运算符是

class CustomGrpcServiceRequestOperator(GrpcOperator):
    template_fields: Sequence[str] = tuple({"request_date"} | set(GrpcOperator.template_fields))

    def __init__(self, request_date: str, **kwargs):
        super().__init__(**kwargs)
        self.request_date = request_date


    def execute(self, context: Context):
        self.data = {
            "request": proto_pb2.CustomGrpcServiceRequest(request_date=self.request_date)
            }
        super().execute(context)

然后可以称为

task_grpc = CustomGrpcServiceRequestOperator(
    dag=dag,
    task_id="task_grpc",
    grpc_conn_id="grpc_default",
    stub_class=CustomGrpcServiceStub,
    call_func="CustomGrpcServiceFunction",
    response_callback=CustomGrpcService_callback,
    streaming=False,
    request_date="{{{{ ds }}}}"
)
© www.soinside.com 2019 - 2024. All rights reserved.