I have some datasets (CSV and Parquet files) that I want to transform and build into Hudi tables in an S3 bucket that has Object Lock enabled.
From the official PySpark documentation, I understand that it does not currently offer official support for Content-MD5 against S3 buckets with Object Lock, but I am hoping someone can help, because this is likely a useful case for large organisations whose S3 buckets are always object-locked by policy. Note: creating a bucket without Object Lock is not an option for me.
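For context, the InvalidRequest error shown further down comes from S3 itself: on a bucket with Object Lock enabled (and a default retention rule), every PutObject must carry a Content-MD5 or x-amz-checksum-* header. A quick, hedged way to confirm what the bucket enforces, assuming boto3 is available and using the same placeholder bucket name as in the code below:

import boto3

s3 = boto3.client("s3")
bucket = "<s3_bucket>"  # placeholder bucket name, as used later in the write path

# Shows the Object Lock mode and any default retention rule that make S3 demand
# a Content-MD5 / x-amz-checksum header on every PUT (raises an error if the
# bucket has no Object Lock configuration at all).
print(s3.get_object_lock_configuration(Bucket=bucket))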
I have written the following code:
from methods.fast_hudi import FastHudi
import os
from pyspark.sql.functions import concat_ws, md5
import zipfile
import hashlib

with zipfile.ZipFile(r'./data/package.zip', 'r') as zip_ref:
    zip_ref.extractall(r'./data/package')

os.environ['AWS_ACCESS_KEY'] = ''
os.environ['AWS_SECRET_ACCESS_KEY'] = ''
os.environ['AWS_SESSION_TOKEN'] = ''

spark_config = {
    'spark.jars.packages': 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1,org.apache.hudi:hudi-utilities-bundle_2.12:0.13.1,org.apache.spark:spark-avro_2.13:3.4.0,org.apache.calcite:calcite-core:1.34.0,com.amazonaws:aws-java-sdk-bundle:1.12.486',
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
    'spark.hadoop.fs.s3a.aws.credentials.provider': 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider',
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    'spark.hadoop.fs.s3a.access.key': os.getenv('AWS_ACCESS_KEY', None),
    'spark.hadoop.fs.s3a.secret.key': os.getenv('AWS_SECRET_ACCESS_KEY', None),
    'spark.hadoop.fs.s3a.session.token': os.getenv('AWS_SESSION_TOKEN', None),
    'spark.hadoop.fs.s3a.aws.credentials.provider': "com.amazonaws.auth.profile.ProfileCredentialsProvider",
    'spark.driver.memory': '16g',
    'spark.hadoop.fs.s3a.fast.upload': 'true',
    'spark.hadoop.fs.s3a.upload.buffer': 'bytebuffer'
}

spark = FastHudi('Showroom Variants Bronze Layer - Hudi', spark_config)
base_data = spark.read('csv', 'data/package/showroom_variant.csv')
base_data.show()
hudi_table_name = 'showroom_variants'
hoodie_options = {
    'hoodie.table.name': hudi_table_name,
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.recordkey.field': 'showroom_variant_id',
    'hoodie.datasource.write.precombine.field': 'global_brand_name',
    'hoodie.datasource.write.partitionpath.field': 'global_sales_parent_name,global_brand_name,body_type',
    'hoodie.datasource.write.operation': 'insert',
    'hoodie.datasource.write.table.name': hudi_table_name,
    'hoodie.deltastreamer.source.dfs.listing.max.fileid': '-1',
    'hoodie.datasource.write.hive_style_partitioning': 'true'
}

spark.write_as_hudi(
    base_data,
    'overwrite',
    'org.apache.hudi',
    f's3a://<s3_bucket>/datalake/{hudi_table_name}',
    hoodie_options
)
These are the classes used by the imports above:
from pyspark.sql import SparkSession
from functools import reduce
class FastSpark:
    def __init__(self, app_name: str, spark_config: dict = None) -> None:
        self.spark_obj = None
        self.app_name = app_name
        self.spark_config = spark_config

    def get_or_create_session(self) -> SparkSession:
        if self.spark_config is None:
            return SparkSession.builder \
                .appName(self.app_name) \
                .getOrCreate()
        else:
            spark_session = reduce(
                lambda x, y: x.config(y[0], y[1]),
                self.spark_config.items(),
                SparkSession.builder.appName(self.app_name)
            )
            return spark_session.getOrCreate()

    def read(self, read_format: str, load_path: str, read_options: dict = None):
        self.spark_obj = self.get_or_create_session()
        if read_options is None:
            return self.spark_obj.read.format(read_format) \
                .load(load_path)
        else:
            read_obj = reduce(
                lambda x, y: x.option(y[0], y[1]),
                read_options.items(),
                self.spark_obj.read.format(read_format)
            )
            return read_obj.load(load_path)
from methods.fast_spark import FastSpark
import os
from functools import reduce
class FastHudi(FastSpark):
    def __init__(self, app_name: str, spark_config: dict = None) -> None:
        super().__init__(app_name, spark_config)
        self.AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY', None)
        self.AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY', None)
        self.spark = self.get_or_create_session()
        if self.AWS_ACCESS_KEY is None or self.AWS_SECRET_ACCESS_KEY is None:
            raise ValueError("Both of the following variables need to be set: AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY")

    def write_as_hudi(self, write_df, write_mode: str, write_format: str = 'org.apache.hudi',
                      target_path: str = None, hoodie_options: dict = None, header=None):
        write_df.write.format(write_format) \
            .options(**hoodie_options) \
            .mode(write_mode) \
            .save(target_path, header=header)

    def stop_spark(self):
        self.spark.stop()
Library version: pyspark==3.4.0
When I try to run the code above, it throws this error:
py4j.protocol.Py4JJavaError: An error occurred while calling o121.save. : org.apache.hadoop.fs.s3a.AWSBadRequestException: PUT 0-byte object on datalake/showroom_variants: com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: <request_id>; S3 Extended Request ID: <request_id>; Proxy: null), S3 Extended Request ID: <some_id>:InvalidRequest: Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: <request_id>; S3 Extended Request ID: <extended_request_id>; Proxy: null) at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:249) at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:119) at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$4(Invoker.java:322) at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414) at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:318) at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:293) at org.apache.hadoop.fs.s3a.S3AFileSystem.createEmptyObject(S3AFileSystem.java:4532) at org.apache.hadoop.fs.s3a.S3AFileSystem.access$1900(S3AFileSystem.java:259) at org.apache.hadoop.fs.s3a.S3AFileSystem$MkdirOperationCallbacksImpl.createFakeDirectory(S3AFileSystem.java:3461) at org.apache.hadoop.fs.s3a.impl.MkdirOperation.execute(MkdirOperation.java:121) at org.apache.hadoop.fs.s3a.impl.MkdirOperation.execute(MkdirOperation.java:45) at org.apache.hadoop.fs.s3a.impl.ExecutingStoreOperation.apply(ExecutingStoreOperation.java:76) at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499) at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444) at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337) at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356) at org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:3428) at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2388) at org.apache.hudi.common.table.HoodieTableMetaClient.initTableAndGetMetaClient(HoodieTableMetaClient.java:481) at org.apache.hudi.common.table.HoodieTableMetaClient$PropertyBuilder.initTable(HoodieTableMetaClient.java:1201) at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:323) at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:204) at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:121) at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:150) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73) at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118) at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98) at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94) at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488) at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94) at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81) at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79) at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387) at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: DKTYYJHRJGJZFQ1X; S3 Extended Request ID: 37QbiLXXUA+I4SaRhZZvqSNQz5P2zPfykuw9nzmgmRhBpYPN1c9GdN76pzZec7Rpd7b0ek4IdXs9GQcN99eUUfkwd3+Xofv2lq9At781zwI=; Proxy: null), S3 Extended 
Request ID: 37QbiLXXUA+I4SaRhZZvqSNQz5P2zPfykuw9nzmgmRhBpYPN1c9GdN76pzZec7Rpd7b0ek4IdXs9GQcN99eUUfkwd3+Xofv2lq9At781zwI= at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755) at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715) at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697) at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561) at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541) at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5470) at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5417) at com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:422) at com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:6551) at com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1862) at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1822) at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$putObjectDirect$17(S3AFileSystem.java:2877) at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier(IOStatisticsBinding.java:604) at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:2874) at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$32(S3AFileSystem.java:4534) at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:117) ... 63 more
More likely: we will cover that error message in the docs with a "not currently supported" note.
That said, there is work going on right now (February 2024) on mkdir under HADOOP-19072, including an option to turn off some of the safety checks. We should improve the createFakeDirectory() logic to swallow the access exception raised with the object-lock error, since it indicates something is already there and is therefore nothing to worry about.
Please create a JIRA saying we should add this... it is just that you have hit an error nobody has hit before. We do also get errors when trying to delete things without the permission to do so, so that should be covered as well.
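Until something like that lands on the S3A side, one possible interim step, offered purely as a hedged sketch, is to pre-create the 0-byte "directory" markers that S3A's mkdirs/createFakeDirectory() would otherwise try to PUT, using a client that does send a checksum (boto3 here, with an explicit Content-MD5). The bucket and prefixes are placeholders taken from the question, and this only sidesteps the empty-object PUT shown in the stack trace; later data-file uploads may still be rejected if they go out without a Content-MD5/x-amz-checksum header, so treat it as an experiment rather than a fix.

import base64
import hashlib

import boto3


def put_dir_marker(s3, bucket: str, prefix: str) -> None:
    # Create a 0-byte "directory" object with an explicit Content-MD5 so the
    # PUT is accepted by a bucket that enforces Object Lock on every write.
    body = b""
    content_md5 = base64.b64encode(hashlib.md5(body).digest()).decode("ascii")
    s3.put_object(Bucket=bucket, Key=prefix, Body=body, ContentMD5=content_md5)


s3 = boto3.client("s3")
bucket = "<s3_bucket>"  # placeholder, as in the question

# Pre-create the table base path and .hoodie marker so S3A's mkdirs may see an
# existing directory and skip its own (checksum-less) marker PUT.
for prefix in (
    "datalake/showroom_variants/",
    "datalake/showroom_variants/.hoodie/",
):
    put_dir_marker(s3, bucket, prefix)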