通过 http 标头搜索 Athena AWS WAF 日志

问题描述 投票:0回答:3

我已将 AWS WAF 日志记录设置到 S3,并按照文档中所述创建了 Athena 表 ( https://docs.aws.amazon.com/athena/latest/ug/waf-logs.html )

但是,http 标头存储为

array<struct<name: string, value: string>>
而不是地图(出于正当理由)。我想写一个像这样的查询

select headers.user-agent, headers.if-none-match from waf_logs where something;

if-none-match 可能会也可能不会出现在标头列表中。

使用

CROSS JOIN UNNEST(httprequest.headers)
不起作用,因为这会创建多行。使用地图表示法不起作用,因为它是一个结构数组,而不是地图。

互联网上有很多关于如何设置表的页面,但没有太多关于实际查询的示例,而且我也找不到关于如何通过嵌套属性进行查询的页面。

我真的很感激任何建议。谢谢!

amazon-web-services amazon-athena amazon-waf
3个回答
2
投票

这是一个 WAF 查询,应该可以解决请求标头的问题:

WITH waf_data AS (
  SELECT
    waf.action as action,
    waf.httprequest.clientip as clientip,
    waf.httprequest.country as country,
    map_agg(f.name, f.value) AS kv
  FROM "waf_logs" waf,
  UNNEST(waf.httprequest.headers) AS t(f)
  GROUP BY 1, 2, 3 
)
SELECT
  waf_data.action,
  waf_data.clientip,
  waf_data.country,
  waf_data.kv['Host'] AS host,
  waf_data.kv['User-Agent'] as UserAgent,
  waf_data.kv['Cookie'] as cookie
FROM waf_data
WHERE waf_data.kv['Host'] like 'waf_alb.us-east-2.elb.amazonaws.com'
LIMIT 10;

1
投票

我强烈建议所有开发人员尽可能不要使用 UNNEST,大多数时候你不需要笛卡尔积,并且你绝对应该避免多次取消嵌套,因为行数将高得难以想象。只需投射

httprequest.headers
然后使用过滤器提取值。

取消嵌套然后再次分组只是为了获取值也是繁琐且不稳定的,如果你需要通过按键从

httprequest.headers
中提取多个值,你根本无法轻松稳定地获得 100% 正确的结果。

请记住,大多数开发人员认为他们很了解 SQL 并掌握了它,只有少数人意识到正确使用它是多么困难,以及 SQL 返回一个虚假且令人信服的结果是多么容易,让你认为你编写了正确的 SQL 并得到了正确的结果。

WITH
    targets AS (
        SELECT
            *,
            cast(
                httprequest.headers as ARRAY(ROW(name VARCHAR, value VARCHAR))
            ) as headers
        FROM your_app.waf_logs
        WHERE
            "action" NOT IN ('BLOCK', 'ALLOW')
          AND to_iso8601(from_unixtime(timestamp / 1000)) >= '2023-04-18'
    ),
    result AS (
        SELECT
            to_iso8601(from_unixtime(timestamp / 1000)) as time_ISO_8601,
            terminatingruleid,
            labels,
            try(
                filter(
                    headers,
                    x -> LOWER(x.name) = 'user-agent'
                )[1].value
            ) AS UserAgent,
            try(
                filter(
                    headers,
                    x -> LOWER(x.name) = 'x-forwarded-for'
                )[1].value
            ) AS XForwardedForIP
        FROM targets
    )
SELECT
    *
FROM result
;

0
投票

我使用以下内容提取 HTTP 标头值(此处

Host
字段按名称):

SELECT action, header.value as hostname, clientip, timestamp
from (
    SELECT 
      httprequest.clientip as clientip, 
      action,
      timestamp, 
      httprequest.headers as headers 
    FROM waf_logs 
) 
cross join unnest(headers) as c(header)
where lower(header.name) = 'host'

我使用这篇文章来提取行数组,其格式类似于waf日志数据库中的

array(row("name" varchar,"value" varchar))

© www.soinside.com 2019 - 2024. All rights reserved.