作为一个团队,我们继承了一个使用 Amazon SNS 向移动应用程序发布移动通知的项目,该系统已经活跃多年。
系统工作原理如下
我们注意到在较旧的设置中我们有一些过时的端点数据,例如重新安装、卸载的应用程序...
我们想做两件事
谢谢
import boto3
def start_process(aws_profile_name):
session = boto3.Session(profile_name=aws_profile_name)
sns_client = session.client('sns')
print("Started Process")
list_of_sns = sns_client.list_topics()
list_of_topics = [x.get("TopicArn") for x in list_of_sns.get('Topics')]
for topic in list_of_topics:
try:
subscriptions = sns_client.list_subscriptions_by_topic(TopicArn=topic)
list_of_subscription = [s.get("SubscriptionArn") for s in subscriptions.get('Subscriptions')]
unsubscribe(list_of_subscription, sns_client)
sns_client.delete_topic(TopicArn=topic)
print(f"Successfully Deleted Topic - {topic}")
except Exception:
print(f"Failed to delete the sns - {topic}")
subscriptions = sns_client.list_subscriptions()
print(subscriptions)
list_of_subscription = [s.get("SubscriptionArn") for s in subscriptions.get('Subscriptions')]
unsubscribe(list_of_subscription, sns_client)
def unsubscribe(subscription_arn: list, sns_client):
for arn in subscription_arn:
try:
sns_client.unsubscribe(SubscriptionArn=arn)
print(f"Successfully unsubscribe - {arn}")
except Exception:
print(f"Failed to unsubscribe - {arn}")
print("""
Here Profile name like where access key
and secret key you stored Locally
To See
--------------------------
cat ~/.aws/creddential
To Add new One:
--------------------------
aws configure --profile <your custom profile name>
""")
profile_name = input(f"Enter the profile name (i;e 'archive'): ")
start_process(profile_name)
您可以使用以下命令删除不在任何主题中的订阅:
from typing import Set
import boto3
def get_all_subscriptions() -> Set[str]:
"""
Get all subscription ARNs.
Returns:
Set[str]: Set of subscription ARNs.
"""
# Initialize the SNS client
sns = boto3.client("sns")
all_subscriptions = set()
# Pagination loop
next_token = None
while True:
# Get subscriptions with pagination
response = (
sns.list_subscriptions(NextToken=next_token) if next_token else sns.list_subscriptions()
)
# Extract subscription ARNs and add to the set
all_subscriptions.update({sub["SubscriptionArn"] for sub in response["Subscriptions"]})
# Check if there are more subscriptions to fetch
next_token = response.get("NextToken")
if not next_token:
break
return all_subscriptions
def get_subscriptions_in_topics() -> Set[str]:
"""
Get all subscription ARNs that are subscribed to topics.
Returns:
Set[str]: Set of subscription ARNs in topics.
"""
# Initialize the SNS client
sns = boto3.client("sns")
all_subscriptions_in_topics = set()
# Pagination loop
next_token = None
while True:
# Get all topics with pagination
response = sns.list_topics(NextToken=next_token) if next_token else sns.list_topics()
# Extract topic ARNs
for topic in response["Topics"]:
# Get subscriptions for each topic with pagination
next_sub_token = None
while True:
# Get subscriptions for the current page
sub_response = (
sns.list_subscriptions_by_topic(
TopicArn=topic["TopicArn"], NextToken=next_sub_token
)
if next_sub_token
else sns.list_subscriptions_by_topic(TopicArn=topic["TopicArn"])
)
# Extract subscription ARNs and add to the set
all_subscriptions_in_topics.update(
{sub["SubscriptionArn"] for sub in sub_response["Subscriptions"]}
)
# Check if there are more subscriptions to fetch for this topic
next_sub_token = sub_response.get("NextToken")
if not next_sub_token:
break
# Check if there are more topics to fetch
next_token = response.get("NextToken")
if not next_token:
break
return all_subscriptions_in_topics
def find_stale_subscriptions() -> Set[str]:
"""
Find stale subscriptions that are not subscribed to any topic.
Returns:
Set[str]: Set of stale subscription ARNs.
"""
all_subscriptions = get_all_subscriptions()
subscriptions_in_topics = get_subscriptions_in_topics()
# Find stale subscriptions
stale_subscriptions = all_subscriptions - subscriptions_in_topics
return stale_subscriptions
def delete_stale_subscriptions(stale_subscriptions: Set[str]) -> None:
"""
Delete stale subscriptions.
Args:
stale_subscriptions (Set[str]): Set of stale subscription ARNs.
Returns:
None
"""
# Initialize the SNS client
sns = boto3.client("sns")
# Delete stale subscriptions
for subscription_arn in stale_subscriptions:
sns.unsubscribe(SubscriptionArn=subscription_arn)
print(f"Deleted subscription: {subscription_arn}")
if __name__ == "__main__":
stale_subscriptions = find_stale_subscriptions()
if stale_subscriptions:
print(f"Stale subscriptions found: {stale_subscriptions=}")
delete_stale_subscriptions(stale_subscriptions)
else:
print("No stale subscriptions found.")