我的任务是将一张约 150 万(1.5M)行的表从旧的 TimescaleDB 迁移到新的 TimescaleDB。
我每次用 SELECT 取 200 行(LIMIT 200),OFFSET 从 0 开始,先把这些行加载到内存中,再执行 INSERT 操作。我对每一轮 SELECT - INSERT 循环做了计时:每 200 行耗时约 1 分 3 秒到 1 分 7 秒。
我对 Go 还比较陌生,代码可能写得不太好......
我调试过,看起来程序是对扫描到的每一行分别执行一次 INSERT =)))
这是 .env 文件
FROM_TIMESCALE_HOST=
FROM_TIMESCALE_PORT=
FROM_TIMESCALE_DATABASE=
FROM_TIMESCALE_USERNAME=
FROM_TIMESCALE_PASSWORD=
MIGRATED_TABLE=table
TO_TIMESCALE_HOST=
TO_TIMESCALE_PORT=
TO_TIMESCALE_DATABASE=
TO_TIMESCALE_USERNAME=
TO_TIMESCALE_PASSWORD=
LIMITED_QUANTITY_ROW=
这是我的 go-migration.go 文件:
package main
import (
	"database/sql"
	"fmt"
	"log"
	"net"
	"net/url"
	"os"
	"strconv"
	"time"

	"github.com/joho/godotenv"
	_ "github.com/lib/pq"
)
// statistics holds one row of the migrated table: a timestamp, a content ID
// and five counters. queryBD1 scans the selected columns into these fields in
// declaration order, and queryDB2 passes them in the same order to the INSERT.
//
// The counters are plain int64, which database/sql cannot scan a SQL NULL
// into; the commented-out variant below uses sql.NullInt64 for that case.
// NOTE(review): database/sql does not read the `db` tags itself — presumably
// they are kept for a mapper such as sqlx; confirm before relying on them.
type statistics struct {
// Value of the "time" column.
TimestampTZ time.Time `db:"time" json:"time"`
// Value of the "content_id" column.
ContentID int64 `db:"content_id" json:"content_id"`
// Value of the "like_count" column.
LikeCount int64 `db:"like_count" json:"like_count"`
// Value of the "comment_count" column.
CommentCount int64 `db:"comment_count" json:"comment_count"`
// Value of the "share_count" column.
ShareCount int64 `db:"share_count" json:"share_count"`
// Value of the "view_count" column.
ViewCount int64 `db:"view_count" json:"view_count"`
// Value of the "save_count" column.
SaveCount int64 `db:"save_count" json:"save_count"`
}
// type statistics struct {
// TimestampTZ string
// ContentID int64
// LikeCount sql.NullInt64
// CommentCount sql.NullInt64
// ShareCount sql.NullInt64
// ViewCount sql.NullInt64
// SaveCount sql.NullInt64
// }
// main loads the migration settings from the environment, opens a handle to
// the source and the destination database, runs the migration starting at
// offset 0 and prints the total elapsed time.
func main() {
	// Load environment variables from .env file
	From_DB, To_DB, migrated_table, limited_quantity_row, err := loadEnv()
	if err != nil {
		log.Fatal(err)
	}

	connectionFrom_DB, err := openConnection(From_DB)
	if err != nil {
		log.Fatal(err)
	}
	// Close the pool on normal return so the server sees a clean disconnect.
	defer connectionFrom_DB.Close()

	connectionTo_DB, err := openConnection(To_DB)
	if err != nil {
		// log.Fatal exits without running deferred calls, so release the
		// source handle explicitly first.
		connectionFrom_DB.Close()
		log.Fatal(err)
	}
	defer connectionTo_DB.Close()

	startTime := time.Now()
	doMigration(connectionFrom_DB, connectionTo_DB, To_DB["TIMESCALE_HOST"], migrated_table, limited_quantity_row, 0)
	fmt.Println(time.Since(startTime))
}
// loadEnv reads the .env file into the process environment and collects the
// migration settings from it.
//
// It returns, in order: the source database settings, the destination
// database settings, the name of the table to migrate (MIGRATED_TABLE) and
// the batch size (LIMITED_QUANTITY_ROW). Any missing or malformed value is
// reported as an error that wraps the underlying cause.
func loadEnv() (map[string]string, map[string]string, string, int, error) {
	if err := godotenv.Load(); err != nil {
		return nil, nil, "", 0, fmt.Errorf("error loading .env file: %w", err)
	}
	fmt.Printf("Loaded .env file!\n\n")

	migratedTable, ok := os.LookupEnv("MIGRATED_TABLE")
	if !ok {
		return nil, nil, "", 0, fmt.Errorf("missing migrated table value in .env")
	}

	rawBatchSize, ok := os.LookupEnv("LIMITED_QUANTITY_ROW")
	if !ok {
		return nil, nil, "", 0, fmt.Errorf("missing limited quantity row value in .env")
	}
	limitedQuantityRow, err := strconv.Atoi(rawBatchSize)
	if err != nil {
		return nil, nil, "", 0, fmt.Errorf("error converting limited quantity row to integer: %w", err)
	}
	// The value is used as a SQL LIMIT and as the OFFSET step: zero would
	// migrate nothing and a negative value is rejected by the server.
	if limitedQuantityRow <= 0 {
		return nil, nil, "", 0, fmt.Errorf("LIMITED_QUANTITY_ROW must be a positive integer, got %d", limitedQuantityRow)
	}

	fromDB, err := getDBInformation("FROM")
	if err != nil {
		return nil, nil, "", 0, fmt.Errorf("error loading From DB information: %w", err)
	}
	toDB, err := getDBInformation("TO")
	if err != nil {
		return nil, nil, "", 0, fmt.Errorf("error loading To DB information: %w", err)
	}

	return fromDB, toDB, migratedTable, limitedQuantityRow, nil
}
// getDBInformation collects the connection settings for one database from the
// environment.
//
// input is the variable-name prefix ("FROM" or "TO" in this program); the
// function looks up <input>_TIMESCALE_HOST, _PORT, _DATABASE, _USERNAME and
// _PASSWORD. The returned map is keyed by the un-prefixed names
// ("TIMESCALE_HOST", ...). A variable that is set but empty is accepted; a
// variable that is not set at all yields an error naming that variable, and
// a nil map.
func getDBInformation(input string) (map[string]string, error) {
	settings := []struct {
		key  string // map key, also the suffix of the environment variable
		what string // human-readable description used in the error message
	}{
		{"TIMESCALE_HOST", "host"},
		{"TIMESCALE_PORT", "port"},
		{"TIMESCALE_DATABASE", "database's name"},
		{"TIMESCALE_USERNAME", "username"},
		{"TIMESCALE_PASSWORD", "password"},
	}

	env := make(map[string]string, len(settings))
	for _, s := range settings {
		name := input + "_" + s.key
		value, ok := os.LookupEnv(name)
		if !ok {
			return nil, fmt.Errorf("missing %s: environment variable %s is not set", s.what, name)
		}
		env[s.key] = value
	}

	fmt.Printf("%s TIMESCALE Information loaded! \n\n", input)
	return env, nil
}
// openConnection builds a postgres:// connection URL from env and returns a
// database handle for it.
//
// env must carry the keys TIMESCALE_HOST, TIMESCALE_PORT, TIMESCALE_USERNAME,
// TIMESCALE_PASSWORD and TIMESCALE_DATABASE. The URL is assembled with
// net/url so that credentials or database names containing characters such
// as '@', ':' or '/' are escaped instead of corrupting the URL.
//
// On failure it returns a nil handle and the error from sql.Open.
func openConnection(env map[string]string) (*sql.DB, error) {
	// Construct the database connection string
	connURL := url.URL{
		Scheme:   "postgres",
		User:     url.UserPassword(env["TIMESCALE_USERNAME"], env["TIMESCALE_PASSWORD"]),
		Host:     net.JoinHostPort(env["TIMESCALE_HOST"], env["TIMESCALE_PORT"]),
		Path:     "/" + env["TIMESCALE_DATABASE"],
		RawQuery: "sslmode=disable",
	}

	// Open the database connection
	db, err := sql.Open("postgres", connURL.String())
	if err != nil {
		// sql.Open returns a nil *sql.DB together with the error, so there
		// is nothing to close here; calling db.Close() would panic.
		fmt.Printf("Can't connect to the database at %s.\n", env["TIMESCALE_HOST"])
		return nil, err
	}

	// NOTE(review): sql.Open only validates its arguments; no connection is
	// established until the handle is first used, so this message does not
	// prove the server is reachable.
	fmt.Printf("Connected to the %s database !\n\n", env["TIMESCALE_HOST"])
	return db, nil
}
// doMigration copies rows of migrated_table from the source database to the
// destination database in batches of limited_quantity_row rows, beginning at
// start_offset. Each batch is read with queryBD1 and written with queryDB2;
// the loop ends when a batch comes back empty. A read error terminates the
// program. The host parameter is accepted but not used.
//
// NOTE(review): batches are only started while the offset is below 59 (the
// original "just stop to check" debugging limit), so with a batch size of 59
// or more only the first batch is migrated — confirm before a real run.
func doMigration(connectionFrom_DB *sql.DB, connectionTo_DB *sql.DB, host string, migrated_table string, limited_quantity_row int, start_offset int64) {
	fmt.Printf("Starting query data from %s\n", migrated_table)

	// Statement used for every row written to the destination table.
	insertStatement := "INSERT INTO " + migrated_table +
		" (time, content_id, like_count, comment_count, share_count, view_count, save_count) VALUES ($1,$2,$3,$4,$5,$6,$7)"

	batchSize := int64(limited_quantity_row)
	for offset := start_offset; offset < 59; offset += batchSize {
		// Read the next batch from the source database.
		batch, err := queryBD1(connectionFrom_DB, migrated_table, limited_quantity_row, offset)
		if err != nil {
			log.Fatal(err)
		}

		// An empty batch means the source table is exhausted.
		if len(batch) == 0 {
			break
		}

		// Write the batch to the destination database.
		queryDB2(connectionTo_DB, insertStatement, batch)
	}
}
// printLogMigration writes one progress line to standard output: the row
// index left-aligned in a 10-character column, followed by the row's
// timestamp and its six integer fields separated by '|'.
func printLogMigration(stat statistics, index int64) {
	fmt.Printf(
		"%-10d%20s|%10d|%10d|%10d|%10d|%10d|%10d\n",
		index,
		stat.TimestampTZ.String(),
		stat.ContentID,
		stat.LikeCount,
		stat.CommentCount,
		stat.ShareCount,
		stat.ViewCount,
		stat.SaveCount,
	)
}
// queryBD1 reads one batch of rows from migrated_table on the source
// database.
//
// It selects at most limited_quantity_row rows, skipping the first
// start_offset rows, logs every row via printLogMigration and prints how long
// the read took. It returns the rows in result order, or a nil slice and an
// error if the query, a row scan or the row iteration fails.
//
// The columns are listed explicitly so the scan does not depend on the
// table's physical column order, and the result is ordered by
// (time, content_id): without ORDER BY the server may return rows in a
// different order on every call, so consecutive LIMIT/OFFSET pages could
// skip or repeat rows.
// NOTE(review): this assumes (time, content_id) identifies a row; if
// duplicates exist, add a further tie-breaker column.
func queryBD1(connection *sql.DB, migrated_table string, limited_quantity_row int, start_offset int64) ([]statistics, error) {
	startTime := time.Now()

	query := fmt.Sprintf(
		"SELECT time, content_id, like_count, comment_count, share_count, view_count, save_count FROM %s ORDER BY time, content_id LIMIT %d OFFSET %d",
		migrated_table, limited_quantity_row, start_offset,
	)
	rows, err := connection.Query(query)
	if err != nil {
		return nil, fmt.Errorf("querying %s at offset %d: %w", migrated_table, start_offset, err)
	}
	// Release the underlying connection back to the pool on every path.
	defer rows.Close()

	index := start_offset + 1 // 1-based position of the row, used for logging only
	var statsList []statistics
	for rows.Next() {
		var stats statistics
		if err := rows.Scan(
			&stats.TimestampTZ,
			&stats.ContentID,
			&stats.LikeCount,
			&stats.CommentCount,
			&stats.ShareCount,
			&stats.ViewCount,
			&stats.SaveCount,
		); err != nil {
			return nil, fmt.Errorf("scanning row %d of %s: %w", index, migrated_table, err)
		}
		printLogMigration(stats, index)
		index++
		statsList = append(statsList, stats)
	}
	// rows.Next also returns false when iteration fails part-way; without
	// this check a truncated batch would look like a complete one.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("reading rows of %s at offset %d: %w", migrated_table, start_offset, err)
	}

	fmt.Printf("time to query %f \n", time.Since(startTime).Seconds())
	return statsList, nil
}
// queryDB2 writes statisticsList to the destination database by executing
// insertStatement once per row, binding the seven fields of each row to the
// statement's placeholders in declaration order. An empty list is a no-op.
// Any failure terminates the program via log.Fatal.
//
// The whole batch is written inside a single transaction so the rows are
// committed together instead of once per row, and a failed batch leaves no
// partial data behind.
func queryDB2(connection *sql.DB, insertStatement string, statisticsList []statistics) {
	if len(statisticsList) == 0 {
		return
	}
	if err := insertStatisticsBatch(connection, insertStatement, statisticsList); err != nil {
		log.Fatal(err)
	}
}

