在 PyFlink 中访问来自 ADLS 的流数据

问题描述 投票:0回答:1

尝试从 ADLS 设置 Pyflink 流,当前尝试使用 StreamExecutionEnvironment.from_source() 方法读取 json 文件。

代码如下:

from flink.plan.Environment import get_environment
from pyflink.datastream.functions import SourceFunction
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors.file_system import (FileSource, StreamFormat, FileSink,
                                                       OutputFileConfig, RollingPolicy, BucketAssigner)
from pyflink.common import WatermarkStrategy, Encoder, Types

from azure.storage.filedatalake import FileSystemClient

file_system = FileSystemClient.from_connection_string(connection_str, file_system_name="my_fs")
# setting the stream environment object
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.add_jars("file:///opt/flink/plugins/azure/flink-azure-fs-hadoop-1.16.0.jar")
env.add_classpaths("file:///opt/flink/plugins/azure/flink-azure-fs-hadoop-1.16.0.jar")

file_client = file_system.get_file_client(my_file)
input_path = 'abfss://' + file_client.url[8:]
print('URL is  ===== >>>>',file_client.url)

# Source
ds = env.from_source(
            source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),
                                                       input_path)
                             .process_static_file_set().build(),
            watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
            source_name="file_source"
 )

ds.sink_to(
            sink=FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_bucket_assigner(BucketAssigner.base_path_bucket_assigner())
                .build())
ds.print()
env.execute()

我收到以下错误:

原因:org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:找不到方案“abfs”的文件系统实现。 Flink 通过以下插件直接支持该方案:flink-fs-azure-hadoop。请确保每个插件都位于插件目录中自己的子文件夹中。请参阅 https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ 了解更多信息。如果您想为该方案使用 Hadoop 文件系统,请将该方案添加到配置 fs.allowed-fallback-filesystems 中。有关支持的文件系统的完整列表,请参阅 https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/

Jar 文件已添加到文档中给出的插件文件夹中: https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/

此外,存储帐户密钥也添加到 config.yaml 文件中。

或者尝试使用以下方法将源添加为 DataStream:

ds = env.read_text_file(input_path) 

azure-blob-storage apache-flink flink-streaming azure-data-lake-gen2 pyflink
1个回答
0
投票

我对 wasb 也有类似的问题。在文档中,您必须将 flink-azure-fs-hadoop-1.16.0.jar 放在 flink dist 中名为 plugins 的文件夹中。但怎么也行不通。Example

© www.soinside.com 2019 - 2024. All rights reserved.