在 Swift 中使用 AWSTask 对象的正确方法是什么?

问题描述 投票:0回答:3

您好,提前感谢您的宝贵时间。

在我的代码中,我向 AWSSQS 发出各种请求,这些请求都返回 AWSTask。我发现使用这些 AWSTask 对象非常困难,同时还试图将特定于 AWS 的所有逻辑保留在单个类中,以便我可以在需要时轻松切换到不同的云服务。

理想情况下,我想做的是以串行方式异步执行一系列 AWS 任务。通常我只会将任务添加到自定义串行调度队列中,但由于 AWSTask 对象本身就是异步任务,因此我无法这样做。

这是一个简单的例子,说明了我遇到的问题。它没有任何现实世界的目的,但它很好地说明了问题。下面,我有代码来创建 SQS 队列、向 SQS 队列发送消息、从 SQS 队列接收消息以及删除 SQS 队列。假设我想以串行、异步的方式完成这四件事。换句话说,我想在尝试下一个任务之前确保上一个任务成功。

视图控制器

// Kick off the SQS test on a background queue, then report on the main queue.
// Note: the message is printed as soon as `runTest()` returns, not when the
// asynchronous AWS tasks it started have finished.
DispatchQueue.global(qos: .userInitiated).async {
    awsClass.runTest()
    DispatchQueue.main.async {
        print("Test Finished")
    }
}

AWSClass

/// Starts a request to create the SQS queue named "TestQueue".
///
/// The request is asynchronous: this method returns once the task has been
/// started. When the task finishes, the continuation either prints the error
/// or stores the new queue's URL in `queueUrl`.
public func createQueue() {
    guard let request = AWSSQSCreateQueueRequest() else { fatalError() }
    request.queueName = "TestQueue"

    sqs.createQueue(request).continueWith { (task) -> AnyObject? in
        if let error = task.error {
            print(error)
        } else if let result = task.result {
            self.queueUrl = result.queueUrl!
            print("created queue at: \(self.queueUrl!)")
        }
        return nil
    }
}

/// Starts a request to delete the queue referenced by `queueUrl`.
///
/// If `queueUrl` is already `nil`, only a notice is printed. Otherwise the
/// request is started and, once it succeeds, `queueUrl` is reset to `nil`.
public func deleteQueue() {
    guard queueUrl != nil else {
        print("Queue has already been deleted")
        return
    }
    guard let request = AWSSQSDeleteQueueRequest() else { fatalError() }
    request.queueUrl = queueUrl

    sqs.deleteQueue(request).continueWith { (task) -> AnyObject? in
        if let error = task.error {
            print(error)
        } else if task.result != nil {
            print("queue sucessfully deleted from \(self.queueUrl!)")
            self.queueUrl = nil
        }
        return nil
    }
}

/// Starts a request that sends one message to an SQS queue.
///
/// - Parameters:
///   - messageData: Text used as the message body.
///   - toConnectId: Value assigned to the request's `queueUrl`.
public func sendMessage(messageData: String, toConnectId: String) {
    guard let request = AWSSQSSendMessageRequest() else { fatalError() }
    request.messageBody = messageData
    request.delaySeconds = 0
    request.queueUrl = toConnectId

    sqs.sendMessage(request).continueWith { (task) -> AnyObject? in
        if let error = task.error {
            print(error)
        } else if task.result != nil {
            print("successfully sent message to \(toConnectId)")
        }
        return nil
    }
}

/// Starts a request to receive at most one message from the queue at `queueUrl`.
///
/// When the task finishes, the continuation prints the error, the body of the
/// received message, or a notice that no message was returned.
public func receiveMessage() {
    guard let receiveMessageRequest = AWSSQSReceiveMessageRequest() else { fatalError() }
    receiveMessageRequest.queueUrl = self.queueUrl
    receiveMessageRequest.maxNumberOfMessages = 1

    sqs.receiveMessage(receiveMessageRequest).continueWith(block: { (task) -> AnyObject? in
        if let error = task.error {
            print(error)
            return nil
        }
        guard let result = task.result else { return nil }

        // A successful receive can carry no messages (e.g. the queue is empty),
        // so the first message must not be force-unwrapped.
        guard let message = result.messages?.first else {
            print("no message was available to receive")
            return nil
        }
        print("successfully received message with body:  \(message.body ?? "failed")")
        return nil
    })
}

