使用goroutine插入Item无法正常工作

问题描述 投票:0回答:1

我尝试使用 goroutine 插入 Item(2000 个 goroutine)

于是我运行了下面的代码:


package main

import (
    "fmt"
    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq"
    "log"
    "sync"
)

// Item is one row of the items table (id, title, description). The db tags
// name the column for each field for sqlx's struct mapping; the INSERT in
// InsertItem binds the fields positionally, so the tags are not exercised here.
type Item struct {
    Id          int    `db:"id"`
    Title       string `db:"title"`
    Description string `db:"description"`
}

// ConnectPostgresDB returns a *sqlx.DB handle for the Postgres server at
// localhost:8080.
//
// database/sql's Open may only validate its arguments without contacting the
// server, so connection problems (including "too many clients") usually
// surface later, on the first query or transaction. Each call creates a new,
// independent connection pool; the handle is meant to be created once and
// shared, not created per operation.
//
// If the handle cannot be created the program exits: returning the nil
// handle would only make the caller panic on its first use of it.
func ConnectPostgresDB() *sqlx.DB {
	connstring := "user=postgres dbname=postgres sslmode=disable password=postgres host=localhost port=8080"
	db, err := sqlx.Open("postgres", connstring)
	if err != nil {
		log.Fatalf("opening postgres connection: %v", err)
	}
	return db
}

// InsertItem inserts item into the items table inside its own transaction
// and calls wg.Done when it returns.
//
// NOTE: every call opens (and closes) a brand-new connection pool through
// ConnectPostgresDB. When main starts 2000 of these concurrently, each one
// needs its own server connection, which is what exhausts the server's
// max_connections and produces "too many clients". Sharing a single
// *sqlx.DB (capped with SetMaxOpenConns) across all goroutines avoids that,
// but requires changing this function's signature.
func InsertItem(item Item, wg *sync.WaitGroup) {
	defer wg.Done()
	db := ConnectPostgresDB()
	defer db.Close()

	tx, err := db.Beginx()
	if err != nil {
		log.Println("beginning transaction:", err)
		return
	}

	// Exec, not Queryx: an INSERT returns no rows, and the *sqlx.Rows that
	// Queryx returns must be closed, which the previous code never did.
	_, err = tx.Exec("INSERT INTO items(id, title, description) VALUES($1, $2, $3)",
		item.Id, item.Title, item.Description)
	if err != nil {
		log.Println("inserting item:", err)
		// In Postgres a failed statement aborts the whole transaction, so
		// roll back instead of committing. The rollback error is ignored on
		// purpose: the insert error is the one worth reporting.
		_ = tx.Rollback()
		return
	}

	if err = tx.Commit(); err != nil {
		log.Println("committing transaction:", err)
		return
	}

	fmt.Println("Data is Successfully inserted!!")
}





func main() {
    var wg sync.WaitGroup
    //db, err := sqlx.Connect("postgres", "user=postgres dbname=postgres sslmode=disable password=postgres host=localhost port=8080")
    for i := 1; i <= 2000; i++ {
        item := Item{Id: i, Title: "TestBook", Description: "TestDescription"}
        //go GetItem(db, i, &wg)
        wg.Add(1)
        go InsertItem(item, &wg)

    }
    wg.Wait()
    fmt.Println("All DB Connection is Completed")
}

运行上面的代码后,我预期 items 表中会有 2000 行。

但实际上 items 表中只有 150 行。

数据库服务器报出了“too many clients”(客户端过多)错误,所以我把 max_connections 从 100 调高到了 4000。

然后又试了一次,

但结果还是一样。

有人知道为什么会出现这些结果吗???

postgresql go goroutine
1个回答
0
投票

注意

以下答案的所有代码均可在 https://github.com/mwmahlberg/so-postgres 下找到。

问题在于你似乎并没有真正关注程序报出的错误。

稍微调整一下 main.go,问题就会变得相当明显:

package main

import (
    "flag"
    "fmt"
    "log"
    "os"
    "sync"
    "time"

    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq"
)

// schema creates the items table if it does not exist yet; main runs it once
// via db.MustExec before starting the inserts. id is the primary key, so
// inserting the same id twice fails.
const schema = `
CREATE TABLE IF NOT EXISTS items (
        id integer primary key,
        title text,
        description text
);
`

