使用goroutine丢失数据

问题描述 投票:0回答:1

我正在编写一个AWS Lambda代码来查询RDS表,将其转换为JSON,然后返回它。但是我没有看到JSON中的所有记录都超过SQL查询返回的记录。假设我要查询表中的1500条记录,但是每次JSON中只有1496至1500条记录(少了0-5条记录)。我怀疑我是否已将sync.WaitGroup弄乱了。

下面是SQL Server查询

SELECT TOP 1500 * FROM IMBookingApp.dbo.BA_Contact__c
WHERE ContactId > 0

下面是我的代码

// Convert the rows object to slice of objects for every row
func parseRow(rows *sql.Rows, totalColumns int) []string {

    receiver := make([]string, totalColumns)

    is := make([]interface{}, len(receiver))
    for i := range is {
        is[i] = &receiver[i]
    }

    err := rows.Scan(is...)

    if err != nil {
        fmt.Println("Error reading rows: " + err.Error())
    }

    TotalRecordsInParseRowfunction++
    return receiver
}

// Query the given table and return JSON response
func queryTable(conn *sql.DB, query string) (string, error) {

    // Query Table
    rows, err := conn.Query(query)
    if err != nil {
        fmt.Println("DATABASE ERROR:", err)
        return "", errors.New("DATABASE ERROR:" + err.Error())
    }

    println("Rows:", rows)
    defer rows.Close()

    // Get the column names
    columns, err := rows.Columns()
    // fmt.Println("columns", columns)
    if err != nil {
        fmt.Println("DATABASE ERROR:", err)
        return "", errors.New("DATABASE ERROR:" + err.Error())
    }

    totalColumns := len(columns)
    var resp []map[string]string // Declare the type of final response which will be used to create JSON
    var waitgroup sync.WaitGroup

    // Iterate over all the rows returned
    for rows.Next() {

        waitgroup.Add(1)
        TotalRecordsCount++
        row := parseRow(rows, totalColumns)

        go func() {
            // Create a map of the row
            respRow := map[string]string{} // Declare the type of each row of response
            for count := range row {
                respRow[columns[count]] = row[count]
            }
            // fmt.Println("\n\nrespRow", respRow)

            resp = append(resp, respRow)
            TotalRecordsAppendedCount++
            waitgroup.Done()
        }()
    }
    waitgroup.Wait()

    // If no rows are returned
    if len(resp) == 0 {
        fmt.Println("MESSAGE: No records are available")
        return "", errors.New("MESSAGE: No records are available")
    }

    // Create JSON
    respJSON, _ := json.Marshal(resp)
    fmt.Println("Response", string(respJSON))

    fmt.Println("\n--------------Summary---------------")
    fmt.Println("TotalRecordsInParseRowfunction", TotalRecordsInParseRowfunction)
    fmt.Println("TotalRecordsCount", TotalRecordsCount)
    fmt.Println("TotalRecordsAppendedCount", TotalRecordsAppendedCount)
    fmt.Println("Object Length", len(resp))

    return string(respJSON), nil // Return JSON

}

下面是我得到的输出摘要

--------------Summary---------------
TotalRecordsInParseRowfunction 1500
TotalRecordsCount 1500
TotalRecordsAppendedCount 1500
Object Length 1496
sql-server go aws-lambda goroutine
1个回答
1
投票

您的代码很活泼。多个goroutine正在写入resp而没有任何互斥,因此您会丢失数据。

您可以在其周围添加互斥锁解锁。但是,您在goroutine中拥有的代码不保证其自己的goroutine是因为它是简单的映射附加项。在goroutine中处理该代码会容易得多,并且可能会在没有goroutine调度开销的情况下更快地运行。除非您计划在该goroutine中添加更多逻辑,否则建议您删除它。

© www.soinside.com 2019 - 2024. All rights reserved.