Python/Boto - 在没有序列令牌的情况下写入 AWS CloudWatch Logs

问题描述 投票:0回答:8

我正在尝试使用 Python 和 Boto 框架将日志发送到 AWS CloudWatch Logs。我正在这样做:

res=logs.put_log_events("FOO", "BAR",
     [{'timestamp':int(round(time.time() * 1000)),
       'message':time.strftime("%m/%d/%Y %H:%M:%S")+' Scheduled  monitoring check' }], 
     sequence_token=None)

每次运行时都会出现错误:

boto.logs.exceptions.InvalidSequenceTokenException: InvalidSequenceTokenException: 400 Bad Request
{u'message': u'The given sequenceToken is invalid. The next expected sequenceToken is: 49540113336360065754596906019042392283494234157161146226', u'expectedSequenceToken': u'49540113336360065754596906019042392283494234157161146226', u'__type': u'InvalidSequenceTokenException'}

存储该令牌对我来说有些不切实际。这没有意义,为什么我不能直接追加到日志流中?

我该如何解决这个问题?

python amazon-web-services boto
8个回答
22
投票

AWS Cloud Watch Putlogevent 代码

import boto3
import time


client = boto3.client('logs')

LOG_GROUP='cloudwatch_customlog'
LOG_STREAM='{}-{}'.format(time.strftime('%Y-%m-%d'),'logstream')

try:
   client.create_log_group(logGroupName=LOG_GROUP)
except client.exceptions.ResourceAlreadyExistsException:
   pass

try:
   client.create_log_stream(logGroupName=LOG_GROUP, logStreamName=LOG_STREAM)
except client.exceptions.ResourceAlreadyExistsException:
   pass

response = client.describe_log_streams(
   logGroupName=LOG_GROUP,
   logStreamNamePrefix=LOG_STREAM
)

event_log = {
   'logGroupName': LOG_GROUP,
   'logStreamName': LOG_STREAM,
   'logEvents': [
       {
           'timestamp': int(round(time.time() * 1000)),
           'message': time.strftime('%Y-%m-%d %H:%M:%S')+'\t Your custom log messages'
       }
   ],
}

if 'uploadSequenceToken' in response['logStreams'][0]:
   event_log.update({'sequenceToken': response['logStreams'][0] ['uploadSequenceToken']})

response = client.put_log_events(**event_log)
print(response)

7
投票

您可以通过首先通过 describe_log_streams():

查找 uploadSequenceToken 来解决这个问题

本质上,该过程是您使用 logStreamNamePrefix 来专门标识要附加到的日志流。然后从响应中解析 uploadSequenceToken。

响应语法

 {
     'logStreams': [
         {
             'logStreamName': 'string',
             'creationTime': 123,
             'firstEventTimestamp': 123,
             'lastEventTimestamp': 123,
             'lastIngestionTime': 123,
             'uploadSequenceToken': 'string',
             'arn': 'string',
             'storedBytes': 123
         },
     ],
     'nextToken': 'string'
 }

返回与指定日志组关联的所有日志流。响应中返回的列表按日志流名称进行 ASCII 排序。

默认情况下,此操作最多返回 50 个日志流。如果有更多日志流要列出,则响应将在响应正文中包含 nextToken 值。您还可以通过在请求中指定 limit 参数来限制响应中返回的日志流的数量。此操作的限制为每秒 5 个事务,之后事务将受到限制。

请求语法

response = client.describe_log_streams(
    logGroupName='string',
    logStreamNamePrefix='string',
    orderBy='LogStreamName'|'LastEventTime',
    descending=True|False,
    nextToken='string',
    limit=123
)

6
投票

用有根据的猜测来回答“为什么”部分:这是可扩展异步服务的本质。 如果 Amazon

要求您维护序列号,那么他们永远无法在多个实例上扩展其 CloudWatch 服务,同时仍然能够保证您的日志以与发生时的完全相同的顺序出现(想象一下调试问题时无序的日志条目是多么烦人)。时钟、网络延迟或日志接收器路径上的其他延迟的任何微小偏差都会引入排序问题。 但是,由于他们

确实

要求您提供序列号,突然间他们可以轻松地扩展他们的服务,并且简单地合并排序传入的日志条目,同时仍然保留正确的日志顺序,您的日志顺序。


4
投票

