我们将 Apache Flink 版本 1.17.1 与 Scala 结合使用。 我们正在尝试将流数据写入 ABFS 文件系统。 请参阅 Scala 中的简单示例代码。
object SimpleStream {
val env = StreamExecutionEnvironment.getExecutionEnvironment()
def saveEvents() {
import scala.collection.JavaConverters._
val testEvents: DataStream[String] =
env.fromCollection(List("a", "b", "c").asJava)
val sink: FileSink[String] = FileSink
.forRowFormat(,
new Path("abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/output"),
new SimpleStringEncoder[String]("UTF-8")
)
.build()
testEvents.sinkTo(sink)
env.execute()
}
def main(args: Array[String]): Unit = {
saveEvents()
}
}
代码失败,但出现以下异常:
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 366387423316a625a5d5eba7e9a6eb39)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 366387423316a625a5d5eba7e9a6eb39)
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
at org.apache.flink.client.program.StreamContextEnvironment.getJobExecutionResult(StreamContextEnvironment.java:171)
at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:122)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2058)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2036)
at SimpleStream$.saveEvents(SimpleStream.scala:64)
at SimpleStream$.main(SimpleStream.scala:68)
at SimpleStream.main(SimpleStream.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 9 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 366387423316a625a5d5eba7e9a6eb39)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:130)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$32(RestClusterClient.java:787)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.util.concurrent.FutureUtils.lambda$retryOperationWithDelay$6(FutureUtils.java:302)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:610)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1085)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:128)
... 23 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249)
at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242)
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
at org.apache.hadoop.fs.FileSystem.getLocal(FileSystem.java:496)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:316)
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:393)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:165)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
at org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.createTmpFileForWrite(DataBlocks.java:980)
at org.apache.hadoop.fs.store.DataBlocks$DiskBlockFactory.create(DataBlocks.java:960)
at org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.createBlockIfNeeded(AbfsOutputStream.java:262)
at org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream.<init>(AbfsOutputStream.java:173)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.createFile(AzureBlobFileSystemStore.java:580)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.create(AzureBlobFileSystem.java:301)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1064)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1052)
at org.apache.flink.fs.azurefs.AzureBlobFsRecoverableDataOutputStream.<init>(AzureBlobFsRecoverableDataOutputStream.java:63)
at org.apache.flink.fs.azurefs.AzureBlobRecoverableWriter.getRecoverableFsDataOutputStream(AzureBlobRecoverableWriter.java:53)
at org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter.open(HadoopRecoverableWriter.java:83)
at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:124)
at org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter.openNewInProgressFile(RowWiseBucketWriter.java:34)
at org.apache.flink.connector.file.sink.writer.FileWriterBucket.rollPartFile(FileWriterBucket.java:261)
at org.apache.flink.connector.file.sink.writer.FileWriterBucket.write(FileWriterBucket.java:188)
at org.apache.flink.connector.file.sink.writer.FileWriter.write(FileWriter.java:198)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)
我们对存储帐户使用以下 Flink 配置:
fs.azure.account.auth.type.<storage-account-name>.dfs.core.windows.net: SharedKey
fs.azure.account.key.<storage-account-name>.dfs.core.windows.net: ***
#
fs.azure.enable.flush: true
fs.azure.account.hns.enabled: true
#
fs.azure.createRemoteFileSystemDuringInitialization: true
fs.azure.always.use.https: true
此外,我们还将 flink-azure-fs-hadoop-1.17.1 库放入 Flink 插件文件夹中。
我们的设置中缺少什么?
下面的 Jira 票证表明 ABFS 应该已经可以与 Flink File Sink 很好地配合使用。 https://issues.apache.org/jira/browse/FLINK-30128
我们还尝试了 Hadoop 文档中的不同选项,但错误仍然存在。
https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html