// insertStatisticsBatch prepares insertStatement inside one transaction,
// executes it for every row of batch and commits. On any error the
// transaction is rolled back and the error is returned.
func insertStatisticsBatch(connection *sql.DB, insertStatement string, batch []statistics) error {
	tx, err := connection.Begin()
	if err != nil {
		return fmt.Errorf("beginning insert transaction: %w", err)
	}
	// After a successful Commit this Rollback only returns sql.ErrTxDone,
	// which is safe to ignore; on every error path it undoes the batch.
	defer tx.Rollback()

	stmt, err := tx.Prepare(insertStatement)
	if err != nil {
		return fmt.Errorf("preparing insert statement: %w", err)
	}
	defer stmt.Close()

	for i := range batch {
		stats := &batch[i]
		if _, err := stmt.Exec(
			stats.TimestampTZ,
			stats.ContentID,
			stats.LikeCount,
			stats.CommentCount,
			stats.ShareCount,
			stats.ViewCount,
			stats.SaveCount,
		); err != nil {
			return fmt.Errorf("inserting row %d of batch: %w", i, err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("committing insert transaction: %w", err)
	}
	return nil
}
如果您不再使用 LIMIT 和 OFFSET,而是用时间戳列来记录迁移进行到的位置(即键集分页),速度会快一些。
http://use-the-index-luke.com/no-offset
另外,使用批量操作会使其速度更快。
如果您有简单的迁移,还可以使用 timescaledb-parallel-copy。