配置 AWS GuardDuty 将调查结果通知发布到 Slack

问题描述 投票:0回答:1

我有一个使用 boto3 与 AWS 服务交互的 Python 脚本。我正在尝试集成此脚本,以便一旦执行,它就会激活 GuardDuty 并创建一个逻辑,将高严重性发现结果转发到 Slack 通道。

我遵循了 this AWS 文档。

这是我迄今为止生成的代码:


def get_aws_clients():
    #Omitted the rest
    global guardduty
    global events
    global sns
    global chatbot
    boto3.setup_default_session(region_name=cfg.aws.region) #us-east-1
    #Omitted the rest
    guardduty = boto3.client('guardduty')
    events = boto3.client('events')
    sns = boto3.client('sns')
    chatbot = boto3.client('chatbot')


def enable_guardduty():
    # Verify if GuardDuty is already enabled
    existing_detectors = guardduty.list_detectors()
    if existing_detectors['DetectorIds']:
        log.info("GuardDuty already enabled.")
        return existing_detectors['DetectorIds'][0]
    # Enable GuardDuty
    response = guardduty.create_detector(Enable=True)
    detector_id = response['DetectorId']
    return detector_id


def configure_cloudwatch_events(rule_name):
    response = events.put_rule(
        Name=rule_name,
        EventPattern=json.dumps({
            "source": ["aws.guardduty"],
            "detail-type": ["GuardDuty Finding"],
            "detail": {
                "severity": [
                    7, 7.0, 7.1, 7.2, 7.3, 7.4, 7.5,7.6, 7.7, 7.8, 7.9,
    8, 8.0, 8.1, 8.2, 8.3, 8.4, 8.5, 8.6, 8.7, 8.8, 8.9
                ]
            }
        }),
        State='ENABLED'
    )
    return response['RuleArn']


def configure_sns_topic(topic_name):
    existing_topics = sns.list_topics()['Topics']
    for topic in existing_topics:
        if topic['TopicArn'].split(':')[-1] == topic_name:
            log.info("Found existing SNS topic")
            return topic['TopicArn']
    response = sns.create_topic(Name=topic_name)
    topic_arn = response['TopicArn']
    policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {
                "Service": "events.amazonaws.com"
            },
            "Action": "sns:Publish",
            "Resource": topic_arn
        }]
    }
    sns.set_topic_attributes(
        TopicArn=topic_arn,
        AttributeName='Policy',
        AttributeValue=json.dumps(policy)
    )
    return topic_arn


def connect_sns_to_slack(topic_arn, slack_team_id, slack_channel_id):
    #iam_role_arn = create_chatbot_iam_role()
    iam_role_arn = 'arn:aws:iam::aws:policy/service-role/AWSServiceRoleForAWSChatbot'
    response = chatbot.create_slack_channel_configuration(
        ConfigurationName='GuardDutySlackConfig',
        SlackTeamId=slack_team_id,
        SlackChannelId=slack_channel_id,
        SnsTopicArns=[topic_arn],
        IamRoleArn=iam_role_arn
    )
    return response['SlackChannelConfigurationArn']


def create_chatbot_iam_role():
    role_name = 'UserRoleForChatbot'
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {
                "Service": "management.chatbot.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }]
    }
    try:
        response = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(trust_policy)
        )
        role_arn = response['Role']['Arn']
        log.info(f"User Iam role created for chatbot: {role_arn}")
        iam.attach_role_policy(
            RoleName=role_name,
            PolicyArn='arn:aws:iam::aws:policy/ReadOnlyAccess'
        )
        return role_arn

    except iam.exceptions.EntityAlreadyExistsException:
        print("IAM role already existing.")
        return None


def create_cloudwatch_event_rule(rule_name, target_arn):
    response = events.put_targets(
        Rule=rule_name,
        Targets=[{
            'Id': '1',
            'Arn': target_arn
        }]
    )
    return response['FailedEntryCount'] == 0


def configure_guard_duty_to_slack():
    try:
        # Configure GuardDuty
        detector_id = enable_guardduty()
        log.info(f"GuardDuty enabled with DetectorId: {detector_id}")
        # Configure CloudWatch Events
        rule_name = 'GuardDutyFindingRule'
        rule_arn = configure_cloudwatch_events(rule_name)
        log.info(f"CloudWatch Events rule configured with ARN: {rule_arn}")
        # Configure SNS Topic
        topic_arn = configure_sns_topic('GuardDutyFindingTopic')
        log.info(f"SNS Topic configured with ARN: {topic_arn}")
        # Connect SNS Topic to Slack through Chatbot
        slack_channel_id = 'XXXXXXXX'
        slack_team_id = 'YYYYYYYYY'
        slack_config_arn = connect_sns_to_slack(topic_arn, slack_team_id, slack_channel_id)
        log.info(f"Slack configuration created with ARN: {slack_config_arn}")
        # Target Cloudwatch to SNS topic
        success = create_cloudwatch_event_rule(rule_name, topic_arn)
        if success:
            log.info("CloudWatch Event rule created successfully.")
        else:
            log.info("Failed to create CloudWatch Event rule.")
    except Exception as ex:
        log.error(f"*** {ex}")
        sys.exit(1)

调用

chatbot.create_slack_channel_configuration
方法时出现此异常:

*** Could not connect to the endpoint URL: "https://chatbot.us-east-1.amazonaws.com/create-slack-channel-configuration"

我相信到目前为止一切都运行良好。所有服务均已成功创建,我可以通过AWS控制台验证它们。 我还尝试通过 AWS 控制台配置 Slack 客户端,效果确实如此。我在 Slack 频道上收到通知,这是一个积极的结果。但是,我需要通过 Python 脚本以编程方式自动执行所有这些任务。

我不确定这种方法是否正确,如果我遗漏了什么,那么我是 AWS 新手。任何帮助或建议将不胜感激。预先感谢!

amazon-web-services boto3 chatbot slack amazon-guardduty
1个回答
0
投票

这很可能是因为 AWS Chatbot 是一项全球服务,不接受任何区域。

我看到您有

boto3.setup_default_session(region_name=cfg.aws.region) #us-east-1
,它将 us-east-1 设置为默认区域,并使所有 boto3 客户端全部使用区域终端节点,但是 AWS Chatbot 没有区域终端节点,只有全局终端节点。

如果您以其他方式设置区域,那么它会起作用:

guardduty = boto3.client('guardduty', region_name='us-east-1')
events = boto3.client('events', region_name='us-east-1')
sns = boto3.client('sns', region_name='us-east-1')
chatbot = boto3.client('chatbot')  # don't set a region here
© www.soinside.com 2019 - 2024. All rights reserved.