使用 GoLang 迁移 TimeScaleDB 需要太长时间?

问题描述 投票:0回答:1

我的任务是将一张约 150 万行(1.5M)的表从旧的 TimescaleDB 迁移到新的 TimescaleDB。

我用 SELECT 每次查询 200 行(LIMIT 200),偏移量(OFFSET)从 0 开始,把结果加载到内存中,然后执行 INSERT 操作。我统计了每一轮 SELECT - INSERT 循环的耗时:200 行需要 1 分 3 秒到 1 分 7 秒。

我对 GoLang 还比较陌生,也许代码写得不太好......

我尝试过调试,看起来是对扫描到的每一行都单独执行了一次插入 =)))

这是 .env 文件

FROM_TIMESCALE_HOST=
FROM_TIMESCALE_PORT=
FROM_TIMESCALE_DATABASE=
FROM_TIMESCALE_USERNAME=
FROM_TIMESCALE_PASSWORD=
MIGRATED_TABLE=table
TO_TIMESCALE_HOST=
TO_TIMESCALE_PORT=
TO_TIMESCALE_DATABASE=
TO_TIMESCALE_USERNAME=
TO_TIMESCALE_PASSWORD=
LIMITED_QUANTITY_ROW=

这是我的 go-migration.go 文件:

package main

import (
    "database/sql"
    "fmt"
    "log"
    "os"
    "strconv"
    "time"

    "github.com/joho/godotenv"
    _ "github.com/lib/pq"
)

// statistics holds one row of the migrated table: a timestamp, a content ID
// and five counters. The db/json tags repeat the column names used by the
// INSERT statement in doMigration; no code visible in this file reads the tags.
type statistics struct {
    // TimestampTZ is filled from / written to the "time" column.
    TimestampTZ  time.Time `db:"time" json:"time"`
    // ContentID is filled from / written to the "content_id" column.
    ContentID    int64     `db:"content_id" json:"content_id"`
    // The counters below are plain int64 values, so scanning a NULL into any
    // of them fails (the commented-out variant of this struct below used
    // sql.NullInt64 instead).
    LikeCount    int64     `db:"like_count" json:"like_count"`
    CommentCount int64     `db:"comment_count" json:"comment_count"`
    ShareCount   int64     `db:"share_count" json:"share_count"`
    ViewCount    int64     `db:"view_count" json:"view_count"`
    SaveCount    int64     `db:"save_count" json:"save_count"`
}

// type statistics struct {
//  TimestampTZ  string
//  ContentID    int64
//  LikeCount    sql.NullInt64
//  CommentCount sql.NullInt64
//  ShareCount   sql.NullInt64
//  ViewCount    sql.NullInt64
//  SaveCount    sql.NullInt64
// }

// main loads the migration settings from the .env file, opens handles to the
// source and destination databases, runs the migration starting at offset 0
// and prints how long the migration took.
func main() {
    fromDB, toDB, migratedTable, batchSize, err := loadEnv()
    if err != nil {
        log.Fatal(err)
    }

    source, err := openConnection(fromDB)
    if err != nil {
        log.Fatal(err)
    }
    defer source.Close()

    target, err := openConnection(toDB)
    if err != nil {
        // log.Fatal exits without running deferred calls, so release the
        // source handle explicitly before bailing out.
        source.Close()
        log.Fatal(err)
    }
    defer target.Close()

    startTime := time.Now()
    doMigration(source, target, toDB["TIMESCALE_HOST"], migratedTable, batchSize, 0)
    fmt.Println(time.Since(startTime))
}

