我已经使用 Golang DynamoDB SDK 一段时间了,最近我开始看到此错误类型再次出现:
RequestError: send request failed caused by: Post "https://dynamodb.[REGION].amazonaws.com/": read tcp [我的 IP]->[AWS IP]: read: connection reset by peer
这种情况似乎仅在向 DynamoDB 写入大量数据时才会发生,尽管该错误并不限于任何特定类型的请求。我在
UpdateItem
和 BatchWriteItem
请求中都看到了它。此外,由于失败的出现并不稳定,我无法将其定位到特定的代码行。该错误似乎与我的服务和 AWS 之间的某种网络问题有关,但是,由于它不是以限流(throttling)异常的形式返回的,所以我不确定该如何调试它。最后,由于这个错误是在写入请求的响应阶段返回的,我认为重试逻辑也不是真正的解决方案。
这是我的批量写入代码:
// BatchWrite writes requests to the DynamoDB table tableName.
//
// The requests are split into chunks of at most 25 items, and each chunk is
// sent concurrently through batchWriteInner. If any chunk reports an error,
// one of those errors is returned. Otherwise the unprocessed items handed back
// by every chunk are merged and BatchWrite calls itself with them, stopping
// once nothing is left to write.
func (conn *Connection) BatchWrite(tableName string, requests []*dynamodb.WriteRequest) error {
	// Maximum number of write requests sent in a single batch call.
	const chunkSize = 25

	total := len(requests)
	log.Printf("Attempting to write %d items to DynamoDB", total)
	if total == 0 {
		// Nothing to do; this is also the base case of the recursion below.
		return nil
	}

	// Ceiling division: a trailing partial chunk still needs its own call.
	chunks := (total + chunkSize - 1) / chunkSize

	var pending sync.WaitGroup
	// Buffered to one slot per chunk so a failing goroutine never blocks on send.
	failures := make(chan error, chunks)
	// Each goroutine writes only to its own slot, so no locking is required.
	leftovers := make([][]*dynamodb.WriteRequest, chunks)

	for c := 0; c < chunks; c++ {
		lo := c * chunkSize
		hi := lo + chunkSize
		if hi > total {
			hi = total
		}

		pending.Add(1)
		go func(slot int, batch []*dynamodb.WriteRequest) {
			defer pending.Done()

			unprocessed, err := conn.batchWriteInner(tableName, batch)
			if err != nil {
				failures <- err
				return
			}
			leftovers[slot] = unprocessed
		}(c, requests[lo:hi])
	}

	// Block until every chunk has either succeeded or reported its error.
	pending.Wait()

	// Non-blocking receive: surface one recorded error if there is any.
	select {
	case err := <-failures:
		return err
	default:
	}

	// Flatten the per-chunk unprocessed items into one list and write it again.
	var retries []*dynamodb.WriteRequest
	for _, part := range leftovers {
		retries = append(retries, part...)
	}
	return conn.BatchWrite(tableName, retries)
}
// batchWriteInner issues one BatchWriteItem call for requests against
// tableName, retrying with exponential backoff while the service reports a
// throughput-exceeded, request-limit-exceeded or internal-server error.
// Any other error stops the retry loop immediately.
//
// On success it returns the items the service reported as unprocessed,
// flattened into a single list (nil when there are none). On failure it
// returns a nil list and the error from the retry loop.
func (conn *Connection) batchWriteInner(tableName string, requests []*dynamodb.WriteRequest) ([]*dynamodb.WriteRequest, error) {
	// Build the request once; the same input is reused for every attempt.
	request := dynamodb.BatchWriteItemInput{
		ReturnConsumedCapacity:      aws.String(dynamodb.ReturnConsumedCapacityNone),
		ReturnItemCollectionMetrics: aws.String(dynamodb.ReturnItemCollectionMetricsNone),
		RequestItems: map[string][]*dynamodb.WriteRequest{
			tableName: requests,
		},
	}

	// Attempt to batch-write the items with an exponential backoff.
	var result *dynamodb.BatchWriteItemOutput
	err := backoff.Retry(func() error {
		var err error
		result, err = conn.inner.BatchWriteItem(&request)
		if err == nil {
			return nil
		}

		// Returning the plain error asks the backoff loop to try again.
		// Go switch cases do not fall through, so all retryable codes must be
		// listed in a single case; separate empty cases would skip the return
		// and be treated as permanent failures.
		if aerr, ok := err.(awserr.Error); ok {
			switch aerr.Code() {
			case dynamodb.ErrCodeProvisionedThroughputExceededException,
				dynamodb.ErrCodeRequestLimitExceeded,
				dynamodb.ErrCodeInternalServerError:
				return err
			}
		}

		// Backing off will not fix this error; mark it permanent so the
		// backoff library stops retrying.
		return backoff.Permanent(err)
	}, backoff.NewExponentialBackOff())
	if err != nil {
		return nil, err
	}

	// Roll the unprocessed items into a single list and return them.
	var list []*dynamodb.WriteRequest
	for _, item := range result.UnprocessedItems {
		list = append(list, item...)
	}
	return list, nil
}
之前有其他人处理过这个问题吗?这里正确的做法是什么?
我遇到了类似的问题,我发现这个问题已在下面这个 AWS SDK 的 GitHub 讨论帖中得到解答:https://github.com/aws/aws-sdk-go/issues/5037
总结一下: