我正在编写一个AWS Lambda代码来查询RDS表,将其转换为JSON,然后返回它。但是我没有看到JSON中的所有记录都超过SQL查询返回的记录。假设我要查询表中的1500条记录,但是每次JSON中只有1496至1500条记录(少了0-5条记录)。我怀疑我是否已将sync.WaitGroup
弄乱了。
下面是SQL Server查询
SELECT TOP 1500 * FROM IMBookingApp.dbo.BA_Contact__c
WHERE ContactId > 0
下面是我的代码
// Convert the rows object to slice of objects for every row
func parseRow(rows *sql.Rows, totalColumns int) []string {
receiver := make([]string, totalColumns)
is := make([]interface{}, len(receiver))
for i := range is {
is[i] = &receiver[i]
}
err := rows.Scan(is...)
if err != nil {
fmt.Println("Error reading rows: " + err.Error())
}
TotalRecordsInParseRowfunction++
return receiver
}
// Query the given table and return JSON response
func queryTable(conn *sql.DB, query string) (string, error) {
// Query Table
rows, err := conn.Query(query)
if err != nil {
fmt.Println("DATABASE ERROR:", err)
return "", errors.New("DATABASE ERROR:" + err.Error())
}
println("Rows:", rows)
defer rows.Close()
// Get the column names
columns, err := rows.Columns()
// fmt.Println("columns", columns)
if err != nil {
fmt.Println("DATABASE ERROR:", err)
return "", errors.New("DATABASE ERROR:" + err.Error())
}
totalColumns := len(columns)
var resp []map[string]string // Declare the type of final response which will be used to create JSON
var waitgroup sync.WaitGroup
// Iterate over all the rows returned
for rows.Next() {
waitgroup.Add(1)
TotalRecordsCount++
row := parseRow(rows, totalColumns)
go func() {
// Create a map of the row
respRow := map[string]string{} // Declare the type of each row of response
for count := range row {
respRow[columns[count]] = row[count]
}
// fmt.Println("\n\nrespRow", respRow)
resp = append(resp, respRow)
TotalRecordsAppendedCount++
waitgroup.Done()
}()
}
waitgroup.Wait()
// If no rows are returned
if len(resp) == 0 {
fmt.Println("MESSAGE: No records are available")
return "", errors.New("MESSAGE: No records are available")
}
// Create JSON
respJSON, _ := json.Marshal(resp)
fmt.Println("Response", string(respJSON))
fmt.Println("\n--------------Summary---------------")
fmt.Println("TotalRecordsInParseRowfunction", TotalRecordsInParseRowfunction)
fmt.Println("TotalRecordsCount", TotalRecordsCount)
fmt.Println("TotalRecordsAppendedCount", TotalRecordsAppendedCount)
fmt.Println("Object Length", len(resp))
return string(respJSON), nil // Return JSON
}
下面是我得到的输出摘要
--------------Summary---------------
TotalRecordsInParseRowfunction 1500
TotalRecordsCount 1500
TotalRecordsAppendedCount 1500
Object Length 1496
您的代码很活泼。多个goroutine正在写入resp
而没有任何互斥,因此您会丢失数据。
您可以在其周围添加互斥锁解锁。但是,您在goroutine中拥有的代码不保证其自己的goroutine是因为它是简单的映射附加项。在goroutine中处理该代码会容易得多,并且可能会在没有goroutine调度开销的情况下更快地运行。除非您计划在该goroutine中添加更多逻辑,否则建议您删除它。
这里提供了有关可能发生的情况的更多信息:首先,在go的当前版本中,仅当goroutine调用某些库函数时,goroutine才会屈服于其他人。查看代码,您的goroutine不太可能会屈服。由于您已经观察到数据丢失(这意味着存在竞争状况),因此您可能有多个核心。
比赛在这里:
resp = append(resp, respRow)
[在没有互斥的情况下,一个例程可能会查看resp
,发现它可以写入其第n
个元素。另一个goroutine(在单独的内核上运行)可以执行相同的操作,并在其中成功写入。但是第一个goroutine仍然认为该元素为空,因此将其覆盖并更新resp
。发生这种情况时,您将丢失一个元素。
如果在此代码中添加互斥,则实际上将强制所有goroutine按顺序运行,因为它们实际上并没有做其他任何事情。另外,由于goroutine执行顺序是随机的,因此您最终会得到随机排列的resp
。简而言之,这是您应按顺序执行代码的实例之一。