我尝试使用 goroutine 并发插入 Item(共 2000 个 goroutine),
于是运行了下面的代码:
package main
import (
"fmt"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
"log"
"sync"
)
// Item is the value that InsertItem writes into the items table.
// The db struct tags carry the column names id, title and description.
type Item struct {
Id int `db:"id"`
Title string `db:"title"`
Description string `db:"description"`
}
// ConnectPostgresDB creates a *sqlx.DB handle for the hard-coded local
// Postgres connection string.
//
// An error from sqlx.Open is only printed, never returned: the caller gets
// whatever handle sqlx.Open produced, in the failure case as well.
func ConnectPostgresDB() *sqlx.DB {
	const dsn = "user=postgres dbname=postgres sslmode=disable password=postgres host=localhost port=8080"

	handle, openErr := sqlx.Open("postgres", dsn)
	if openErr != nil {
		fmt.Println(openErr)
	}
	// Both the success and the failure path hand back the same value.
	return handle
}
// InsertItem writes one item into the items table inside its own transaction
// and calls wg.Done when it returns, whether or not the insert succeeded.
//
// Every failure is printed to stdout; nothing is returned to the caller.
//
// NOTE: each call still opens and closes its own handle via
// ConnectPostgresDB, so N concurrent calls need at least N server
// connections. Sharing one handle would require changing the signature.
func InsertItem(item Item, wg *sync.WaitGroup) {
	defer wg.Done()

	db := ConnectPostgresDB()
	if db == nil {
		// ConnectPostgresDB has already printed the error; calling Close on a
		// nil handle would panic, so stop here.
		return
	}
	defer db.Close()

	tx, err := db.Beginx()
	if err != nil {
		fmt.Println(err)
		return
	}

	// Exec rather than Queryx: an INSERT yields no rows, and a result set that
	// is never closed keeps the transaction's connection busy.
	_, err = tx.Exec("INSERT INTO items(id, title, description) VALUES($1, $2, $3)", item.Id, item.Title, item.Description)
	if err != nil {
		fmt.Println(err)
		// The transaction failed: roll it back instead of trying to commit it.
		if rbErr := tx.Rollback(); rbErr != nil {
			fmt.Println(rbErr)
		}
		return
	}

	if err = tx.Commit(); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("Data is Successfully inserted!!")
}
func main() {
var wg sync.WaitGroup
//db, err := sqlx.Connect("postgres", "user=postgres dbname=postgres sslmode=disable password=postgres host=localhost port=8080")
for i := 1; i <= 2000; i++ {
item := Item{Id: i, Title: "TestBook", Description: "TestDescription"}
//go GetItem(db, i, &wg)
wg.Add(1)
go InsertItem(item, &wg)
}
wg.Wait()
fmt.Println("All DB Connection is Completed")
}
运行上面的代码后,我原以为 items 表中会有 2000 行,
但实际上 items 表中只有 150 行。
数据库服务器报了“too many clients”(客户端过多)错误,所以我把 max_connections 从 100 调到了 4000,
然后又试了一次,
但结果还是一样。
有人知道为什么会出现这样的结果吗?
注意
以下答案的所有代码均可在 https://github.com/mwmahlberg/so-postgres 下找到。
问题是你似乎并没有真正注意到错误的原因。
稍微调整一下 `main.go`,问题应该会变得相当明显:
package main
import (
"flag"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
)
// schema is the DDL that main runs through db.MustExec before any insert.
// Because of IF NOT EXISTS, an items table that is already present is left
// untouched.
const schema = `
CREATE TABLE IF NOT EXISTS items (
id integer primary key,
title text,
description text
);
`
// Item is the value that InsertItem writes into the items table.
// The db struct tags carry the column names id, title and description.
type Item struct {
Id int `db:"id"`
Title string `db:"title"`
Description string `db:"description"`
}
var (
// wg counts the insert goroutines that are still running. It is
// package-level so that InsertItem can call wg.Done without receiving the
// WaitGroup as a parameter.
wg sync.WaitGroup
// dburl holds the Postgres connection string; init binds it to the
// -dburl command-line flag.
dburl string
)
// init registers the -dburl command-line flag and binds it to the
// package-level dburl variable. The default targets a local Postgres on
// port 5432.
func init() {
	const (
		flagName   = "dburl"
		defaultDSN = "user=postgres dbname=postgres sslmode=disable password=postgres host=localhost port=5432"
		flagUsage  = "Postgres DB URL"
	)
	flag.StringVar(&dburl, flagName, defaultDSN, flagUsage)
}
// handlePanics is meant to be deferred at the top of a goroutine. If that
// goroutine is panicking, it logs the panic value and terminates the whole
// process with exit status 1; otherwise it does nothing.
func handlePanics() {
	// recover must be called directly by the deferred function to take effect.
	cause := recover()
	if cause == nil {
		return
	}
	log.Println("encountered panic: ", cause)
	os.Exit(1)
}
// InsertItem inserts item into the items table inside its own transaction,
// using the shared db handle passed in by the caller.
//
// It calls Done on the package-level wg when it returns or panics. Any
// database failure is raised as a panic whose value is an error wrapping the
// underlying cause; the caller is expected to have deferred handlePanics.
func InsertItem(item Item, db *sqlx.DB) {
	defer wg.Done()

	// With the beginning of the transaction, a connection is acquired from the pool.
	tx, err := db.Beginx()
	if err != nil {
		panic(fmt.Errorf("beginning transaction: %w", err))
	}

	_, err = tx.Exec("INSERT INTO items(id, title, description) VALUES($1, $2, $3)", item.Id, item.Title, item.Description)
	if err != nil {
		// Roll back so the failed transaction does not stay open, and surface
		// a rollback failure instead of silently dropping it.
		if rbErr := tx.Rollback(); rbErr != nil {
			panic(fmt.Errorf("inserting data %#v: %w (rollback also failed: %v)", item, err, rbErr))
		}
		panic(fmt.Errorf("inserting data %#v: %w", item, err))
	}

	if err = tx.Commit(); err != nil {
		panic(fmt.Errorf("committing transaction: %w", err))
	}
	log.Printf("Inserted item with id %d\n", item.Id)
}
// main connects to Postgres, makes sure the items table exists, then starts
// 2000 goroutines that each insert one item, and prints how long it took
// for all of them to finish.
func main() {
	// Log any panic raised on the main goroutine and exit non-zero.
	defer handlePanics()

	flag.Parse()
	log.Printf("DB URL: %s\n", dburl)

	// A single handle is shared by every goroutine. The handle wraps
	// database/sql's *sql.DB, which maintains the connection pool.
	db, err := sqlx.Connect("postgres", dburl)
	if err != nil {
		log.Fatalln(err)
	}
	defer db.Close()

	// MustExec panics on failure; the deferred handlePanics above reports it.
	db.MustExec(schema)

	begin := time.Now()
	for id := 1; id <= 2000; id++ {
		wg.Add(1)
		go func(id int) {
			// A deferred recover only covers its own goroutine, so each
			// worker installs the handler itself.
			defer handlePanics()
			InsertItem(Item{Id: id, Title: "TestBook", Description: "TestDescription"}, db)
		}(id)
	}
	wg.Wait()

	fmt.Printf("All DB Inserts completed after %s\n", time.Since(begin))
}
应用程序确实在我的测试设置中记录了一个错误:
2024/02/25 16:41:27 encountered panic: beginning transaction: pq: sorry, too many clients already
因此,我们需要为此加上一个限制:
// Set the number of connections in the pool
db.DB.SetMaxOpenConns(10)
// use the actual value
maxOpen := db.DB.Stats().MaxOpenConnections
var mutex sync.Mutex
for i := 1; i <= 2000; i++ {
wg.Add(1)
// For goroutines, you must explicitly set the panic handler
go func(i int) {
defer handlePanics()
// use a label to ensure that the goroutine breaks out of inner loop
waitForOpenConnection:
for {
// Lock the mutex to check the number of open connections.
// We need to do this otherwise another goroutine could increment the number of open connections
mutex.Lock()
// Get the connections in the pool that are currently in use
switch open := db.DB.Stats().InUse; {
// If the number of open connections is less than the maximum, insert the item
case open <= maxOpen:
log.Println("Inserting item")
InsertItem(Item{Id: i, Title: "TestBook", Description: "TestDescription"}, db)
// Now that the item has been inserted, unlock the mutex and break out of the inner loop
mutex.Unlock()
break waitForOpenConnection
default:
// Allow other goroutines to read the number of open connections
mutex.Unlock()
}
}
}(i)
}
果然,结果正如所料:
All DB Inserts completed after 514.022334ms
对于如此(看似)简单的事情来说相当麻烦,对吧?
现在真正令人不安的部分来了:
“并发不是并行。”
如果我们看一下简化版本(完整代码位于恰当命名的“简化”分支):
package main
...
const (
// schema is the DDL that main runs through db.MustExec before any insert.
// Because of IF NOT EXISTS, an items table that is already present is left
// untouched.
schema = `
CREATE TABLE IF NOT EXISTS items (
id integer primary key,
title text,
description text
);
`
// insert is the parameterized statement executed by InsertItem; the
// placeholders $1, $2 and $3 receive the item's id, title and description.
insert = `
INSERT INTO items(id, title, description) VALUES($1, $2, $3)
`
)
...
// InsertItem inserts item into the items table inside its own transaction,
// using the shared db handle passed in by the caller.
//
// It runs synchronously in the caller's goroutine. Any database failure is
// raised as a panic whose value is an error wrapping the underlying cause;
// main's deferred handlePanics logs it and exits.
func InsertItem(item Item, db *sqlx.DB) {
	// With the beginning of the transaction, a connection is acquired from the pool.
	tx, err := db.Beginx()
	if err != nil {
		panic(fmt.Errorf("beginning transaction: %w", err))
	}

	if _, err = tx.Exec(insert, item.Id, item.Title, item.Description); err != nil {
		// Roll back so the failed transaction does not stay open, and surface
		// a rollback failure instead of silently dropping it.
		if rbErr := tx.Rollback(); rbErr != nil {
			panic(fmt.Errorf("inserting data: %w (rollback also failed: %v)", err, rbErr))
		}
		panic(fmt.Errorf("inserting data: %w", err))
	}

	if err = tx.Commit(); err != nil {
		panic(fmt.Errorf("committing transaction: %w", err))
	}
}
// main connects to Postgres, makes sure the items table exists, then inserts
// 2000 items one after another on the main goroutine and logs how long the
// whole run took.
func main() {
	// Log any panic raised on the main goroutine and exit non-zero.
	defer handlePanics()

	flag.Parse()
	log.Printf("DB URL: %s\n", dburl)

	// A single handle is used for the whole run. The handle wraps
	// database/sql's *sql.DB, which maintains the connection pool.
	db, err := sqlx.Connect("postgres", dburl)
	if err != nil {
		log.Fatalln(err)
	}
	defer db.Close()

	// MustExec panics on failure; the deferred handlePanics above reports it.
	db.MustExec(schema)

	begin := time.Now()

	// Set the number of connections in the pool.
	db.DB.SetMaxOpenConns(10)

	for id := 1; id <= 2000; id++ {
		InsertItem(Item{Id: id, Title: "TestBook", Description: "TestDescription"}, db)
	}

	log.Printf("All DB Inserts completed after %s\n", time.Since(begin))
}
`main` 不仅更具可读性、更简单,而且速度也相当。实际上,在我完全不科学的测试中,各次运行平均比那个繁琐的 goroutine 版本快 100 毫秒。
长话短说:传统观点认为过早优化是万恶之源。并发不是并行。
除非必要,否则不要使用 goroutine,检查错误并对它们做出反应。