/// Calls the four SQS operations one after another on a private serial queue.
///
/// Each step only *starts* its AWS request: `sync` returns as soon as the
/// called method returns, so the requests themselves are not guaranteed to
/// finish in this order.
public func runTest() {
    let serialQueue = DispatchQueue(label: "mySerialQueue")
    let steps: [() -> Void] = [
        { self.createQueue() },
        { self.sendMessage(messageData: "test", toConnectId: "https://someUrl") },
        { self.receiveMessage() },
        { self.deleteQueue() },
    ]
    for step in steps {
        serialQueue.sync(execute: step)
    }
}

由于 AWSTask 及其完成回调都是异步执行的,代码会立即连续发出全部四个调用,各任务完成后才分别调用其完成回调。而我希望的是:前一个任务的完成回调执行完毕之后,下一个任务才开始。

ios swift amazon-web-services grand-central-dispatch amazon-sqs
3个回答
2
投票

AWSTask 对象旨在“链接”在一起。 文档可以在这里找到:http://docs.aws.amazon.com/mobile/sdkforios/developerguide/awstask.html

这里有一个小例子:

// Chain the requests: each success block returns the next AWSTask, so the
// following block runs only after that task has completed successfully.
// The blocks must be typed `-> Any?` (not `-> Void`) to be able to return a task.
sqs.createQueue(/* parameters */).continueOnSuccessWith(block: { (task) -> Any? in
    // Queue created: send the message next.
    return sqs.sendMessage(/* parameters */)
}).continueOnSuccessWith(block: { (task) -> Any? in
    // Message sent: receive it next.
    return sqs.receiveMessage(/* parameters */)
}).continueOnSuccessWith(block: { (task) -> Any? in
    // Message received: finally delete the queue.
    return sqs.deleteQueue(/* parameters */)
})

1
投票

好吧,我找到了解决问题的方法。它完全按照预期工作,但它是在这个令人讨厌的完成函数链中实现的。如果有人知道更优雅的解决方案,我洗耳恭听!

视图控制器

// Run the chained SQS test on a background queue; the closure passed to
// `runTest` is the completion handler for the whole chain.
print("Starting Test")
DispatchQueue.global(qos: .userInitiated).async {
    atomConnector.runTest {
        print("test finshed")
    }
}

AWSClass

/// Starts a request to create the SQS queue named "TestQueue".
///
/// - Parameter completion: Called after the queue URL has been stored in
///   `queueUrl`. It is not called when the task reports an error.
public func createQueue(completion: @escaping () -> Void) {
    guard let request = AWSSQSCreateQueueRequest() else { fatalError() }
    request.queueName = "TestQueue"

    sqs.createQueue(request).continueWith { (task) -> Void in
        if let error = task.error {
            print(error)
            return
        }
        guard let result = task.result else { return }
        self.queueUrl = result.queueUrl!
        print("created queue at: \(self.queueUrl!)")
        completion()
    }
}

/// Starts a request to delete the queue referenced by `queueUrl`.
///
/// - Parameter completion: Called after the deletion succeeded and `queueUrl`
///   has been reset to `nil`. It is not called when the task reports an error
///   or when `queueUrl` was already `nil`.
public func deleteQueue(completion: @escaping () -> Void) {
    guard queueUrl != nil else {
        print("Queue has already been deleted")
        return
    }
    guard let request = AWSSQSDeleteQueueRequest() else { fatalError() }
    request.queueUrl = queueUrl

    sqs.deleteQueue(request).continueWith { (task) -> Void in
        if let error = task.error {
            print(error)
            return
        }
        guard task.result != nil else { return }
        print("queue sucessfully deleted from \(self.queueUrl!)")
        self.queueUrl = nil
        completion()
    }
}

/// Starts a request that sends one message to an SQS queue.
///
/// - Parameters:
///   - messageData: Text used as the message body.
///   - toConnectId: Value assigned to the request's `queueUrl`.
///   - completion: Called after the message was sent successfully. It is not
///     called when the task reports an error.
public func sendMessage(messageData: String, toConnectId: String, completion: @escaping () -> Void) {
    guard let request = AWSSQSSendMessageRequest() else { fatalError() }
    request.messageBody = messageData
    request.delaySeconds = 0
    request.queueUrl = toConnectId

    sqs.sendMessage(request).continueWith { (task) -> Void in
        if let error = task.error {
            print(error)
            return
        }
        guard task.result != nil else { return }
        print("successfully sent message to \(toConnectId)")
        completion()
    }
}

