我的场景是,我可以在 CloudWatch 中搜索错误消息,并得到我想要的所有结果。但从中,我想获取@requestId(仅适用于与错误匹配的结果)。从@requestId,我想返回所有日志。
我尝试解析消息,其中 requestId 的 guid 存在,如下所示:
parse "Z * Task timed out" as msgId
然后过滤
filter strcontains(@message, msgId)
但这返回零结果。 同样,我也尝试过:
filter ispresent(msgId)
但这只是从解析命令返回任何不为空的结果。
除了添加
dedup
命令并创建自己的列表以创建单独的搜索之外,我似乎找不到实现此目的的方法。
你能做我想做的事吗? 或者如果没有,您对替代方案有何建议?
虽然这是不可能的,但可以通过您选择的语言使用开发工具包或 AWS CLI 来实现。
获得上面的列表后,我可以迭代每个响应并获取日志流、请求 ID、时间戳(并使用 python 将其保存在 df 中)。
写一个这样的方法:
import re
import json
import subprocess
import pandas as pd
# vars
profile_name = <your aws cli profile name>
log_group_name = <your log group name eg: /aws/lambda/your-lambda>
def get_log_data_events(log_stream_name, start_time):
log_output = subprocess.run(
[
'aws',
'logs',
'get-log-events',
'--log-group-name', log_group_name,
'--log-stream-name', log_stream_name,
'--start-time', start_time,
'--start-from-head',
'--profile', profile_name
], capture_output=True, text=True)
log_output_json = json.loads(log_output.stdout)
return log_output_json['events']
['events']的返回只是给你事件的json数据,而不是你可能不需要的额外数据。
然后使用 df['log_data'].apply 这样
df['log_data'] = df.apply(lambda row: get_log_data_events(row['@logStream'], str(row['@timestamp'].timestamp()).split('.')[0]), axis=1)
**(注意,分割是从默认时间戳中删除 .000 值并转换为一个好的纪元。(预先要求将在您的 df 中将其作为时间戳数据类型))
之后,我应用了一些搜索查询来查找我需要的特定数据:
# vars
search_in_message = 'Processing Message: {'
def get_payload_event(log_data):
payload_search = search_in_message
for event in log_data:
if payload_search in event['message']:
#
json_str = re.search(r'{.*}', event['message']).group()
print(json_str)
return json.loads(json_str)
这让我可以使用另一个 df.apply 隔离我正在寻找的确切消息
df['message'] = df['log_data'].apply(lambda log_data: get_payload_event(log_data))
最终,我能够获取这些有效负载并重新提交到 sqs 队列。但这超出了我原来问题的范围。