清理旧的 SNS 订阅

问题描述 投票:0回答:2

作为一个团队,我们继承了一个使用 Amazon SNS 向移动应用程序发布移动通知的项目,该系统已经活跃多年。

系统工作原理如下

  • 我们创建客户特定主题
  • 我们使用 GCM 或 APN 令牌注册新端点

我们注意到在较旧的设置中我们有一些过时的端点数据,例如重新安装、卸载的应用程序...

我们想做两件事

  • 清理不再活动的旧端点(我们可以纯粹根据端点的“状态”来执行此操作吗?)
  • 当设备变为非活动状态时主动做出反应 - 我读到可以使用 HTTP 端点订阅“EndpointUpdated”事件 - 但我不清楚如何判断某次“EndpointUpdated”是否由卸载等原因引起,以便我们据此清理对应的端点?

谢谢

amazon-web-services mobile amazon-sns
2个回答
0
投票
  1. Python 3.6
  2. Boto3
  3. 需要在本地配置好 AWS 凭证配置文件(profile)。
import boto3


def start_process(aws_profile_name):
    """Delete every SNS topic and subscription visible to an AWS profile.

    DESTRUCTIVE: for each topic returned by ``list_topics`` the topic's
    subscriptions are unsubscribed and the topic is deleted; afterwards every
    subscription still returned by ``list_subscriptions`` is unsubscribed too.

    All three list calls are paginated, so accounts with more entries than fit
    in a single response page are processed completely rather than only their
    first page.

    Args:
        aws_profile_name: Name of the locally configured AWS profile used to
            build the boto3 session.
    """
    session = boto3.Session(profile_name=aws_profile_name)
    sns_client = session.client('sns')
    print("Started Process")

    # Gather all topic ARNs before deleting anything so that deletions cannot
    # disturb the pagination of the listing itself.
    topic_arns = [
        entry.get("TopicArn")
        for page in sns_client.get_paginator('list_topics').paginate()
        for entry in page.get('Topics', [])
    ]

    topic_subscriptions = sns_client.get_paginator('list_subscriptions_by_topic')
    for topic in topic_arns:
        try:
            # Same reasoning: read every page first, then unsubscribe.
            list_of_subscription = [
                s.get("SubscriptionArn")
                for page in topic_subscriptions.paginate(TopicArn=topic)
                for s in page.get('Subscriptions', [])
            ]
            unsubscribe(list_of_subscription, sns_client)
            sns_client.delete_topic(TopicArn=topic)
            print(f"Successfully Deleted Topic - {topic}")
        except Exception as error:
            # Best-effort: report the cause and continue with the next topic.
            print(f"Failed to delete the sns - {topic}: {error}")

    # Final sweep over whatever subscriptions are still listed.
    list_of_subscription = []
    for page in sns_client.get_paginator('list_subscriptions').paginate():
        print(page)
        list_of_subscription.extend(
            s.get("SubscriptionArn") for s in page.get('Subscriptions', [])
        )
    unsubscribe(list_of_subscription, sns_client)


def unsubscribe(subscription_arn: list, sns_client):
    """Unsubscribe each SNS subscription in ``subscription_arn`` (best-effort).

    A failure on one entry is reported and processing continues with the next.

    Args:
        subscription_arn: Collection of subscription ARN strings (despite the
            singular name). Entries that are not ARN strings - ``None`` or
            placeholder values such as ``"PendingConfirmation"`` - are skipped,
            because an unsubscribe call for them cannot succeed.
        sns_client: Object exposing ``unsubscribe(SubscriptionArn=...)``;
            callers in this script pass a boto3 SNS client.
    """
    for arn in subscription_arn:
        if not isinstance(arn, str) or not arn.startswith("arn:"):
            print(f"Skipped non-ARN subscription entry - {arn}")
            continue
        try:
            sns_client.unsubscribe(SubscriptionArn=arn)
        except Exception as error:
            # Keep going, but surface why this one failed.
            print(f"Failed to unsubscribe - {arn}: {error}")
        else:
            print(f"Successfully unsubscribe - {arn}")


