我有一个使用 boto3 与 AWS 服务交互的 Python 脚本。我正在尝试集成此脚本,以便一旦执行,它就会激活 GuardDuty 并创建一个逻辑,将高严重性发现结果转发到 Slack 通道。
我遵循了这篇 AWS 文档。
这是我迄今为止生成的代码:
def get_aws_clients():
    """Create the boto3 clients this script uses and bind them as module globals.

    Sets the default boto3 session region from ``cfg.aws.region`` and then
    binds the ``guardduty``, ``events``, ``sns`` and ``chatbot`` clients.
    """
    #Omitted the rest
    global guardduty, events, sns, chatbot
    boto3.setup_default_session(region_name=cfg.aws.region) #us-east-1
    #Omitted the rest
    # No region is passed to the individual clients, so every one of them
    # (chatbot included) picks up the default session region set above.
    # NOTE(review): the Chatbot API appears to be served from only a few
    # regions -- confirm the default region is one of them.
    guardduty, events, sns, chatbot = (
        boto3.client(service_name)
        for service_name in ('guardduty', 'events', 'sns', 'chatbot')
    )
def enable_guardduty():
    """Return a GuardDuty detector id, creating an enabled detector if none is listed.

    Returns:
        The first id reported by ``list_detectors`` when there is at least
        one, otherwise the id of a newly created detector.
    """
    detector_ids = guardduty.list_detectors()['DetectorIds']
    if not detector_ids:
        # Nothing listed yet: create a detector with Enable=True.
        return guardduty.create_detector(Enable=True)['DetectorId']
    log.info("GuardDuty already enabled.")
    return detector_ids[0]
def configure_cloudwatch_events(rule_name):
    """Create or update the rule that matches high-severity GuardDuty findings.

    Args:
        rule_name: Name of the rule passed to ``put_rule``.

    Returns:
        The ``RuleArn`` from the ``put_rule`` response.
    """
    event_pattern = {
        "source": ["aws.guardduty"],
        "detail-type": ["GuardDuty Finding"],
        "detail": {
            # Numeric range match instead of enumerating 7, 7.1, ... 8.9:
            # every severity of 7 or above matches, so findings whose
            # severity is above 8.9 are no longer silently skipped.
            "severity": [{"numeric": [">=", 7]}]
        }
    }
    response = events.put_rule(
        Name=rule_name,
        EventPattern=json.dumps(event_pattern),
        State='ENABLED'
    )
    return response['RuleArn']
def configure_sns_topic(topic_name):
    """Return the ARN of the SNS topic called ``topic_name``, creating it if needed.

    An existing topic is returned untouched. A newly created topic gets an
    access policy that lets ``events.amazonaws.com`` publish to it.

    Args:
        topic_name: Name of the topic (the last ``:``-separated ARN segment).

    Returns:
        The topic ARN.
    """
    # list_topics returns one page at a time; walk every page so an existing
    # topic is still found (and its policy left alone) in accounts with many
    # topics.
    for page in sns.get_paginator('list_topics').paginate():
        for topic in page['Topics']:
            if topic['TopicArn'].split(':')[-1] == topic_name:
                log.info("Found existing SNS topic")
                return topic['TopicArn']

    topic_arn = sns.create_topic(Name=topic_name)['TopicArn']
    policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {
                "Service": "events.amazonaws.com"
            },
            "Action": "sns:Publish",
            "Resource": topic_arn
        }]
    }
    # Setting the 'Policy' attribute replaces the topic's whole access policy.
    sns.set_topic_attributes(
        TopicArn=topic_arn,
        AttributeName='Policy',
        AttributeValue=json.dumps(policy)
    )
    return topic_arn
def connect_sns_to_slack(topic_arn, slack_team_id, slack_channel_id, iam_role_arn=None):
    """Create the Chatbot Slack channel configuration fed by ``topic_arn``.

    Args:
        topic_arn: ARN of the SNS topic whose messages go to Slack.
        slack_team_id: Slack team (workspace) id.
        slack_channel_id: Slack channel id.
        iam_role_arn: ARN sent as ``IamRoleArn``, for example the value
            returned by ``create_chatbot_iam_role()``. When omitted, the
            previously hard-coded value is used so existing callers behave
            exactly as before.

    Returns:
        The ``SlackChannelConfigurationArn`` from the response.
    """
    if iam_role_arn is None:
        # NOTE(review): this fallback lives in the IAM *policy* namespace
        # (":policy/..."), while the parameter it feeds is named IamRoleArn.
        # Prefer passing a real role ARN -- confirm before relying on it.
        iam_role_arn = 'arn:aws:iam::aws:policy/service-role/AWSServiceRoleForAWSChatbot'
    response = chatbot.create_slack_channel_configuration(
        ConfigurationName='GuardDutySlackConfig',
        SlackTeamId=slack_team_id,
        SlackChannelId=slack_channel_id,
        SnsTopicArns=[topic_arn],
        IamRoleArn=iam_role_arn
    )
    return response['SlackChannelConfigurationArn']
