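I am trying to run a very simple application on AWS-flink.
Apparently the worker user cannot access files on the worker (?).
Here is the code; I kept stripping it down until I was left with this.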
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment

def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    try:
        # Minimal pipeline: pass each element through unchanged, then print it
        env.from_collection(['a', 'b', 'c', 'd']).map(lambda x: x).map(print)
        env.execute('executethis')
    except Exception as ex:
        print('EXCEPTION!!!! ', ex)

if __name__ == '__main__':
    main()
Stack trace (from the Flink dashboard):
java.lang.RuntimeException: Failed to create stage bundle factory!
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:656)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:281)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalDataStreamPythonFunctionOperator.open(AbstractExternalDataStreamPythonFunctionOperator.java:85)
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.open(AbstractExternalOneInputPythonFunctionOperator.java:117)
at org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator.open(ExternalPythonProcessOperator.java:64)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:955)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:924)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:748)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:564)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.io.IOException: Cannot run program "/tmp/python-dist-ea8c2605-8024-4255-a9d0-f9432a304ee7/python-files/py_site_packages38/py_site_packages38/pyflink/bin/pyflink-udf-runner.sh": error=13, Permission denied
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5022)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:498)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:482)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:342)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:654)
15 more
Caused by: java.io.IOException: Cannot run program "/tmp/python-dist-ea8c2605-8024-4255-a9d0-f9432a304ee7/python-files/py_site_packages38/py_site_packages38/pyflink/bin/pyflink-udf-runner.sh": error=13, Permission denied
at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1128)
at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1071)
at org.apache.beam.runners.fnexecution.environment.ProcessManager.startProcess(ProcessManager.java:147)
at org.apache.beam.runners.fnexecution.environment.ProcessManager.startProcess(ProcessManager.java:122)
at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:104)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:284)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:240)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
19 more
Suppressed: java.lang.NullPointerException: Process for id does not exist: 7-1
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:921)
at org.apache.beam.runners.fnexecution.environment.ProcessManager.stopProcess(ProcessManager.java:172)
at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:124)
29 more
Caused by: java.io.IOException: error=13, Permission denied
at java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
at java.base/java.lang.ProcessImpl.<init>(ProcessImpl.java:340)
at java.base/java.lang.ProcessImpl.start(ProcessImpl.java:271)
at java.base/java.lang.ProcessBuilder.start(ProcessBuilder.java:1107)
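I would like the application to stop throwing this exception.
Also, I do not know how to change a file's permissions when running on an AWS managed service. That would be the first thing to try, but I do not know whether it is even possible.
Thanks.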
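OK, I figured it out. The answer is silly, but here it is:
Somehow, the Flink dependency in pyfiles caused the engine to unpack it into a "double" folder: py_site_packages38/py_site_packages38/pyflink (instead of just py_site_packages38/pyflink). This new folder gets restrictive permissions, which is why pyflink-udf-runner.sh could not be executed.
Once that was sorted out, the problem went away. I hope this helps if anyone runs into the same issue.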
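In case it helps with checking for the same situation, here is a minimal sketch that lists where a `pyflink` package is bundled inside a dependency zip before it is uploaded. The function name `find_bundled_pyflink` and the idea of inspecting the archive locally are my own assumptions, not something AWS or Flink provides, and it only reports what is in the archive; it does not explain why the engine unpacks it the way it does.

```python
import zipfile
from pathlib import PurePosixPath


def find_bundled_pyflink(zip_path):
    """Return the sorted archive paths of any bundled `pyflink` package.

    Hypothetical helper: it only inspects entry names in the zip and
    does not extract anything or touch file permissions.
    """
    with zipfile.ZipFile(zip_path) as zf:
        names = zf.namelist()

    hits = set()
    for name in names:
        parts = PurePosixPath(name).parts
        if "pyflink" in parts:
            # Keep the path up to and including the first `pyflink` folder
            idx = parts.index("pyflink")
            hits.add("/".join(parts[: idx + 1]))
    return sorted(hits)
```

Calling `find_bundled_pyflink("deps.zip")` (the file name is just a placeholder) returns an empty list when the archive does not ship its own copy of PyFlink, and something like `["py_site_packages38/pyflink"]` when it does.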