我已将 AWS WAF 日志记录设置到 S3,并按照文档中所述创建了 Athena 表 ( https://docs.aws.amazon.com/athena/latest/ug/waf-logs.html )
但是,http 标头存储为
array<struct<name: string, value: string>>
而不是地图(出于正当理由)。我想写一个像这样的查询
select headers.user-agent, headers.if-none-match from waf_logs where something;
if-none-match 可能会也可能不会出现在标头列表中。
使用
CROSS JOIN UNNEST(httprequest.headers)
不起作用,因为这会创建多行。使用地图表示法不起作用,因为它是一个结构数组,而不是地图。
互联网上有很多关于如何设置表的页面,但没有太多关于实际查询的示例,而且我也找不到关于如何通过嵌套属性进行查询的页面。
我真的很感激任何建议。谢谢!
这是一个 WAF 查询,应该可以解决请求标头的问题:
WITH waf_data AS (
SELECT
waf.action as action,
waf.httprequest.clientip as clientip,
waf.httprequest.country as country,
map_agg(f.name, f.value) AS kv
FROM "waf_logs" waf,
UNNEST(waf.httprequest.headers) AS t(f)
GROUP BY 1, 2, 3
)
SELECT
waf_data.action,
waf_data.clientip,
waf_data.country,
waf_data.kv['Host'] AS host,
waf_data.kv['User-Agent'] as UserAgent,
waf_data.kv['Cookie'] as cookie
FROM waf_data
WHERE waf_data.kv['Host'] like 'waf_alb.us-east-2.elb.amazonaws.com'
LIMIT 10;
我强烈建议所有开发人员尽可能不要使用 UNNEST,大多数时候你不需要笛卡尔积,并且你绝对应该避免多次取消嵌套,因为行数将高得难以想象。只需投射
httprequest.headers
然后使用过滤器提取值。
取消嵌套然后再次分组只是为了获取值也是繁琐且不稳定的,如果你需要通过按键从
httprequest.headers
中提取多个值,你根本无法轻松稳定地获得 100% 正确的结果。
请记住,大多数开发人员认为他们很了解 SQL 并掌握了它,只有少数人意识到正确使用它是多么困难,以及 SQL 返回一个虚假且令人信服的结果是多么容易,让你认为你编写了正确的 SQL 并得到了正确的结果。
WITH
targets AS (
SELECT
*,
cast(
httprequest.headers as ARRAY(ROW(name VARCHAR, value VARCHAR))
) as headers
FROM your_app.waf_logs
WHERE
"action" NOT IN ('BLOCK', 'ALLOW')
AND to_iso8601(from_unixtime(timestamp / 1000)) >= '2023-04-18'
),
result AS (
SELECT
to_iso8601(from_unixtime(timestamp / 1000)) as time_ISO_8601,
terminatingruleid,
labels,
try(
filter(
headers,
x -> LOWER(x.name) = 'user-agent'
)[1].value
) AS UserAgent,
try(
filter(
headers,
x -> LOWER(x.name) = 'x-forwarded-for'
)[1].value
) AS XForwardedForIP
FROM targets
)
SELECT
*
FROM result
;
我使用以下内容提取 HTTP 标头值(此处
Host
字段按名称):
SELECT action, header.value as hostname, clientip, timestamp
from (
SELECT
httprequest.clientip as clientip,
action,
timestamp,
httprequest.headers as headers
FROM waf_logs
)
cross join unnest(headers) as c(header)
where lower(header.name) = 'host'
我使用这篇文章来提取行数组,其格式类似于waf日志数据库中的
array(row("name" varchar,"value" varchar))