// loadEnv reads the .env file and returns, in order: the source database
// settings, the destination database settings, the name of the table to
// migrate and the number of rows to move per batch.
//
// It returns an error when the .env file cannot be loaded, when
// MIGRATED_TABLE or LIMITED_QUANTITY_ROW is not set, when
// LIMITED_QUANTITY_ROW is not a positive integer, or when either set of
// database settings is incomplete. Underlying errors are wrapped with %w so
// callers can inspect them with errors.Is / errors.As.
func loadEnv() (map[string]string, map[string]string, string, int, error) {
    if err := godotenv.Load(); err != nil {
        return nil, nil, "", 0, fmt.Errorf("error loading .env file: %w", err)
    }
    fmt.Printf("Loaded .env file!\n\n")

    migratedTable, ok := os.LookupEnv("MIGRATED_TABLE")
    if !ok {
        return nil, nil, "", 0, fmt.Errorf("missing migrated table value in .env")
    }

    rawLimit, ok := os.LookupEnv("LIMITED_QUANTITY_ROW")
    if !ok {
        return nil, nil, "", 0, fmt.Errorf("missing limited quantity row value in .env")
    }
    limitedQuantityRow, err := strconv.Atoi(rawLimit)
    if err != nil {
        return nil, nil, "", 0, fmt.Errorf("error converting limited quantity row to integer: %w", err)
    }
    // A batch size of 0 makes the first SELECT return no rows, so the
    // migration would silently do nothing; a negative one is rejected by the
    // database. Fail early with a clear message instead.
    if limitedQuantityRow <= 0 {
        return nil, nil, "", 0, fmt.Errorf("limited quantity row must be a positive integer, got %d", limitedQuantityRow)
    }

    fromDB, err := getDBInformation("FROM")
    if err != nil {
        return nil, nil, "", 0, fmt.Errorf("error loading From DB information: %w", err)
    }

    toDB, err := getDBInformation("TO")
    if err != nil {
        return nil, nil, "", 0, fmt.Errorf("error loading To DB information: %w", err)
    }

    return fromDB, toDB, migratedTable, limitedQuantityRow, nil
}
// getDBInformation collects the connection settings for one database from
// the environment. input is the variable prefix ("FROM" or "TO" in this
// program); the variables read are <input>_TIMESCALE_HOST, _PORT, _DATABASE,
// _USERNAME and _PASSWORD.
//
// The returned map is keyed without the prefix (for example
// "TIMESCALE_HOST"). A variable that is set but empty is accepted; a variable
// that is not set at all yields a nil map and an error naming it. Variables
// are checked in the order listed above, so the first missing one is reported.
func getDBInformation(input string) (map[string]string, error) {
    settings := []struct {
        key   string // map key and environment variable suffix
        label string // human-readable name used in the error message
    }{
        {"TIMESCALE_HOST", "host"},
        {"TIMESCALE_PORT", "port"},
        {"TIMESCALE_DATABASE", "database's name"},
        {"TIMESCALE_USERNAME", "username"},
        {"TIMESCALE_PASSWORD", "password"},
    }

    env := make(map[string]string, len(settings))
    for _, s := range settings {
        name := input + "_" + s.key
        value, ok := os.LookupEnv(name)
        if !ok {
            return nil, fmt.Errorf("missing %s (%s is not set)", s.label, name)
        }
        env[s.key] = value
    }

    fmt.Printf("%s TIMESCALE Information loaded! \n\n", input)
    return env, nil
}

// openConnection builds a postgres:// URL (sslmode=disable) from the settings
// in env and opens a database handle with the "postgres" driver.
//
// env must contain the keys TIMESCALE_USERNAME, TIMESCALE_PASSWORD,
// TIMESCALE_HOST, TIMESCALE_PORT and TIMESCALE_DATABASE. On failure it prints
// a message and returns a nil handle together with the error from sql.Open.
//
// NOTE(review): the values are interpolated into the URL verbatim, so a
// username or password containing characters such as '@', '/' or '?' will
// probably not parse as intended - confirm and escape if such credentials
// are possible.
func openConnection(env map[string]string) (*sql.DB, error) {
    connStr := fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=disable", env["TIMESCALE_USERNAME"], env["TIMESCALE_PASSWORD"], env["TIMESCALE_HOST"], env["TIMESCALE_PORT"], env["TIMESCALE_DATABASE"])

    db, err := sql.Open("postgres", connStr)
    if err != nil {
        // sql.Open returns a nil *sql.DB on error, so there is nothing to
        // close here; calling db.Close() would dereference nil and panic.
        fmt.Printf("Can't connect to the database at %s.\n", env["TIMESCALE_HOST"])
        return nil, err
    }

    // No Ping is issued here, so this message only means the handle was
    // created; sql.Open may validate its arguments without connecting.
    fmt.Printf("Connected to the %s database !\n\n", env["TIMESCALE_HOST"])
    return db, nil
}

// doMigration copies rows of migrated_table from connectionFrom_DB to
// connectionTo_DB in batches of limited_quantity_row rows, starting at
// start_offset. Each batch is read with queryBD1 and written with queryDB2.
// The host parameter is not used by this function.
//
// The loop ends when a batch comes back empty or when the offset reaches 59.
// NOTE(review): the 59 cut-off is a debugging limit left in by the author
// ("just stop to check"); with it in place at most the rows below offset 59
// plus one batch are migrated.
//
// A read error terminates the process via log.Fatal.
func doMigration(connectionFrom_DB *sql.DB, connectionTo_DB *sql.DB, host string, migrated_table string, limited_quantity_row int, start_offset int64) {
    fmt.Printf("Starting query data from %s\n", migrated_table)

    // The same INSERT text is reused for every batch.
    const columns = "time, content_id, like_count, comment_count, share_count, view_count, save_count"
    insertSQL := "INSERT INTO " + migrated_table + " (" + columns + ") VALUES ($1,$2,$3,$4,$5,$6,$7)"

    step := int64(limited_quantity_row)
    for offset := start_offset; offset < 59; offset += step {
        batch, err := queryBD1(connectionFrom_DB, migrated_table, limited_quantity_row, offset)
        if err != nil {
            log.Fatal(err)
        }
        if len(batch) == 0 {
            // Nothing left to read from the source table.
            return
        }
        queryDB2(connectionTo_DB, insertSQL, batch)
    }
}