每个 PutLogEvents 请求都必须包含从前一个请求的响应中获取的sequenceToken。在新创建的日志流中上传不需要sequenceToken。

来源


4
投票
文档

中提到的:

您还可以从
expectedSequenceToken

获取

InvalidSequenceTokenException
字段中的序列标记。


但是,问题是 boto3 在异常中没有
expectedSequenceToken

字段,正如

问题
中讨论的那样:

Boto3不支持从异常中解析额外的参数,它只添加一个Code和一个Message。标记为功能请求,我认为这是我们应该添加的内容,但目前最好的解决方法是解析错误消息。

显然,解析消息来获取令牌并不理想,因为消息的格式可能会改变。但它提供了一个简单的工作解决方案,无需调用
describe_log_streams


def append_log(group: str, stream: str, msg: str): logs = boto3.client('logs') def put(token=None, repeat: int = 0): events = [{ 'timestamp': int(round(time.time() * 1000)), 'message': msg }] try: if token: logs.put_log_events(logGroupName=group, logStreamName=stream, logEvents=events, sequenceToken=token) else: logs.put_log_events(logGroupName=group, logStreamName=stream, logEvents=events) except (logs.exceptions.InvalidSequenceTokenException, logs.exceptions.DataAlreadyAcceptedException) as e: error_msg = e.response['Error']['Message'] if repeat > 10: raise Exception("Too many repeats to write log") put(error_msg[error_msg.index(":") + 1:].strip(), repeat + 1) try: put() except logs.exceptions.ResourceNotFoundException: try: logs.create_log_stream(logGroupName=group, logStreamName=stream) except logs.exceptions.ResourceNotFoundException: logs.create_log_group(logGroupName=group) logs.create_log_stream(logGroupName=group, logStreamName=stream) put()

如果组和流不存在,该函数将创建该组和流。


3
投票

import time class CloudWatch: def __init__(self, boto3, log_group): self.client = boto3.client("logs") self.log_group = log_group self.sequence_token = None def log(self, message): print(message) # Delete this if you don't want stdout as well. log_stream = time.strftime('%Y-%m-%d') event_log = { 'logGroupName': self.log_group, 'logStreamName': log_stream, 'logEvents': [ { 'timestamp': int(round(time.time() * 1000)), 'message': message } ], } if self.sequence_token is not None: event_log.update({"sequenceToken" : self.sequence_token}) for _ in range(3): try: response = self.client.put_log_events(**event_log) self.sequence_token = response["nextSequenceToken"] return except self.client.exceptions.ResourceNotFoundException: try: self.client.create_log_group(logGroupName=self.log_group) except self.client.exceptions.ResourceAlreadyExistsException: pass try: self.client.create_log_stream(logGroupName=self.log_group, logStreamName=log_stream) except self.client.exceptions.ResourceAlreadyExistsException: pass except self.client.exceptions.InvalidSequenceTokenException as e: event_log.update({"sequenceToken" : e.response["Error"]["Message"].split("is: ")[-1]}) continue except self.client.exceptions.DataAlreadyAcceptedException: return



2
投票


0
投票
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs/client/put_log_events.html

say

警告 现在,PutLogEvents 操作中会忽略序列标记

因此这些示例现在包含不必要的复杂性。

这可能作为简单的 cloudwatch 日志客户端很有用。

class CloudwatchLogs: def __init__(self, log_group): self.logs = boto3.client('logs') self.log_stream = f'{datetime.datetime.utcnow().strftime("%Y-%m-%dT%H-%M-%S")}-logstream' self.log_group = log_group self.created = False def _maybe_create(self): if not self.created: try: self.logs.create_log_stream(logGroupName=self.log_group, logStreamName=self.log_stream) self.created = True except self.logs.exceptions.ResourceAlreadyExistsException: pass def log(self, data: List[dict]): def as_cloudwatch(item) -> dict: return {'timestamp': int(time.time_ns() / 1_000_000), 'message': json.dumps(item)} self._maybe_create() self.logs.put_log_events( logGroupName=self.log_group, logStreamName=self.log_stream, logEvents=[as_cloudwatch(i) for i in data] )

那么你可以说

logs = CloudwatchLogs(log_group="/log/group") logs.log([ { "eventName": "NiceEvent", "text": "Something" } ] )

© www.soinside.com 2019 - 2024. All rights reserved.