Writing a Spark DataFrame as an Apache Hudi table to an S3 bucket with Object Lock


I have some datasets (CSV and Parquet files) that I want to transform and build into Hudi tables in an S3 bucket that has Object Lock enabled.

From the official pyspark documentation I understand that Content-MD5 for S3 with Object Lock is not officially supported at the moment, but I would like to know whether anyone can help, since this seems like a useful case for large organizations whose S3 buckets are always kept under Object Lock by policy. Note: creating a bucket without Object Lock is not an option for me.
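For reference, a minimal boto3 sketch (the bucket name is a placeholder) that confirms whether a bucket really has Object Lock enabled, which is the property that triggers the behaviour described below:

import boto3

# Minimal sketch: confirm that the target bucket has Object Lock enabled.
# '<s3_bucket>' is a placeholder; the call raises if no lock configuration exists.
s3 = boto3.client('s3')
lock_cfg = s3.get_object_lock_configuration(Bucket='<s3_bucket>')
print(lock_cfg['ObjectLockConfiguration']['ObjectLockEnabled'])  # prints 'Enabled' for locked buckets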

I have written the following code:

from methods.fast_hudi import FastHudi
import os
from pyspark.sql.functions import concat_ws, md5
import zipfile
import hashlib

with zipfile.ZipFile(r'./data/package.zip', 'r') as zip_ref:
    zip_ref.extractall(r'./data/package')

os.environ['AWS_ACCESS_KEY'] = ''
os.environ['AWS_SECRET_ACCESS_KEY'] = ''
os.environ['AWS_SESSION_TOKEN'] = ''

spark_config = {
    'spark.jars.packages': 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1,org.apache.hudi:hudi-utilities-bundle_2.12:0.13.1,org.apache.spark:spark-avro_2.13:3.4.0,org.apache.calcite:calcite-core:1.34.0,com.amazonaws:aws-java-sdk-bundle:1.12.486',
    'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
    'spark.sql.catalog.spark_catalog': 'org.apache.spark.sql.hudi.catalog.HoodieCatalog',
    'spark.sql.extensions': 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension',
    # Note: this key appears twice in this dict; in Python the later value wins,
    # so ProfileCredentialsProvider (below) is the provider actually used.
    'spark.hadoop.fs.s3a.aws.credentials.provider': 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider',
    'spark.hadoop.fs.s3a.impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
    'spark.hadoop.fs.s3a.access.key': os.getenv('AWS_ACCESS_KEY', None),
    'spark.hadoop.fs.s3a.secret.key': os.getenv('AWS_SECRET_ACCESS_KEY', None),
    'spark.hadoop.fs.s3a.session.token': os.getenv('AWS_SESSION_TOKEN', None),
    'spark.hadoop.fs.s3a.aws.credentials.provider': 'com.amazonaws.auth.profile.ProfileCredentialsProvider',
    'spark.driver.memory': '16g',
    'spark.hadoop.fs.s3a.fast.upload': 'true',
    'spark.hadoop.fs.s3a.upload.buffer': 'bytebuffer'
}

spark = FastHudi('Showroom Variants Bronze Layer - Hudi', spark_config)
base_data = spark.read('csv', 'data/package/showroom_variant.csv')

base_data.show()

hudi_table_name = 'showroom_variants'

hoodie_options = {
    'hoodie.table.name': hudi_table_name,
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'showroom_variant_id',
    'hoodie.datasource.write.precombine.field': 'global_brand_name',
    'hoodie.datasource.write.partitionpath.field': 'global_sales_parent_name,global_brand_name,body_type',
    'hoodie.datasource.write.operation': 'insert',
    'hoodie.datasource.write.table.name': hudi_table_name,
    'hoodie.deltastreamer.source.dfs.listing.max.fileid': '-1',
    'hoodie.datasource.write.hive_style_partitioning': 'true'
}

spark.write_as_hudi(
    base_data,
    'overwrite',
    'org.apache.hudi',
    f's3a://<s3_bucket>/datalake/{hudi_table_name}',
    hoodie_options
)
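One side note on the configuration above (not the cause of the error below): 'spark.hadoop.fs.s3a.aws.credentials.provider' is set twice in spark_config, so only the ProfileCredentialsProvider value takes effect and the session token keys are never read. A minimal sketch of a credentials block that would pick up the temporary credentials, assuming the standard hadoop-aws TemporaryAWSCredentialsProvider:

# Sketch only: hadoop-aws reads fs.s3a.session.token via TemporaryAWSCredentialsProvider.
s3a_credentials_config = {
    'spark.hadoop.fs.s3a.aws.credentials.provider': 'org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider',
    'spark.hadoop.fs.s3a.access.key': os.getenv('AWS_ACCESS_KEY'),
    'spark.hadoop.fs.s3a.secret.key': os.getenv('AWS_SECRET_ACCESS_KEY'),
    'spark.hadoop.fs.s3a.session.token': os.getenv('AWS_SESSION_TOKEN'),
}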

These are the classes used in the imports. First, FastSpark from methods/fast_spark.py:

from pyspark.sql import SparkSession 
from functools import reduce

class FastSpark:
    def __init__(self, app_name: str, spark_config: dict = None) -> None:
        self.spark_obj = None
        self.app_name = app_name
        self.spark_config = spark_config

    def get_or_create_session(self) -> SparkSession:
        if self.spark_config is None:
            return SparkSession.builder \
                .appName(self.app_name) \
                .getOrCreate()
        else:
            # Fold every (key, value) pair of spark_config into the session builder.
            spark_session = reduce(
                lambda builder, kv: builder.config(kv[0], kv[1]),
                self.spark_config.items(),
                SparkSession.builder.appName(self.app_name)
            )
            return spark_session.getOrCreate()
    
    def read(self, read_format: str, load_path: str, read_options: dict = None):
        self.spark_obj = self.get_or_create_session()
        if read_options is None:
            return self.spark_obj.read.format(read_format) \
                .load(load_path)
        else:
            read_obj = reduce(lambda x, y: x.option(y[0], y[1]), read_options.items(),
                              self.spark_obj.read.format(read_format))
            return read_obj.load(load_path)

And FastHudi from methods/fast_hudi.py:

from methods.fast_spark import FastSpark
import os 
from functools import reduce

class FastHudi(FastSpark):

    def __init__(self, app_name: str, spark_config: dict = None) -> None:
        super().__init__(app_name, spark_config)
        self.AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY', None)
        self.AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY', None)
        self.spark = self.get_or_create_session()
    
        if self.AWS_ACCESS_KEY is None or self.AWS_SECRET_ACCESS_KEY is None:
            raise ValueError("Both of the following environment variables must be set: AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY")

    def write_as_hudi(self, write_df, write_mode: str, write_format: str = 'org.apache.hudi',
                      target_path: str = None, hoodie_options: dict = None, header=None):
        # header (if given) is forwarded to the DataFrameWriter as an extra option.
        write_df.write.format(write_format) \
            .options(**hoodie_options) \
            .mode(write_mode) \
            .save(target_path, header=header)
    
    def stop_spark(self):
        self.spark.stop()

Library versions: pyspark==3.4.0

When I try to run the code above, it throws this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o121.save.
: org.apache.hadoop.fs.s3a.AWSBadRequestException: PUT 0-byte object on datalake/showroom_variants: com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: <request_id>; S3 Extended Request ID: <request_id>; Proxy: null), S3 Extended Request ID: <some_id>:InvalidRequest: Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: <request_id>; S3 Extended Request ID: <extended_request_id>; Proxy: null)
        at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:249)
        at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:119)
        at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$4(Invoker.java:322)
        at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)
        at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:318)
        at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:293)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.createEmptyObject(S3AFileSystem.java:4532)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.access$1900(S3AFileSystem.java:259)
        at org.apache.hadoop.fs.s3a.S3AFileSystem$MkdirOperationCallbacksImpl.createFakeDirectory(S3AFileSystem.java:3461)
        at org.apache.hadoop.fs.s3a.impl.MkdirOperation.execute(MkdirOperation.java:121)
        at org.apache.hadoop.fs.s3a.impl.MkdirOperation.execute(MkdirOperation.java:45)
        at org.apache.hadoop.fs.s3a.impl.ExecutingStoreOperation.apply(ExecutingStoreOperation.java:76)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:3428)
        at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2388)
        at org.apache.hudi.common.table.HoodieTableMetaClient.initTableAndGetMetaClient(HoodieTableMetaClient.java:481)
        at org.apache.hudi.common.table.HoodieTableMetaClient$PropertyBuilder.initTable(HoodieTableMetaClient.java:1201)
        at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:323)
        at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204)
        at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: DKTYYJHRJGJZFQ1X; S3 Extended Request ID: 37QbiLXXUA+I4SaRhZZvqSNQz5P2zPfykuw9nzmgmRhBpYPN1c9GdN76pzZec7Rpd7b0ek4IdXs9GQcN99eUUfkwd3+Xofv2lq9At781zwI=; Proxy: null), S3 Extended Request ID: 37QbiLXXUA+I4SaRhZZvqSNQz5P2zPfykuw9nzmgmRhBpYPN1c9GdN76pzZec7Rpd7b0ek4IdXs9GQcN99eUUfkwd3+Xofv2lq9At781zwI=
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5470)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5417)
        at com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:422)
        at com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:6551)
        at com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1862)
        at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1822)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$putObjectDirect$17(S3AFileSystem.java:2877)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier(IOStatisticsBinding.java:604)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:2874)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$32(S3AFileSystem.java:4534)
        at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117)
        ... 63 more
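The failing call is S3A's createFakeDirectory(), i.e. the 0-byte directory-marker PUT issued during mkdirs, which carries no checksum header. The requirement itself is an S3 PutObject rule rather than anything Hudi-specific; as a rough boto3 sketch (bucket and key are placeholders), a PUT that supplies an x-amz-checksum header is what the bucket expects:

import boto3

# Rough sketch: an Object Lock bucket requires a Content-MD5 or x-amz-checksum-* header on PUT.
# Recent boto3 versions may add a CRC32 checksum automatically; ChecksumAlgorithm makes it explicit.
s3 = boto3.client('s3')
s3.put_object(
    Bucket='<s3_bucket>',                      # placeholder bucket
    Key='datalake/showroom_variants/_probe',   # placeholder key
    Body=b'',
    ChecksumAlgorithm='SHA256',                # sends the x-amz-checksum-sha256 header
)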
Tags: python, amazon-s3, hadoop, pyspark, apache-hudi

1 Answer
As far as I know, the S3A codebase has never been tested against an Object Lock bucket. File a HADOOP JIRA with the stack trace. Don't expect an immediate response, and anything that does come in will only target hadoop-3.4.x, which no Spark release ships yet.
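(As an aside, a minimal sketch for checking which Hadoop version a running PySpark session actually uses, via the Py4J gateway; here spark is the FastHudi wrapper from the question, whose .spark attribute is the underlying SparkSession:)

# Minimal sketch: print the Hadoop version bundled with the running Spark JVM,
# to see whether a hadoop-3.4.x build is even in play.
spark_session = spark.spark  # the SparkSession held by the FastHudi wrapper
print(spark_session.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion())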

More likely: we will document the error message as "not currently supported".

That said, HADOOP-19072 is working on mkdir right now (February 2024); it includes an option to turn off some of the safety checks. We should improve the createFakeDirectory() logic to swallow access exceptions carrying the Object Lock error, since it indicates something is already there and is therefore nothing to worry about.

Please do create the JIRA and we should add this; it is just that you are hitting errors nobody has hit before. We do also get errors when trying to delete things without permission, so that should be covered too.
