我对 Python 3 中的异步编程还很陌生。我正在使用 types_aiobotocore_sqs 和 aiobotocore 库创建 AWS SQS 客户端,但遇到了标题中提到的错误。
以下是代码实现
url.py file:
@api_view(["GET"])
def root_view(request):
    """Handle GET requests by returning the test queue's URL as text.

    The asynchronous helper calls are driven to completion with
    ``asyncio.run`` inside this synchronous view, and the result is
    converted to ``str`` before being wrapped in a ``Response``.
    """

    async def _fetch_queue_url():
        # Build the helper for the test queue, then ask it for the URL.
        helper = await Helper.create(QueueName.TEST_QUEUE.value)
        return await helper.get_queue_url()

    queue_url = asyncio.run(_fetch_queue_url())
    return Response(str(queue_url))
sqs_helper 文件
class Helper:
    """Async wrapper that owns an SQS client and an ``Sqs`` object for one queue.

    Instances are meant to be built with ``await Helper.create(queue_name)``,
    which runs the asynchronous ``setup`` step after construction.
    """

    def __init__(self, queue_name: str) -> None:
        """Store the queue name and read the AWS settings through ``env``."""
        self.queue_name = queue_name
        self.access_key_id = env("AWS_ACCESS_KEY_ID")
        self.secret_access_key = env("AWS_SECRET_ACCESS_KEY")
        self.region_name = env("REGION_NAME")
        # Filled in by setup().
        self.sqs = None

    @classmethod
    async def create(cls, queue_name):
        """Construct a helper for ``queue_name`` and run its async setup."""
        helper = cls(queue_name)
        await helper.setup()
        return helper

    async def setup(self):
        """Create the SQS client and wrap it in an ``Sqs`` object."""
        sqs_client = await self.create_sqs_client()
        self.sqs_client = sqs_client
        self.sqs = Sqs(sqs_client, self.queue_name)

    async def create_sqs_client(self):
        """Create an SQS client from a new session and return it."""
        aio_session = get_session()
        client_context = aio_session.create_client(
            'sqs',
            region_name=self.region_name,
            aws_access_key_id=self.access_key_id,
            aws_secret_access_key=self.secret_access_key,
        )
        async with client_context as client:
            client: SQSClient
            print("TYPE" + str(client))
            # NOTE(review): returning here leaves the ``async with`` block,
            # so the context manager's exit runs before the caller ever
            # uses the client.
            return client

    async def get_queue_url(self):
        """Return the queue URL reported by the wrapped ``Sqs`` object."""
        queue_url = await self.sqs.get_queue_url()
        return queue_url
发起队列的Sqs客户端
class Sqs:
    """Thin wrapper around an SQS client bound to a single queue name."""

    def __init__(self, client: SQSClient, queue_name: str) -> None:
        """Remember the client and queue name; the URL cache starts empty."""
        self.client = client
        self.queue_name = queue_name
        self._queue_url = ""

    async def get_queue_url(self) -> str:
        """Return the queue URL, looking it up once and caching the result.

        Raises:
            QueueDoesNotExist: if the client reports the error code
                ``AWS.SimpleQueueService.NonExistentQueue``.
            ClientError: any other client error is re-raised unchanged.
        """
        # Serve the cached value when a previous lookup already succeeded.
        if self._queue_url:
            return self._queue_url

        try:
            response = await self.client.get_queue_url(QueueName=self.queue_name)
            self._queue_url = response["QueueUrl"]
        except ClientError as err:
            error_code = err.response.get("Error", {}).get("Code")
            if error_code == "AWS.SimpleQueueService.NonExistentQueue":
                raise QueueDoesNotExist(
                    f"Queue {self.queue_name} does not exist"
                ) from err
            raise err
        return self._queue_url
我在第三个代码片段(Sqs 类)中的下面这一行收到了该错误:
response = await self.client.get_queue_url(QueueName=self.queue_name)
当您使用 async with session.create_client(...) as client 时,客户端对象只在该上下文内有效;一旦离开该上下文,客户端就会被关闭,无法继续使用。如果库允许,您应该直接创建并保存 client,而不是把它限制在 async with 块中。这样可以确保客户端在该上下文之外仍然可用,从而可能修复该错误:
async def create_sqs_client(self):
    """Create an SQS client that stays open after this method returns.

    ``session.create_client(...)`` yields an async context manager, not an
    awaitable, so it cannot be awaited directly. The context is entered
    through an ``AsyncExitStack`` stored on ``self._exit_stack``; this keeps
    the client open without wrapping its use in an ``async with`` block.

    Returns:
        The client object produced by entering the ``create_client`` context.

    Note:
        Nothing closes the client automatically. Call
        ``await self._exit_stack.aclose()`` once the client is no longer
        needed.
    """
    # Function-scope import keeps this snippet self-contained.
    from contextlib import AsyncExitStack

    # Reuse an existing stack so repeated calls do not orphan earlier clients.
    exit_stack = getattr(self, "_exit_stack", None)
    if exit_stack is None:
        exit_stack = self._exit_stack = AsyncExitStack()

    session = get_session()
    client = await exit_stack.enter_async_context(
        session.create_client(
            'sqs',
            region_name=self.region_name,
            aws_access_key_id=self.access_key_id,
            aws_secret_access_key=self.secret_access_key,
        )
    )
    print("TYPE" + str(client))
    return client
另外,请务必手动处理客户端的关闭,因为此时不再由 async with 自动为您完成这一操作。