# Script entry: explain where AWS profiles live, ask for one, then run the
# (destructive) cleanup against it.
print("""
Here Profile name like where access key 
and secret key you stored Locally

To See
--------------------------
cat ~/.aws/credentials
To Add new One:
--------------------------
aws configure --profile <your custom profile name>

""")
# Plain string: the prompt has no placeholders, so no f-string is needed.
profile_name = input("Enter the profile name (i.e. 'archive'): ")
start_process(profile_name)


0
投票

您可以使用以下脚本删除不属于任何主题的订阅:

from typing import Set

import boto3


def get_all_subscriptions() -> Set[str]:
    """
    Collect the ARN of every subscription returned by ``list_subscriptions``.

    Pages are followed through ``NextToken`` until the service stops
    returning one.

    Returns:
        Set[str]: Subscription ARNs gathered across all pages.
    """
    client = boto3.client("sns")
    collected: Set[str] = set()

    # The first request carries no arguments; later ones carry only the token.
    request_args = {}
    while True:
        page = client.list_subscriptions(**request_args)
        collected.update([entry["SubscriptionArn"] for entry in page["Subscriptions"]])

        token = page.get("NextToken")
        if not token:
            return collected
        request_args = {"NextToken": token}


def get_subscriptions_in_topics() -> Set[str]:
    """
    Collect the subscription ARNs reachable through the listed topics.

    Walks every page of ``list_topics`` and, for each topic, every page of
    ``list_subscriptions_by_topic``.

    Returns:
        Set[str]: Subscription ARNs found under any listed topic.
    """
    client = boto3.client("sns")
    found: Set[str] = set()

    topic_args = {}
    while True:
        topic_page = client.list_topics(**topic_args)

        for topic_entry in topic_page["Topics"]:
            # Start each topic without a token; add one only for follow-up pages.
            sub_args = {"TopicArn": topic_entry["TopicArn"]}
            while True:
                sub_page = client.list_subscriptions_by_topic(**sub_args)
                found.update(
                    [entry["SubscriptionArn"] for entry in sub_page["Subscriptions"]]
                )

                sub_token = sub_page.get("NextToken")
                if not sub_token:
                    break
                sub_args["NextToken"] = sub_token

        topic_token = topic_page.get("NextToken")
        if not topic_token:
            break
        topic_args = {"NextToken": topic_token}

    return found


def find_stale_subscriptions() -> Set[str]:
    """
    Compute the subscriptions that no listed topic accounts for.

    The result is ``get_all_subscriptions()`` minus
    ``get_subscriptions_in_topics()``.

    NOTE(review): a subscription whose topic is not returned by ``list_topics``
    for these credentials (presumably e.g. a topic owned elsewhere) would also
    end up in this set - confirm before deleting the result.

    Returns:
        Set[str]: Subscription ARNs considered stale.
    """
    # Left operand is evaluated first, so all subscriptions are listed before topics.
    return get_all_subscriptions() - get_subscriptions_in_topics()


def delete_stale_subscriptions(stale_subscriptions: Set[str]) -> None:
    """
    Unsubscribe every ARN in the given set, printing one line per deletion.

    No error handling is applied: an exception raised by an ``unsubscribe``
    call propagates and stops the loop.

    Args:
        stale_subscriptions (Set[str]): Subscription ARNs to remove.

    Returns:
        None
    """
    client = boto3.client("sns")

    for arn in stale_subscriptions:
        client.unsubscribe(SubscriptionArn=arn)
        print(f"Deleted subscription: {arn}")


if __name__ == "__main__":
    # Script entry: report the stale subscriptions and delete them, if any.
    stale_subscriptions = find_stale_subscriptions()
    if not stale_subscriptions:
        print("No stale subscriptions found.")
    else:
        # The `=` specifier prints the variable name along with its value.
        print(f"Stale subscriptions found: {stale_subscriptions=}")
        delete_stale_subscriptions(stale_subscriptions)
© www.soinside.com 2019 - 2024. All rights reserved.