// Item is one row of the items table created by schema. The db tags name the
// column for each field for sqlx's struct mapping; InsertItem binds the fields
// positionally, so the tags are not exercised in this program.
type Item struct {
    Id          int    `db:"id"`
    Title       string `db:"title"`
    Description string `db:"description"`
}

var (
    // wg tracks the InsertItem goroutines started by main. It is package-level
    // so InsertItem can call wg.Done without the WaitGroup being passed in.
    wg sync.WaitGroup

    // dburl is the Postgres connection string. init registers it as the
    // -dburl command-line flag; flag.Parse in main fills it in.
    dburl string
)

// init registers the -dburl flag. Its default targets a local Postgres on the
// standard port 5432 with the stock postgres user, password and database.
func init() {
	const defaultDBURL = "user=postgres dbname=postgres sslmode=disable password=postgres host=localhost port=5432"
	flag.StringVar(&dburl, "dburl", defaultDBURL, "Postgres DB URL")
}

// handlePanics is meant to be deferred directly (defer handlePanics()) so its
// recover call can observe a panic in the deferring goroutine. When that
// goroutine is panicking, it logs the panic value and ends the whole process
// with exit status 1 (os.Exit skips any remaining deferred calls). Without a
// panic it returns without doing anything.
func handlePanics() {
	r := recover()
	if r == nil {
		return
	}
	log.Println("encountered panic: ", r)
	os.Exit(1)
}

// InsertItem inserts item into the items table in its own transaction, using
// a connection from db's shared pool. It calls wg.Done when it returns,
// including when it panics.
//
// Errors are not returned: every failure panics with a wrapped error, which
// the callers in this program handle with a deferred handlePanics. Wrapping
// with %w keeps the underlying driver error reachable via errors.Is/As.
func InsertItem(item Item, db *sqlx.DB) {
	defer wg.Done()

	// The transaction is bound to one pooled connection until Commit or
	// Rollback returns it to the pool.
	tx, err := db.Beginx()
	if err != nil {
		panic(fmt.Errorf("beginning transaction: %w", err))
	}

	_, err = tx.Exec("INSERT INTO items(id, title, description) VALUES($1, $2, $3)",
		item.Id, item.Title, item.Description)
	if err != nil {
		// Not superfluous: without it the transaction keeps its connection
		// checked out of the pool. The rollback error is ignored on purpose;
		// the insert error is the one being reported.
		_ = tx.Rollback()
		panic(fmt.Errorf("inserting data %#v: %w", item, err))
	}

	if err = tx.Commit(); err != nil {
		panic(fmt.Errorf("committing transaction: %w", err))
	}

	log.Printf("Inserted item with id %d\n", item.Id)
}

// main inserts 2000 items concurrently, one goroutine per item, all sharing a
// single *sqlx.DB, and prints how long the inserts took.
//
// No cap is put on the pool here, so up to one connection per in-flight
// insert may be opened; with 2000 goroutines that can exceed the server's
// max_connections.
func main() {
	// Deferred first, so it runs after every later-deferred call (such as
	// db.Close) when main panics.
	defer handlePanics()

	flag.Parse()
	log.Printf("DB URL: %s\n", dburl)

	// sqlx.Connect creates the pool and pings the server, so a bad URL fails
	// here. The returned handle is shared by every goroutine below.
	db, err := sqlx.Connect("postgres", dburl)
	if err != nil {
		log.Fatalln(err)
	}
	defer db.Close()

	// MustExec panics on failure; the deferred handlePanics reports it.
	db.MustExec(schema)

	const itemCount = 2000
	begin := time.Now()
	for id := 1; id <= itemCount; id++ {
		wg.Add(1)
		go func(id int) {
			// recover only sees panics from its own goroutine, so each
			// goroutine installs its own handler.
			defer handlePanics()
			InsertItem(Item{Id: id, Title: "TestBook", Description: "TestDescription"}, db)
		}(id)
	}
	wg.Wait()

	fmt.Printf("All DB Inserts completed after %s\n", time.Since(begin))
}

在我的测试环境中,应用程序确实记录了一个错误:

2024/02/25 16:41:27 encountered panic:  beginning transaction: pq: sorry, too many clients already

因此,我们需要对并发打开的连接数加以控制:


// Set the number of connections in the pool
db.DB.SetMaxOpenConns(10)

// use the actual value
maxOpen := db.DB.Stats().MaxOpenConnections