// printLogMigration writes one log line for a row to standard output: the
// running index left-aligned in 10 columns, then the timestamp and the six
// numeric fields separated by '|'.
func printLogMigration(stat statistics, index int64) {
    fmt.Printf(
        "%-10d%20s|%10d|%10d|%10d|%10d|%10d|%10d\n",
        index,
        stat.TimestampTZ.String(),
        stat.ContentID,
        stat.LikeCount,
        stat.CommentCount,
        stat.ShareCount,
        stat.ViewCount,
        stat.SaveCount,
    )
}
// queryBD1 reads one batch of rows from migrated_table: at most
// limited_quantity_row rows, skipping the first start_offset rows. Every row
// is logged with printLogMigration, and the time spent is printed before
// returning.
//
// The columns are selected by name, in the order they are scanned, so the
// result does not depend on the physical column order of the table. Rows are
// ordered by (time, content_id) because LIMIT/OFFSET paging without ORDER BY
// gives no guarantee that consecutive pages neither overlap nor skip rows.
// NOTE(review): (time, content_id) is assumed to identify a row uniquely -
// confirm against the table definition.
//
// migrated_table is interpolated into the SQL text; it comes from the
// operator's .env file in this program and must not be fed untrusted input.
//
// It returns the rows read (nil when there are none) or the first error from
// the query, from scanning a row, or from iterating the result set.
func queryBD1(connection *sql.DB, migrated_table string, limited_quantity_row int, start_offset int64) ([]statistics, error) {
    startTime := time.Now()

    query := fmt.Sprintf(
        "SELECT time, content_id, like_count, comment_count, share_count, view_count, save_count FROM %s ORDER BY time, content_id LIMIT %d OFFSET %d",
        migrated_table, limited_quantity_row, start_offset,
    )
    rows, err := connection.Query(query)
    if err != nil {
        return nil, fmt.Errorf("querying %s at offset %d: %w", migrated_table, start_offset, err)
    }
    // Release the result set on every return path, including early errors.
    defer rows.Close()

    index := start_offset + 1
    var statsList []statistics
    for rows.Next() {
        var stats statistics
        if err := rows.Scan(
            &stats.TimestampTZ,
            &stats.ContentID,
            &stats.LikeCount,
            &stats.CommentCount,
            &stats.ShareCount,
            &stats.ViewCount,
            &stats.SaveCount,
        ); err != nil {
            return nil, fmt.Errorf("scanning row %d of %s: %w", index, migrated_table, err)
        }

        printLogMigration(stats, index)
        index++
        statsList = append(statsList, stats)
    }
    // rows.Next returning false can mean "no more rows" or "iteration
    // failed"; only rows.Err distinguishes the two.
    if err := rows.Err(); err != nil {
        return nil, fmt.Errorf("reading rows of %s: %w", migrated_table, err)
    }

    fmt.Printf("time to query %f \n", time.Since(startTime).Seconds())
    return statsList, nil
}
// queryDB2 writes statisticsList to the destination database using
// insertStatement, which must take seven positional parameters in the order
// time, content_id, like_count, comment_count, share_count, view_count,
// save_count. An empty list is a no-op. Any failure terminates the process
// via log.Fatal.
func queryDB2(connection *sql.DB, insertStatement string, statisticsList []statistics) {
    if len(statisticsList) == 0 {
        return
    }
    if err := insertBatch(connection, insertStatement, statisticsList); err != nil {
        log.Fatal(err)
    }
}

// insertBatch inserts every element of statisticsList inside one transaction
// with a statement prepared once, so the batch is committed once instead of
// once per row and is either stored completely or not at all.
func insertBatch(connection *sql.DB, insertStatement string, statisticsList []statistics) error {
    tx, err := connection.Begin()
    if err != nil {
        return fmt.Errorf("beginning insert transaction: %w", err)
    }
    // After a successful Commit, Rollback only returns sql.ErrTxDone, so its
    // result is deliberately ignored; on every error path it undoes the
    // partial batch.
    defer tx.Rollback()

    stmt, err := tx.Prepare(insertStatement)
    if err != nil {
        return fmt.Errorf("preparing insert statement: %w", err)
    }
    defer stmt.Close()

    for _, stats := range statisticsList {
        if _, err := stmt.Exec(
            stats.TimestampTZ,
            stats.ContentID,
            stats.LikeCount,
            stats.CommentCount,
            stats.ShareCount,
            stats.ViewCount,
            stats.SaveCount,
        ); err != nil {
            return fmt.Errorf("inserting row with content_id %d: %w", stats.ContentID, err)
        }
    }

    if err := tx.Commit(); err != nil {
        return fmt.Errorf("committing insert transaction: %w", err)
    }
    return nil
}

postgresql go database-migration timescaledb
1个回答
0
投票

如果不使用 LIMIT 和 OFFSET,而是用时间戳列来记录迁移进行到的位置(即键集分页,每次查询上一批最后一个时间戳之后的行),速度会快一些。

http://use-the-index-luke.com/no-offset

另外,使用批量操作会使其速度更快。

如果您有简单的迁移,还可以使用 timescaledb-parallel-copy

© www.soinside.com 2019 - 2024. All rights reserved.