我正在尝试使用 Python 和 Boto 框架将日志发送到 AWS CloudWatch Logs。我正在这样做:
res=logs.put_log_events("FOO", "BAR",
[{'timestamp':int(round(time.time() * 1000)),
'message':time.strftime("%m/%d/%Y %H:%M:%S")+' Scheduled monitoring check' }],
sequence_token=None)
每次运行时都会出现错误:
boto.logs.exceptions.InvalidSequenceTokenException: InvalidSequenceTokenException: 400 Bad Request
{u'message': u'The given sequenceToken is invalid. The next expected sequenceToken is: 49540113336360065754596906019042392283494234157161146226', u'expectedSequenceToken': u'49540113336360065754596906019042392283494234157161146226', u'__type': u'InvalidSequenceTokenException'}
存储该令牌对我来说有些不切实际。这没有意义,为什么我不能直接追加到日志流中?
我该如何解决这个问题?
AWS Cloud Watch Putlogevent 代码
import boto3
import time
client = boto3.client('logs')
LOG_GROUP='cloudwatch_customlog'
LOG_STREAM='{}-{}'.format(time.strftime('%Y-%m-%d'),'logstream')
try:
client.create_log_group(logGroupName=LOG_GROUP)
except client.exceptions.ResourceAlreadyExistsException:
pass
try:
client.create_log_stream(logGroupName=LOG_GROUP, logStreamName=LOG_STREAM)
except client.exceptions.ResourceAlreadyExistsException:
pass
response = client.describe_log_streams(
logGroupName=LOG_GROUP,
logStreamNamePrefix=LOG_STREAM
)
event_log = {
'logGroupName': LOG_GROUP,
'logStreamName': LOG_STREAM,
'logEvents': [
{
'timestamp': int(round(time.time() * 1000)),
'message': time.strftime('%Y-%m-%d %H:%M:%S')+'\t Your custom log messages'
}
],
}
if 'uploadSequenceToken' in response['logStreams'][0]:
event_log.update({'sequenceToken': response['logStreams'][0] ['uploadSequenceToken']})
response = client.put_log_events(**event_log)
print(response)
您可以通过首先通过 describe_log_streams():
查找 uploadSequenceToken 来解决这个问题本质上,该过程是您使用 logStreamNamePrefix 来专门标识要附加到的日志流。然后从响应中解析 uploadSequenceToken。
响应语法
{ 'logStreams': [ { 'logStreamName': 'string', 'creationTime': 123, 'firstEventTimestamp': 123, 'lastEventTimestamp': 123, 'lastIngestionTime': 123, 'uploadSequenceToken': 'string', 'arn': 'string', 'storedBytes': 123 }, ], 'nextToken': 'string' }
返回与指定日志组关联的所有日志流。响应中返回的列表按日志流名称进行 ASCII 排序。
默认情况下,此操作最多返回 50 个日志流。如果有更多日志流要列出,则响应将在响应正文中包含 nextToken 值。您还可以通过在请求中指定 limit 参数来限制响应中返回的日志流的数量。此操作的限制为每秒 5 个事务,之后事务将受到限制。
请求语法
response = client.describe_log_streams( logGroupName='string', logStreamNamePrefix='string', orderBy='LogStreamName'|'LastEventTime', descending=True|False, nextToken='string', limit=123 )
用有根据的猜测来回答“为什么”部分:这是可扩展异步服务的本质。 如果 Amazon
不要求您维护序列号,那么他们永远无法在多个实例上扩展其 CloudWatch 服务,同时仍然能够保证您的日志以与发生时的完全相同的顺序出现(想象一下调试问题时无序的日志条目是多么烦人)。时钟、网络延迟或日志接收器路径上的其他延迟的任何微小偏差都会引入排序问题。 但是,由于他们
确实要求您提供序列号,突然间他们可以轻松地扩展他们的服务,并且简单地合并排序传入的日志条目,同时仍然保留正确的日志顺序,您的日志顺序。
expectedSequenceToken
获取
字段中的序列标记。 但是,问题是 boto3 在异常中没有InvalidSequenceTokenException
expectedSequenceToken
字段,正如
问题中讨论的那样: Boto3不支持从异常中解析额外的参数,它只添加一个Code和一个Message。标记为功能请求,我认为这是我们应该添加的内容,但目前最好的解决方法是解析错误消息。
显然,解析消息来获取令牌并不理想,因为消息的格式可能会改变。但它提供了一个简单的工作解决方案,无需调用
describe_log_streams
。
def append_log(group: str, stream: str, msg: str):
logs = boto3.client('logs')
def put(token=None, repeat: int = 0):
events = [{
'timestamp': int(round(time.time() * 1000)),
'message': msg
}]
try:
if token:
logs.put_log_events(logGroupName=group, logStreamName=stream, logEvents=events, sequenceToken=token)
else:
logs.put_log_events(logGroupName=group, logStreamName=stream, logEvents=events)
except (logs.exceptions.InvalidSequenceTokenException, logs.exceptions.DataAlreadyAcceptedException) as e:
error_msg = e.response['Error']['Message']
if repeat > 10:
raise Exception("Too many repeats to write log")
put(error_msg[error_msg.index(":") + 1:].strip(), repeat + 1)
try:
put()
except logs.exceptions.ResourceNotFoundException:
try:
logs.create_log_stream(logGroupName=group, logStreamName=stream)
except logs.exceptions.ResourceNotFoundException:
logs.create_log_group(logGroupName=group)
logs.create_log_stream(logGroupName=group, logStreamName=stream)
put()
如果组和流不存在,该函数将创建该组和流。
import time
class CloudWatch:
def __init__(self, boto3, log_group):
self.client = boto3.client("logs")
self.log_group = log_group
self.sequence_token = None
def log(self, message):
print(message) # Delete this if you don't want stdout as well.
log_stream = time.strftime('%Y-%m-%d')
event_log = {
'logGroupName': self.log_group,
'logStreamName': log_stream,
'logEvents': [
{
'timestamp': int(round(time.time() * 1000)),
'message': message
}
],
}
if self.sequence_token is not None:
event_log.update({"sequenceToken" : self.sequence_token})
for _ in range(3):
try:
response = self.client.put_log_events(**event_log)
self.sequence_token = response["nextSequenceToken"]
return
except self.client.exceptions.ResourceNotFoundException:
try:
self.client.create_log_group(logGroupName=self.log_group)
except self.client.exceptions.ResourceAlreadyExistsException:
pass
try:
self.client.create_log_stream(logGroupName=self.log_group, logStreamName=log_stream)
except self.client.exceptions.ResourceAlreadyExistsException:
pass
except self.client.exceptions.InvalidSequenceTokenException as e:
event_log.update({"sequenceToken" : e.response["Error"]["Message"].split("is: ")[-1]})
continue
except self.client.exceptions.DataAlreadyAcceptedException:
return
因此这些示例现在包含不必要的复杂性。
这可能作为简单的 cloudwatch 日志客户端很有用。
class CloudwatchLogs:
def __init__(self, log_group):
self.logs = boto3.client('logs')
self.log_stream = f'{datetime.datetime.utcnow().strftime("%Y-%m-%dT%H-%M-%S")}-logstream'
self.log_group = log_group
self.created = False
def _maybe_create(self):
if not self.created:
try:
self.logs.create_log_stream(logGroupName=self.log_group, logStreamName=self.log_stream)
self.created = True
except self.logs.exceptions.ResourceAlreadyExistsException:
pass
def log(self, data: List[dict]):
def as_cloudwatch(item) -> dict:
return {'timestamp': int(time.time_ns() / 1_000_000), 'message': json.dumps(item)}
self._maybe_create()
self.logs.put_log_events(
logGroupName=self.log_group,
logStreamName=self.log_stream,
logEvents=[as_cloudwatch(i) for i in data]
)
那么你可以说
logs = CloudwatchLogs(log_group="/log/group")
logs.log([ { "eventName": "NiceEvent", "text": "Something" } ] )