读取 TCP 读取:连接被对等方重置

问题描述 投票:0回答:1

我已经使用 Golang DynamoDB SDK 一段时间了,最近我开始看到此错误类型再次出现:

RequestError:发送请求失败 原因如下:发布“https://dynamodb.[REGION].amazonaws.com/”:读取 tcp [我的 IP]->[AWS IP]:读取:连接被对等方重置

这种情况似乎仅在向 DynamoDB 写入大量数据时才会发生,尽管该错误并不限于任何特定类型的请求。我在

UpdateItem
BatchWriteItem
请求中都看到了它。此外,由于失败不一致,我无法将其本地化到特定的代码行。该错误似乎与我的服务和 AWS 之间的某种网络问题有关,但是,由于它不会作为限制异常返回,所以我不确定如何调试它。最后,当响应从写入请求返回时,我认为重试逻辑也不是真正的解决方案。

这是我的批量写入代码:

func (conn *Connection) BatchWrite(tableName string, requests []*dynamodb.WriteRequest) error {

    // Get the length of the requests; if there aren't any then return because there's nothing to do
    length := len(requests)
    log.Printf("Attempting to write %d items to DynamoDB", length)
    if length == 0 {
        return nil
    }

    // Get the number of requests to make
    numRequests := length / 25
    if length%25 != 0 {
        numRequests++
    }

    // Create the variables necessary to manage the concurrency
    var wg sync.WaitGroup
    errs := make(chan error, numRequests)

    // Attempt to batch-write the requests to DynamoDB; because DynamoDB limits the number of concurrent
    // items in a batch request to 25, we'll chunk the requests into 25-report segments
    sections := make([][]*dynamodb.WriteRequest, numRequests)
    for i := 0; i < numRequests; i++ {

        // Get the end index which is 25 greater than the current index or the end of the array
        // if we're getting close
        end := (i + 1) * 25
        if end > length {
            end = length
        }

        // Add to the wait group so that we can ensure all the concurrent processes finish
        // before we close down the process
        wg.Add(1)

        // Write the chunk to DynamoDB concurrently
        go func(wg *sync.WaitGroup, index int, start int, end int) {
            defer wg.Done()

            // Call the DynamoDB operation; record any errors that occur
            if section, err := conn.batchWriteInner(tableName, requests[start:end]); err != nil {
                errs <- err
            } else {
                sections[index] = section
            }
        }(&wg, i, i*25, end)
    }

    // Wait for all the goroutines to finish
    wg.Wait()

    // Attempt to read an error from the channel; if we get one then return it
    // Otherwise, continue. We have to use the select here because this is
    // the only way to attempt to read from a channel without it blocking
    select {
    case err, ok := <-errs:
        if ok {
            return err
        }
    default:
        break
    }

    // Now, we've probably gotten retries back so take these and combine them into
    // a single list of requests
    retries := sections[0]
    if len(sections) > 1 {
        for _, section := range sections[1:] {
            retries = append(retries, section...)
        }
    }

    // Rewrite the requests and return the result
    return conn.BatchWrite(tableName, retries)
}

func (conn *Connection) batchWriteInner(tableName string, requests []*dynamodb.WriteRequest) ([]*dynamodb.WriteRequest, error) {

    // Create the request
    request := dynamodb.BatchWriteItemInput{
        ReturnConsumedCapacity:      aws.String(dynamodb.ReturnConsumedCapacityNone),
        ReturnItemCollectionMetrics: aws.String(dynamodb.ReturnItemCollectionMetricsNone),
        RequestItems: map[string][]*dynamodb.WriteRequest{
            tableName: requests,
        },
    }

    // Attempt to batch-write the items with an exponential backoff
    var result *dynamodb.BatchWriteItemOutput
    err := backoff.Retry(func() error {

        // Attempt the batch-write; if it fails then back-off and wait. Otherwise break out
        // of the loop and return
        var err error
        if result, err = conn.inner.BatchWriteItem(&request); err != nil {

            // If we have an error then what we do here will depend on the error code
            // If the error code is for exceeded throughput, exceeded request limit or
            // an internal server error then we'll try again. Otherwise, we'll break out
            // because the error isn't recoverable
            if aerr, ok := err.(awserr.Error); ok {
                switch aerr.Code() {
                case dynamodb.ErrCodeProvisionedThroughputExceededException:
                case dynamodb.ErrCodeRequestLimitExceeded:
                case dynamodb.ErrCodeInternalServerError:
                    return err
                }
            }

            // We received an error that won't be fixed by backing off; return this as a permanent
            // error so we can tell the backoff library that we want to break out of the exponential backoff
            return backoff.Permanent(err)
        }

        return nil
    }, backoff.NewExponentialBackOff())

    // If the batch-write failed then return an error
    if err != nil {
        return nil, err
    }

    // Roll the unprocessed items into a single list and return them
    var list []*dynamodb.WriteRequest
    for _, item := range result.UnprocessedItems {
        list = append(list, item...)
    }

    return list, nil
}

之前有其他人处理过这个问题吗?这里正确的做法是什么?

go amazon-dynamodb
1个回答
0
投票

我遇到了类似的问题,我发现这个问题已在此 AWS SDK GitHub 线程中得到解答:https://github.com/aws/aws-sdk-go/issues/5037

总结一下:

  • 错误直接源自网络层。
  • 它是不可重现的。您可以启用日志记录以确保重试生效,这应该足够好了
© www.soinside.com 2019 - 2024. All rights reserved.