AWS 管理的 apache flink。 “<someuuid>pyflink/bin/pyflink-udf-runner.sh”:错误=13,权限被拒绝“不使用udfs

问题描述 投票:0回答:1

我正在尝试在 AWS-flink 中运行一个非常简单的应用程序。

  • 阿帕奇弗林克1.18
  • Kinesis 连接器 1.18
  • Python Apache Flink == 1.18
  • 使用 pom.xml 收集 flink-connector-kinesis 4.2.0-1.18 的 jar 依赖项
  • 将“uber-jar”保存到 pyflink/lib/
  • python3.10 和 3.8 都以完全相同的方式失败。 在本地,一切都运行良好,但每当我在 aws 中启动作业时,它都会因工作文件夹上的权限被拒绝而中断。

显然,worker 用户无法访问worker 中的文件(?)

这是代码,我一直在淡化它,直到我得到这个。

from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment


def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    try:
        env.from_collection(['a', 'b', 'c', 'd']).map(lambda x: x).map(print)
        env.execute('executethis')
    except Exception as ex:
        print('EXCEPTION!!!! ', ex)
    pass

if __name__ == '__main__':
    main()

堆栈跟踪(来自 flink 仪表板):

java.lang.RuntimeException: Failed to create stage bundle factory!
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:656)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:281)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalDataStreamPythonFunctionOperator.open(AbstractExternalDataStreamPythonFunctionOperator.java:85)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.open(AbstractExternalOneInputPythonFunctionOperator.java:117)
at org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator.open(ExternalPythonProcessOperator.java:64)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:924)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: Cannot run program "/tmp/python-dist-ea8c2605-8024-4255-a9d0-f9432a304ee7/python-files/py_site_packages38/py_site_packages38/pyflink/bin/pyflink-udf-runner.sh": error=13, Permission denied
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5022)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:498)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:482)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:342)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:654)
15 more
**Caused by: java.io.IOException: Cannot run program "/tmp/python-dist-ea8c2605-8024-4255-a9d0-f9432a304ee7/python-files/py_site_packages38/py_site_packages38/pyflink/bin/pyflink-udf-runner.sh": error=13, Permission denied
**at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
at org.apache.beam.runners.fnexecution.environment.ProcessManager.startProcess(ProcessManager.java:147)
at org.apache.beam.runners.fnexecution.environment.ProcessManager.startProcess(ProcessManager.java:122)
at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:104)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:284)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:240)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
19 more
Suppressed: java.lang.NullPointerException: Process for id does not exist: 7-1
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:921)
at org.apache.beam.runners.fnexecution.environment.ProcessManager.stopProcess(ProcessManager.java:172)
at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:124)
29 more
Caused by: java.io.IOException: error=13, Permission denied
at java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:340)
at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:271)
at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1107)

我希望应用程序停止触发该异常。

  • 我将所有运动分析、权限添加到策略中。
  • 我将策略的所有 s3 权限添加到代码所在的前缀。
  • 我将所有运动权限添加到输入和输出流。
  • 我将所有 kinesis Analytics v2 分析权限添加到策略中。
  • 我按照文档所述添加了cloudwatch权限。
  • chmod 777 软件包,然后将其放入包含应用程序的 .zip 中。
  • 我使 .jar 与 aws 中的 apache flink 1.18 版本相匹配,并将 python 要求限制为 1.18。
  • 我将 python 降级到 3.8,以便 python 3.8 的库和所有来自 python 的都是 flink1.18。
  • 我摆脱了所有不必要的代码和任何运动或我正在使用的任何东西。
  • 我添加了尝试/例外。
  • 我将所有库都放在 uber-jar (在 jarfile 中引用)属性中。
  • 我在互联网上找了好几个小时,有一些相似但不一样的东西。最接近的是 https://lists.apache.org/thread/8b5kbf3prvrr0jo1bhk1y91osmr76v5r 但那个家伙正在使用 UDF,显然,我不是。

此外,我认为我不知道从 AWS 托管服务运行时如何更改文件的权限。这是首先要尝试的事情,但我不知道这是否可能。

谢谢。

amazon-web-services apache-flink flink-streaming pyflink
1个回答
0
投票

好吧,我明白了。答案很愚蠢,但它是:

  • 在我读过的一些文档中,我需要将 virtualenv 中的所有内容复制为 pyfiles 依赖项。这不是真的:这是与非 FLINK 相关的所有内容!

不知何故,pyfiles 中的 flink 依赖项导致引擎将其解包到“双”文件夹中:py_site_packages38/py_site_packages38/pyflink(而不仅仅是 py_site_packages38/pyflink)。这个新文件夹获得限制性权限。

解决了这个问题。我希望如果有人遇到同样的问题这会有所帮助。

© www.soinside.com 2019 - 2024. All rights reserved.