Athena/Trino/Presto 代码使用自定义行分隔符解析文本文件

问题描述 投票:0回答:2

我想使用 Trino/Presto 代码解析纯文本文件中的一些日志文件,其中记录跨越多行。我的数据如下所示:每条记录有多行,每行都有一个变量。每条记录由 -------------- 分隔。我想解析记录并以镶木地板格式保存,因此行中的每个记录和列中的每个变量。

-----------------------
var1=abcde/n
var2=12345/n
....
varn=xxxx/n
-----------------------
var1=defg/n
var2=78910/n
....
varn=yyyy/n

所需输出:

var1, var2 ...  varn
------------------------------
abcde 12345 ... xxx
defg  78910 ... xxx
amazon-athena presto text-processing trino
2个回答
0
投票

SQL,包括其在 TrinoPresto 中的扩展,主要设计用于操作表格格式的结构化数据。解析非结构化、多行文本文件不属于 SQL 的典型用例,虽然这是可能的,但可能需要复杂且难以维护的代码。而且 Trino 或 Presto 可能没有内置函数来轻松处理自定义行分隔符或多行记录。

或者:像Apache HadoopApache Spark这样的框架是为处理大型数据集而设计的,并且可以处理非结构化数据。 Spark 尤其具有处理文本数据的强大功能,并且可以轻松写入 Parquet 格式。

您还拥有ETL(提取、转换、加载)工具,例如Apache NiFi,专为数据转换任务而设计。这些工具通常提供用于定义数据转换的图形界面,并且可以处理复杂的解析任务。


例如,考虑一个使用

pandas
库 (
pip install pandas pyarrow
) 解析日志文件并将解析后的数据保存到 Parquet 文件的 Python 脚本。我假设您数据中的
/n
是一个拼写错误,实际上应该是
\n
来指示新行:

import pandas as pd
from io import StringIO

# Function to parse the log file
def parse_log_file(file_path):
    with open(file_path, 'r') as file:
        content = file.read()
    # Split the content by the record separator
    records = content.split('-----------------------')[1:-1]  # Exclude the first and last element which are empty
    # Parse each record into a dictionary
    parsed_records = []
    for record in records:
        lines = record.strip().split('\n')
        record_dict = {line.split('=')[0]: line.split('=')[1] for line in lines if line}
        parsed_records.append(record_dict)
    return parsed_records

# Call the function and convert the result to a DataFrame
log_data = parse_log_file('your_log_file.txt')
df = pd.DataFrame(log_data)

# Save the DataFrame to a Parquet file
df.to_parquet('parsed_data.parquet', index=False)

函数

parse_log_file
读取日志文件,通过记录分隔符(
-----------------------
)分割内容,并将每条记录解析为字典。使用
pd.DataFrame
将字典列表转换为 DataFrame,然后使用
to_parquet
方法
将其保存到 Parquet 文件中。

这适用于您提供的数据格式,只要所述数据的格式是一致
如果数据格式有变化,...您可能需要修改解析逻辑。


0
投票

这里解释了如何使用 Trino/Presto 代码解析具有跨多行记录的日志文件:

创建外部表:首先,您需要在 Trino/Presto 中创建一个指向日志文件的外部表。您可以使用创建表 带有 ROW FORMAT DELIMITED 子句的语句来指定分隔符 用于 你的日志文件。由于每条记录均以“--------------”分隔,因此您可以 将其定义为行分隔符。这是一个例子:

 `CREATE TABLE logs (
  log_record varchar
)
WITH (
  format = 'TEXTFILE',
  external_location = 's3://your-bucket/logs/',
  row_format = 'DELIMITED',
  field_delimiter = '\n',
  line_delimiter = '--------------'
);`

确保将“s3://your-bucket/logs/”替换为日志文件的适当位置。

提取变量:创建表后,您可以从每个日志记录中提取变量。您可以使用 Trino/Presto 的字符串函数(例如 split、substring 和 regexp_extract)来提取每个变量的值。以下是如何从 log_record 列中提取 var1、var2 和 varn 的示例:

    SELECT
  regexp_extract(log_record, 'var1=(.*?)/n', 1) AS var1,
  regexp_extract(log_record, 'var2=(.*?)/n', 1) AS var2,
  ...
  regexp_extract(log_record, 'varn=(.*?)/n', 1) AS varn
FROM
  logs;

此查询使用 regexp_extract 匹配模式 var1=(.*?)/n 并为每个日志记录提取“var1=”和“/n”之间的值。对您想要提取的所有变量重复此模式。

另存为 Parquet:既然您已经提取了变量,您可以将结果保存为 Parquet 文件。为此,您可以使用带有 STORED AS PARQUET 子句的 CREATE TABLE AS 语句。这是一个例子:

CREATE TABLE parsed_logs
WITH (
  format = 'PARQUET',
  external_location = 's3://your-bucket/parsed-logs/'
)
AS
SELECT
  regexp_extract(log_record, 'var1=(.*?)/n', 1) AS var1,
  regexp_extract(log_record, 'var2=(.*?)/n', 1) AS var2,
  ...
 regexp_extract(log_record, 'varn=(.*?)/n', 1) AS varn
FROM
  logs;

确保将“s3://your-bucket/parsed-logs/”替换为存储 Parquet 文件所需的位置。

期望的输出:生成的 parsed_logs 表将把每个日志记录作为一行,每个变量作为一列,以 Parquet 格式存储。要显示所需的输出,您可以查询 parsed_logs 表并使用 Trino/Presto 的格式化函数来格式化结果。以下是如何实现这一目标的示例:

    SELECT
  concat_ws(' ', var1, var2, ..., varn) AS log_variables
FROM
  parsed_logs;

此查询使用 concat_ws 连接 var1、var2、...、varn 列,并用空格 (' ') 分隔。

通过执行以下步骤,您可以使用 Trino/Presto 代码解析日志文件,并将其保存为所需的 Parquet 格式,其中每条记录作为一行,每个变量作为一列。 🚀

© www.soinside.com 2019 - 2024. All rights reserved.