在Pyflink中添加外部依赖并在UDF中使用

问题描述 投票:0回答:1

我正在编写一个 UDF,它将进行 API 调用以获取 JSON 有效负载。这是它的样子 -

@udf(result_type=DataTypes.STRING())
def get_data():
    response = requests.get("https:api_endpoint")
    logging.info(response)
    return json.loads(response.text)

table_env.create_temporary_function("get_data", get_data)

在源表中我有 -

get_data as get_data()
, 在 S3 水槽表中,我有 -
get_data VARCHAR

我在

requirement.txt
文件中拥有所有依赖项,并且我做了一个
pip install -r requirements.txt --target=.

然后我用

zip -r pyflink.zip *
压缩内容。 当我运行 Flink 应用程序时,我发现它无法从 requests.txt 文件中找到依赖项。

我错过了什么?我最终还想包含 boto3 来与其他服务交互。

apache-flink pyflink amazon-kinesis-analytics
1个回答
0
投票

因此,按照此文档,问题得到了解决。将依赖项放在执行 -

的目录中

pip install -r requirements.txt --target=my_deps

并将运行时配置添加为 -

kinesis.analytics.flink.run.options
pyFiles
my_deps/

这成功获取了依赖项并且 Flink 作业运行没有错误

© www.soinside.com 2019 - 2024. All rights reserved.