我是 golang 和 Spanner 的新手,我想每 5 分钟将 Spanner DB 的快照保存到 Google 云存储。我想使用的格式是 Parquet 或 JSON。
stmt = spanner.NewStatement("SELECT * FROM " + tableName + " WHERE UpdatedAt >= @startDateTime AND UpdatedAt <= @endDateTime")
iter := txn.Query(ctx, stmt)
defer iter.Stop()
for {
row, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Println("Failed to read data, err = %s", err)
}
}
我已经获得了所有行,但我不知道如何提取所有列值并将其写入 Parquet 或 JSON 文件或将其上传到 GCS。是否可以在不知道值的类型或列名称的情况下提取所有列值?任何帮助将不胜感激。
检索值需要列类型。请参阅 Row 文档中的“支持的类型及其相应的 Cloud Spanner 列类型”。您可以从Row.ColumnNames获取列名称。将 Row.ToStruct 与与表相对应的结构一起使用,并将其写入 json 可能是有意义的,例如使用“encoding/json”包的 Marshal。
我想分享我繁琐的解决方案,希望它能帮助将来的人。就我而言,我的任务是在短时间内保存 Spanner DB 的快照,并将这些数据以 parquet 格式保存到 GCS。这样以后我们就可以使用大查询来查询这些数据。
首先,我通过这样的简单语句获得了我想要的扳手行:
stmt := spanner.NewStatement(fmt.Sprintf("SELECT * FROM %s WHERE UpdatedAt >= @startDateTime AND UpdatedAt <= @endDateTime", tableName))
stmt.Params["startDateTime"] = time.Unix(1520330400, 0)
stmt.Params["endDateTime"] = time.Unix(1520376600, 0)
iter := txn.Query(ctx, stmt)
values := readRows(iter)
func readRows(iter *spanner.RowIterator) []spanner.Row {
var rows []spanner.Row
defer iter.Stop()
for {
row, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Println("Failed to read data, err = %s", err)
}
rows = append(rows, *row)
}
return rows
}
是的,这很容易。但这是令人鼓舞的,因为这是我第一次用 golang 编码。然而,我花了一段时间才发现,在不知道每列类型的情况下不可能解码该值,但我所需要的只是每列的字符串值并将其保存为 parquet 格式。
所以我编写了另一个查询来获取每列的扳手类型,如下所示:
typeStmt = spanner.NewStatement("SELECT t.column_name, t.spanner_type FROM information_schema.columns AS t WHERE t.table_name = @tableName")
typeStmt.Params["tableName"] = tableName
iterTypes := txn.Query(ctx, typeStmt)
types := readRows(iterTypes)
// use a map to keep all the types
dataTypes := make(map[string]string)
for i := 0; i < len(types); i++ {
var columnName string
var dataType string
types[i].Column(0, &columnName)
types[i].Column(1, &dataType)
dataTypes[columnName] = dataType
}
formattedRows, md := extractDataByType(dataTypes, values)
我用开关将扳手类型转换为 go 类型:
func decodeValueByType(index int, row spanner.Row, value interface{}) {
err := row.Column(index, value)
if err != nil {
log.Println("Failed to extract value, err = %s", err)
}
}
func prepareParquetWriter(md *[]string, parquetType string, columnNames []string, index int) {
if len(*md) < len(columnNames) {
*md = append(*md, fmt.Sprintf("name=%s, type=%s", columnNames[index], parquetType))
}
}
func extractDataByType(types map[string]string, rows []spanner.Row) ([][]string, []string) {
var formattedRows [][]string
var md []string
for _, row := range rows {
columnNames := row.ColumnNames()
var vals []string
for i := 0; i < row.Size(); i++ {
switch types[columnNames[i]] {
case "STRING(MAX)":
var value spanner.NullString
decodeValueByType(i, row, &value)
prepareParquetWriter(&md, "UTF8", columnNames, i)
vals = append(vals, fmt.Sprintf("%v", value))
case "TIMESTAMP":
var value spanner.NullTime
decodeValueByType(i, row, &value)
prepareParquetWriter(&md, "TIMESTAMP_MILLIS", columnNames, i)
vals = append(vals, fmt.Sprintf("%v", value))
case "INT64":
var value spanner.NullInt64
decodeValueByType(i, row, &value)
prepareParquetWriter(&md, "INT64", columnNames, i)
vals = append(vals, fmt.Sprintf("%v", value))
case "BOOL":
var value spanner.NullBool
decodeValueByType(i, row, &value)
prepareParquetWriter(&md, "BOOLEAN", columnNames, i)
vals = append(vals, fmt.Sprintf("%v", value))
}
}
formattedRows = append(formattedRows, vals)
}
log.Println("parquet format: %s", md)
return formattedRows, md
}
最后,我在二维数组中获取了数据,并在数组中生成了镶木地板配置。
我还没有完成 GCS 的 parquet writer,但我使用 xitongsys/parquet-go 在本地编写文件,如下所示:
fw, err := ParquetFile.NewLocalFileWriter(fmt.Sprintf("dataInParquet/%s_%s.parquet", name, time.Now().Format("20060102150405")))
if err != nil {
log.Println("Can't open file", err)
return
}
pw, err := ParquetWriter.NewCSVWriter(md, fw, 4)
if err != nil {
log.Println("Can't create csv writer", err)
return
}
for _, row := range formattedRows {
rec := make([]*string, len(row))
for i := 0; i < len(row); i++ {
rec[i] = &row[i]
}
if err = pw.WriteString(rec); err != nil {
log.Println("WriteString error", err)
}
}
if err = pw.WriteStop(); err != nil {
log.Println("WriteStop error", err)
}
log.Println("Write Finished")
fw.Close()
如果有人知道更好的方法,请告诉我。谢谢。 ;-)
顺便说一句,这只是我的实验代码,如果您想使用任何此代码,请进行相应调整。我的生产实现需要支持更多功能,例如使用 goroutine 查询多个数据库、支持 Spanner 和 MySQL、以 Parquet 或 JSON 格式保存数据。如果有人在做类似的事情,希望听到更多的想法。
go spanner官网提供了一个根据数据类型动态解码行的例子:
for i := 0; i < row.Size(); i++ {
var col spanner.GenericColumnValue
if err := row.Column(i, &col); err != nil {
// TODO: Handle error.
}
switch col.Type.Code {
case sppb.TypeCode_INT64:
var v int64
if err := col.Decode(&v); err != nil {
// TODO: Handle error.
}
fmt.Println("int", v)
case sppb.TypeCode_STRING:
var v string
if err := col.Decode(&v); err != nil {
// TODO: Handle error.
}
fmt.Println("string", v)
}
}
解码后更容易转向 JSON。