Apache Flink 在 Kubernetes 上运行并使用 Minio 时出现“java.lang.NullPointerException: invalid null input: name”的原因是什么

问题描述 投票:0回答:3

我在 10 个节点的集群中的 Kubernetes 上运行 Flink。我还使用 Minio 进行检查点/保存点目的。当我使用 job.jar 运行 Flink 时,总是出现以下错误:

     The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'StreamingTest'.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'StreamingTest'.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1821)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
    at com.dataartisans.flinktraining.examples.datastream_java.basics.RandomStream.main(RandomStream.java:94)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
    ... 8 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:366)
    at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
    at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:292)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:344)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate JobManager.
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    ... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:305)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:224)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:483)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
    at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
    ... 7 more
Caused by: org.apache.hadoop.security.KerberosAuthException: failure to login: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name
    at com.sun.security.auth.UnixPrincipal.<init>(UnixPrincipal.java:71)
    at com.sun.security.auth.module.UnixLoginModule.login(UnixLoginModule.java:133)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.hadoop.security.UserGroupInformation$HadoopLoginContext.login(UserGroupInformation.java:1877)
    at org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1789)
    at org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:704)
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:654)
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:565)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:247)
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:126)
    at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:61)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:468)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:64)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:501)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:302)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:224)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:483)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
    at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

    at org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1799)
    at org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:704)
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:654)
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:565)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:247)
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:126)
    at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:61)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:468)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:64)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:501)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:302)
    ... 22 more
Caused by: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name
    at com.sun.security.auth.UnixPrincipal.<init>(UnixPrincipal.java:71)
    at com.sun.security.auth.module.UnixLoginModule.login(UnixLoginModule.java:133)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.hadoop.security.UserGroupInformation$HadoopLoginContext.login(UserGroupInformation.java:1877)
    at org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1789)
    at org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:704)
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:654)
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:565)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:247)
    at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:126)
    at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:61)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:468)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:64)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:501)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:302)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:224)
    at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:483)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
    at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:856)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.hadoop.security.UserGroupInformation$HadoopLoginContext.login(UserGroupInformation.java:1877)
    at org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1789)
    ... 34 more

End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
    ... 4 more
  • 我已经复制了所需的插件文件夹(即/plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.11.2.jar)。
  • 我的 flink 配置(flink-conf.yaml)包括:
state.backend: filesystem
state.checkpoints.dir: s3://state/checkpoints
state.savepoints.dir: s3://state/savepoints
s3.path-style-access: true
s3.endpoint: http://<jobmanagerIP>:9000
s3.access-key: *******
s3.secret-key: *******
  • 我的Flink版本:flink-1.11.2-bin-scala_2.11

这些错误的原因可能是什么?有什么建议吗?

docker kubernetes apache-flink minio checkpoint
3个回答
2
投票

问题来自于这个小细节:POD 默认情况下没有定义的用户名。

当安全系统尝试通过通常的 unix 调用提取 pod 的

name
时,结果是
null
,因为属性
name
从未定义过。

尝试为每个 POD 专门设置一个名称以避免这种情况,例如,自动将每个 POD 的

ID
映射到
name


1
投票

Hadoop S3 的 Jaas 模块检索用户名失败 (

invalid null input: name
)。我还没见过这个,我猜你的 docker 基础镜像不提供某些基本功能。无论如何,我会尝试
flink-s3-fs-presto
,建议将其用于 S3 上的检查点。


0
投票

当 Docker 容器配置为仅由 UID 标识而没有关联的用户名的用户运行时,可能会出现此问题。许多应用程序(包括 Spark)都需要有效的用户名来执行某些操作,尤其是涉及安全和网络配置的操作。

以下是如何通过修改 Dockerfile 以确保容器具有正确定义的用户和组来解决此问题:

RUN adduser --disabled-password --gecos '' --uid 1000 sparkuser

USER sparkuser

:-)

© www.soinside.com 2019 - 2024. All rights reserved.