Airflow 的
GrpcOperator
支持在 data
字段中传递模板变量。但是,需要进入 data
字段的 protobuf 生成的消息对象在解析时进行评估,这会破坏模板。
例如,在下面的任务中,我想向消息正文中包含 DAG 运行日期
ds
的 gRPC 服务器发送请求:
task_grpc = GrpcOperator(
dag=dag,
task_id="task_grpc",
grpc_conn_id="grpc_default",
stub_class=CustomGrpcServiceStub,
call_func="CustomGrpcServiceFunction",
response_callback=CustomGrpcService_callback,
streaming=False,
data = {"request": proto_pb2.CustomGrpcServiceRequest(request_date="{{ ds }}")}
)
这将会失败,因为 request_date 始终呈现为
"{ ds }"
。是否有解决方法可以在 protobuf 消息对象中实现模板化字段?
我尝试按照https://github.com/apache/airflow/discussions/36661中的建议编写用户定义的宏,但这不会改变结果。
使用自定义运算符,可以在
data
时间(而不是 execute
时间)为 GrpcOperator 提供 init
。所有模板化变量(在本例中只是 request_date
)都可以放入自定义运算符的构造函数中,同时添加到 template_fields
的序列中。在execute
时间,模板化变量可以用于组装data
对象。
对于给定的示例,自定义运算符是
class CustomGrpcServiceRequestOperator(GrpcOperator):
template_fields: Sequence[str] = tuple({"request_date"} | set(GrpcOperator.template_fields))
def __init__(self, request_date: str, **kwargs):
super().__init__(**kwargs)
self.request_date = request_date
def execute(self, context: Context):
self.data = {
"request": proto_pb2.CustomGrpcServiceRequest(request_date=self.request_date)
}
super().execute(context)
然后可以称为
task_grpc = CustomGrpcServiceRequestOperator(
dag=dag,
task_id="task_grpc",
grpc_conn_id="grpc_default",
stub_class=CustomGrpcServiceStub,
call_func="CustomGrpcServiceFunction",
response_callback=CustomGrpcService_callback,
streaming=False,
request_date="{{{{ ds }}}}"
)