var mutex sync.Mutex
for i := 1; i <= 2000; i++ {

    wg.Add(1)

    // For goroutines, you must explicitly set the panic handler
    go func(i int) {

        defer handlePanics()

        // use a label to ensure that the goroutine breaks out of inner loop
    waitForOpenConnection:
        for {
            // Lock the mutex to check the number of open connections.
            // We need to do this otherwise another goroutine could increment the number of open connections
            mutex.Lock()

            // Get the connections in the pool that are currently in use
            switch open := db.DB.Stats().InUse; {

            // If the number of open connections is less than the maximum, insert the item
            case open <= maxOpen:
                log.Println("Inserting item")
                InsertItem(Item{Id: i, Title: "TestBook", Description: "TestDescription"}, db)
                // Now that the item has been inserted, unlock the mutex and break out of the inner loop
                mutex.Unlock()
                break waitForOpenConnection
            default:
                // Allow other goroutines to read the number of open connections
                mutex.Unlock()
            }
        }
    }(i)
}

果然,结果正如所料:

All DB Inserts completed after 514.022334ms

对于如此(看似)简单的事情来说相当麻烦,对吧?

现在真正令人不安的部分来了:

并发不是并行。

—— Go 谚语

如果我们看一下简化版本(完整代码位于恰当命名的“简化”分支):

package main

...

// SQL statements used by main and InsertItem.
const (
    // schema creates the items table if it does not exist yet; main runs it
    // once before the inserts. id is the primary key, so duplicate ids fail.
    schema = `
CREATE TABLE IF NOT EXISTS items (
        id integer primary key,
        title text,
        description text
);
`
    // insert adds one row; InsertItem binds $1, $2 and $3 to the item's id,
    // title and description.
    insert = `
INSERT INTO items(id, title, description) VALUES($1, $2, $3)
`
)

...

// InsertItem inserts item into the items table in its own transaction, using
// a connection from db's pool.
//
// Failures panic with a wrapped error (%w keeps the driver error reachable via
// errors.Is/As). In this version it is called directly from main, so the panic
// propagates to main's deferred handlePanics.
func InsertItem(item Item, db *sqlx.DB) {
	// The transaction is bound to one pooled connection until Commit or
	// Rollback returns it to the pool.
	tx, err := db.Beginx()
	if err != nil {
		panic(fmt.Errorf("beginning transaction: %w", err))
	}

	if _, err = tx.Exec(insert, item.Id, item.Title, item.Description); err != nil {
		// Hand the connection back before panicking. The rollback error is
		// ignored on purpose; the insert error is the one being reported.
		_ = tx.Rollback()
		panic(fmt.Errorf("inserting data: %w", err))
	}

	if err = tx.Commit(); err != nil {
		panic(fmt.Errorf("committing transaction: %w", err))
	}
}

// main inserts 2000 items one after another over a single shared *sqlx.DB
// and logs how long that took. Each InsertItem call finishes before the next
// starts, so at most one pooled connection is in use at any time.
func main() {
	// Recover from panics (including those raised by InsertItem) and log the
	// error for the main goroutine.
	defer handlePanics()

	flag.Parse()
	log.Printf("DB URL: %s\n", dburl)

	// sqlx.Connect creates the connection pool and pings the server once.
	db, err := sqlx.Connect("postgres", dburl)
	if err != nil {
		log.Fatalln(err)
	}
	defer db.Close()

	// Create the items table; MustExec panics on failure, which the deferred
	// handlePanics reports.
	db.MustExec(schema)
	start := time.Now()

	// The cap only matters for concurrent callers: the sequential loop below
	// never holds more than one connection, so this limit is never reached.
	db.DB.SetMaxOpenConns(10)

	for i := 1; i <= 2000; i++ {
		InsertItem(Item{Id: i, Title: "TestBook", Description: "TestDescription"}, db)
	}
	log.Printf("All DB Inserts completed after %s\n", time.Since(start))
}

main 函数不仅更易读、更简单,速度也相当。实际上,在我完全不科学的测试中,所有运行平均比那个繁琐的 goroutine 版本快 100 毫秒。

总结

长话短说:传统观点认为过早优化是万恶之源。并发不是并行。

除非确有必要,否则不要使用 goroutine;要检查错误,并妥善处理它们。

© www.soinside.com 2019 - 2024. All rights reserved.