def create_chatbot_iam_role():
    """Create the IAM role for Chatbot, or look it up when it already exists.

    The role trusts ``management.chatbot.amazonaws.com`` and, when newly
    created, gets the AWS managed ``ReadOnlyAccess`` policy attached.

    Returns:
        The role ARN, whether the role was just created or already existed.
    """
    # NOTE(review): no ``iam`` client is created in the visible part of
    # get_aws_clients() -- confirm it is defined before calling this.
    role_name = 'UserRoleForChatbot'
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {
                "Service": "management.chatbot.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }]
    }
    try:
        response = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(trust_policy)
        )
    except iam.exceptions.EntityAlreadyExistsException:
        # Returning None here left callers without an ARN to hand to
        # Chatbot; fetch the existing role's ARN instead.
        log.info("IAM role already existing.")
        return iam.get_role(RoleName=role_name)['Role']['Arn']

    role_arn = response['Role']['Arn']
    log.info(f"User Iam role created for chatbot: {role_arn}")
    iam.attach_role_policy(
        RoleName=role_name,
        PolicyArn='arn:aws:iam::aws:policy/ReadOnlyAccess'
    )
    return role_arn
def create_cloudwatch_event_rule(rule_name, target_arn):
    """Attach ``target_arn`` as a target of the rule ``rule_name``.

    Despite its name, this does not create a rule; it only calls
    ``put_targets`` on an existing one.

    Returns:
        True when the response reports zero failed entries, else False.
    """
    target = {'Id': '1', 'Arn': target_arn}
    failed_entries = events.put_targets(
        Rule=rule_name,
        Targets=[target],
    )['FailedEntryCount']
    return failed_entries == 0
def configure_guard_duty_to_slack():
    """Run the whole setup: GuardDuty -> event rule -> SNS topic -> Slack.

    Steps run in a fixed order and each one logs its result. Any exception
    is logged and terminates the process with exit status 1.
    """
    rule_name = 'GuardDutyFindingRule'
    slack_channel_id = 'XXXXXXXX'
    slack_team_id = 'YYYYYYYYY'
    try:
        # 1. GuardDuty detector
        detector_id = enable_guardduty()
        log.info(f"GuardDuty enabled with DetectorId: {detector_id}")

        # 2. Rule matching the findings
        rule_arn = configure_cloudwatch_events(rule_name)
        log.info(f"CloudWatch Events rule configured with ARN: {rule_arn}")

        # 3. SNS topic the rule will publish to
        topic_arn = configure_sns_topic('GuardDutyFindingTopic')
        log.info(f"SNS Topic configured with ARN: {topic_arn}")

        # 4. Chatbot configuration forwarding the topic to Slack
        slack_config_arn = connect_sns_to_slack(topic_arn, slack_team_id, slack_channel_id)
        log.info(f"Slack configuration created with ARN: {slack_config_arn}")

        # 5. Point the rule at the SNS topic
        if create_cloudwatch_event_rule(rule_name, topic_arn):
            outcome = "CloudWatch Event rule created successfully."
        else:
            outcome = "Failed to create CloudWatch Event rule."
        log.info(outcome)
    except Exception as ex:
        log.error(f"*** {ex}")
        sys.exit(1)
调用
chatbot.create_slack_channel_configuration
方法时出现此异常:
*** Could not connect to the endpoint URL: "https://chatbot.us-east-1.amazonaws.com/create-slack-channel-configuration"
我相信到目前为止一切都运行良好:所有服务均已成功创建,我可以通过 AWS 控制台验证它们。我还尝试过通过 AWS 控制台配置 Slack 客户端,这样确实可行——我在 Slack 频道上收到了通知,这是一个积极的结果。但是,我需要通过 Python 脚本以编程方式自动执行所有这些任务。
我是 AWS 新手,不确定这种方法是否正确,也不确定自己是否遗漏了什么。任何帮助或建议我都将不胜感激。预先感谢!
这很可能是因为 AWS Chatbot 是一项全球服务,不接受任何区域。
我看到您有
boto3.setup_default_session(region_name=cfg.aws.region) #us-east-1
,它将 us-east-1 设置为默认区域,并使所有 boto3 客户端全部使用区域终端节点,但是 AWS Chatbot 没有区域终端节点,只有全局终端节点。
如果您以其他方式设置区域,那么它会起作用:
# Regional services: pin each client to the region being configured.
guardduty = boto3.client('guardduty', region_name='us-east-1')
events = boto3.client('events', region_name='us-east-1')
sns = boto3.client('sns', region_name='us-east-1')
# The Chatbot API has no us-east-1 endpoint and is served from only a few
# regions. Name one of them explicitly: a client created without a region
# inherits the environment's default (possibly us-east-1 again) or fails
# outright when no default region is configured.
chatbot = boto3.client('chatbot', region_name='us-east-2')