在大规模情况下,CloudTrail (CT) 日志格式被证明效率低下,每天会产生超过 30 亿条记录。如此庞大的体积与 JSON 格式相结合,阻碍了 Athena 的性能。为了解决这个问题,我正在开展一项 AWS Glue 作业,以整合这些数据并将其转换为更易于管理的 Parquet 文件。然而,我遇到了一个意想不到的问题。
这是我的代码片段:
operation_kwargs = {
"database": data_lake_database_name,
"table_name": original_table_name,
"push_down_predicate": f"region=='{region}' and year=='{year}' and month=='{month}' and day=='{day}'",
"catalogPartitionPredicate": f"region='{region}' and year='{year}' and month='{month}' and day='{day}'",
"transformation_ctx": "raw_data",
}
raw_data = glue_context.create_dynamic_frame_from_catalog(**operation_kwargs)
raw_data = raw_data.repartition(100)
print(raw_data.toDF().show(5))
当我执行此操作时,
print
输出仅显示分区和名为 Records
的列。我怀疑这可能是由于 CloudTrail (CT) 文件的结构造成的。作为上下文,我按照此 AWS 指南创建了 CT 的 Glue 表:
查询 AWS CloudTrail 日志
这是我正在使用的 DataFrame (DF) 的快照::
+--------------------+---------+----+-----+---+
| Records| region|year|month|day|
+--------------------+---------+----+-----+---+
|[{1.09, {AssumedR...|us-east-1|2023| 10| 03|
|[{1.09, {AWSServi...|us-east-1|2023| 10| 03|
|[{1.09, {AWSServi...|us-east-1|2023| 10| 03|
|[{1.09, {AWSServi...|us-east-1|2023| 10| 03|
|[{1.09, {AWSServi...|us-east-1|2023| 10| 03|
+--------------------+---------+----+-----+---+
only showing top 5 rows
我在这里做错了什么?
我觉得这应该不是那么难,因为该表具有正确的 SerDe 并且可以从 Athena 查询。
虽然您提供的代码片段中没有明确说明,但您似乎只是使用
create_dynamic_frame_from_catalog()
函数从原始表创建动态框架。这将创建一个具有默认架构 <Records, metadata>
的动态框架。
要使用 AWS Glue 将 CloudTrail 日志正确转换为 Parquet,您需要对动态框架执行转换,以从
Records
列中提取相关列,然后将其转换为 Spark DataFrame。步骤包括:
.toDF()
Records
从 getItem()
列中选择我们需要的列,并将它们全部命名为花哨。Records
从 DataFrame 中踢出
.drop()
.fromDF()
glueContext.write_dynamic_frame.from_options()
将动态框架保存为很酷的镶木地板文件。