我正在编写一个 UDF,它将进行 API 调用以获取 JSON 有效负载。这是它的样子 -
@udf(result_type=DataTypes.STRING())
def get_data():
response = requests.get("https:api_endpoint")
logging.info(response)
return json.loads(response.text)
table_env.create_temporary_function("get_data", get_data)
在源表中我有 -
get_data as get_data()
,
在 S3 水槽表中,我有 - get_data VARCHAR
我在
requirement.txt
文件中拥有所有依赖项,并且我做了一个 pip install -r requirements.txt --target=.
然后我用
zip -r pyflink.zip *
压缩内容。
当我运行 Flink 应用程序时,我发现它无法从 requests.txt 文件中找到依赖项。
我错过了什么?我最终还想包含 boto3 来与其他服务交互。
因此,按照此文档,问题得到了解决。将依赖项放在执行 -
的目录中pip install -r requirements.txt --target=my_deps
并将运行时配置添加为 -
kinesis.analytics.flink.run.options
pyFiles
my_deps/
这成功获取了依赖项并且 Flink 作业运行没有错误