将AWS Kinesis Firehose回填到Elasticsearch Service失败的记录

问题描述 投票:5回答:1

我们有一个firehose将记录发送到Elasticsearch Service集群。我们的集群已经填满,一些记录未通过S3。 https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#retry上的文档表明失败的记录可用于回填:“跳过的文档将传递到elasticsearch_failed /文件夹中的S3存储桶,您可以将其用于手动回填”但我无法找到任何文档怎么做到这一点。

查看记录,它们似乎是包含JSON blob的文本文件的gzip文件,其中“rawData”字段包含我们发送给firehose的原始记录的base64编码字符串。

是否有现成的工具来处理S3中的这些gzip文件,将其分解并重新提交记录?文档暗示你可以“只是手动回填”,这是一个非常标准化的流程,所以我的假设是有人之前做过这个,但我还是找不到。

elasticsearch amazon-kinesis-firehose
1个回答
0
投票

我想手动回填意味着使用其中一个AWS开发工具包再次将文档发送到Elasticsearch。 python中的一个例子(使用boto3),从S3读取失败文件并将文档发送到Elasticsearch:

es_client = boto3.client('es', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)
s3_client = boto3.client('s3', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)

file = s3_client.get_object(Bucket=bucket, Key=key)
text = file['Body'].read().decode("utf-8")
failure_cases = list(map(lambda x: json.loads(x), filter(None, text.split('\n'))))

for case in failure_cases:
    try:
        data = base64.b64decode(case['rawData'])
        es_instance.create(index=case['esIndexName'], id=case['esDocumentId'], body=data)
        logger.debug("Successfully sent {}".format(case['esDocumentId']))
    except RequestError:
        logger.info("Retry failed for Document ID {}\nReason: {}"
                    .format(case['esDocumentId'], case['errorMessage']))
© www.soinside.com 2019 - 2024. All rights reserved.