/// Starts a request to receive at most one message from the queue at
/// `queueUrl`, then deletes the received message.
///
/// - Parameter completion: Forwarded to `deleteMessage(receiptHandle:completion:)`
///   once a message has been received. It is not invoked when the task reports
///   an error or when the response contains no message.
public func receiveMessage(completion: @escaping () -> Void) {
    guard let receiveMessageRequest = AWSSQSReceiveMessageRequest() else { fatalError() }
    receiveMessageRequest.queueUrl = self.queueUrl
    receiveMessageRequest.maxNumberOfMessages = 1

    sqs.receiveMessage(receiveMessageRequest).continueWith(block: { (task) -> Void in
        if let error = task.error {
            print(error)
            return
        }
        guard let result = task.result else { return }

        // A successful receive can carry no messages (e.g. the queue is empty),
        // so the first message must not be force-unwrapped.
        guard let message = result.messages?.first else {
            print("no message was available to receive")
            return
        }
        print("successfully received message with body:  \(message.body ?? "failed")")
        self.deleteMessage(receiptHandle: message.receiptHandle, completion: completion)
    })
}

/// Starts a request to delete a previously received message from the queue at
/// `queueUrl`.
///
/// - Parameters:
///   - receiptHandle: Receipt handle of the message to delete.
///   - completion: Called after the message was deleted successfully. It is
///     not called when the task reports an error.
public func deleteMessage(receiptHandle: String?, completion: @escaping () -> Void) {
    guard let deleteMessageRequest = AWSSQSDeleteMessageRequest() else { fatalError() }
    deleteMessageRequest.queueUrl = self.queueUrl
    deleteMessageRequest.receiptHandle = receiptHandle

    sqs.deleteMessage(deleteMessageRequest).continueWith(block: { (task) -> Void in
        if let error = task.error {
            print(error)
        } else if task.result != nil {
            // Unwrap for logging: interpolating the optional directly would
            // print `Optional("...")`.
            print("successfully deleted message with receiptHandle:  \(receiptHandle ?? "nil")")
            completion()
        }
    })
}

/// Runs create → send → receive → delete as a chain of completion handlers,
/// so each step starts only after the previous one has called its completion.
///
/// - Parameter completion: Called after the final step (deleting the queue)
///   has called its own completion.
public func runTest(completion: @escaping () -> Void) {
    createQueue {
        self.sendMessage(messageData: "test", toConnectId: "https://someUrl") {
            self.receiveMessage {
                self.deleteQueue(completion: completion)
            }
        }
    }
}

0
投票

我正在通过 Swift Concurrency 使用 AWS SDK。我发现 `await withCheckedContinuation` 是使用 `AWSTask` 的正确方法,我觉得这是一个更优雅的解决方案。

下面的示例展示了如何改写 Alec 的 `func createQueue`,使其使用 Swift Concurrency:

/// Creates the SQS queue named "TestQueue" and suspends until the underlying
/// AWSTask has finished.
///
/// On success the queue URL is stored in `queueUrl`; on failure the error is
/// printed. The continuation is resumed on every path.
public func createQueue() async {
  guard let request = AWSSQSCreateQueueRequest() else { fatalError() }
  request.queueName = "TestQueue"

  let pending = sqs.createQueue(request)
  await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
    pending.continueWith { finished in
      if let error = finished.error {
        print(error)
      } else if let created = finished.result {
        self.queueUrl = created.queueUrl!
        print("created queue at: \(self.queueUrl!)")
      }
      // Resume the suspended caller once, whether the task succeeded or failed.
      continuation.resume()
      return true
    }
  }
}

将 `AWSTask.continueWith` 放在 `await withCheckedContinuation` 块内。设计您的 `AWSTask.continueWith` 块时,要确保 `continuation.resume()` 在块内恰好被调用一次。

一旦按上述模式将所有方法都转换为 Swift Concurrency,`func runTest` 就优雅多了:

/// Runs the four SQS steps in order; each `await` suspends until that step's
/// async method has returned before the next one starts.
public func runTest() async {
  await createQueue()
  await sendMessage(messageData: "test", toConnectId: "https://someUrl")
  await receiveMessage()
  await deleteQueue()
}
© www.soinside.com 2019 - 2024. All rights reserved.