如何在golang中将spanner行提取为Json或Parquet格式?

问题描述 投票:0回答:3

我是 golang 和 Spanner 的新手,我想每 5 分钟将 Spanner DB 的快照保存到 Google 云存储。我想使用的格式是 Parquet 或 JSON。

stmt = spanner.NewStatement("SELECT * FROM " + tableName + " WHERE UpdatedAt >= @startDateTime AND UpdatedAt <= @endDateTime")
iter := txn.Query(ctx, stmt)
defer iter.Stop()
for {
    row, err := iter.Next()
    if err == iterator.Done {
        break
    }
    if err != nil {
        log.Println("Failed to read data, err = %s", err)
    }
}

我已经获得了所有行,但我不知道如何提取所有列值并将其写入 Parquet 或 JSON 文件或将其上传到 GCS。是否可以在不知道值的类型或列名称的情况下提取所有列值?任何帮助将不胜感激。

json go parquet google-cloud-spanner
3个回答
3
投票

检索值需要列类型。请参阅 Row 文档中的“支持的类型及其相应的 Cloud Spanner 列类型”。您可以从Row.ColumnNames获取列名称。将 Row.ToStruct 与与表相对应的结构一起使用,并将其写入 json 可能是有意义的,例如使用“encoding/json”包的 Marshal


3
投票

我想分享我繁琐的解决方案,希望它能帮助将来的人。就我而言,我的任务是在短时间内保存 Spanner DB 的快照,并将这些数据以 parquet 格式保存到 GCS。这样以后我们就可以使用大查询来查询这些数据。

首先,我通过这样的简单语句获得了我想要的扳手行:

stmt := spanner.NewStatement(fmt.Sprintf("SELECT * FROM %s WHERE UpdatedAt >= @startDateTime AND UpdatedAt <= @endDateTime", tableName))
    stmt.Params["startDateTime"] = time.Unix(1520330400, 0)
    stmt.Params["endDateTime"] = time.Unix(1520376600, 0)
iter := txn.Query(ctx, stmt)
values := readRows(iter)

func readRows(iter *spanner.RowIterator) []spanner.Row {
    var rows []spanner.Row
    defer iter.Stop()
    for {
        row, err := iter.Next()
        if err == iterator.Done {
            break
        }
        if err != nil {
            log.Println("Failed to read data, err = %s", err)
        }
        rows = append(rows, *row)
    }
    return rows
}

是的,这很容易。但这是令人鼓舞的,因为这是我第一次用 golang 编码。然而,我花了一段时间才发现,在不知道每列类型的情况下不可能解码该值,但我所需要的只是每列的字符串值并将其保存为 parquet 格式。

所以我编写了另一个查询来获取每列的扳手类型,如下所示:

typeStmt = spanner.NewStatement("SELECT t.column_name, t.spanner_type FROM information_schema.columns AS t WHERE t.table_name = @tableName")
typeStmt.Params["tableName"] = tableName

iterTypes := txn.Query(ctx, typeStmt)
types := readRows(iterTypes)
// use a map to keep all the types
dataTypes := make(map[string]string)
        for i := 0; i < len(types); i++ {
            var columnName string
            var dataType string
            types[i].Column(0, &columnName)
            types[i].Column(1, &dataType)
            dataTypes[columnName] = dataType
        }
formattedRows, md := extractDataByType(dataTypes, values)

我用开关将扳手类型转换为 go 类型:

    func decodeValueByType(index int, row spanner.Row, value interface{}) {
            err := row.Column(index, value)
            if err != nil {
                log.Println("Failed to extract value, err = %s", err)
            }
        }

    func prepareParquetWriter(md *[]string, parquetType string, columnNames []string, index int) {
        if len(*md) < len(columnNames) {
            *md = append(*md, fmt.Sprintf("name=%s, type=%s", columnNames[index], parquetType))
        }
    }

func extractDataByType(types map[string]string, rows []spanner.Row) ([][]string, []string) {
    var formattedRows [][]string
    var md []string
    for _, row := range rows {
        columnNames := row.ColumnNames()
        var vals []string
        for i := 0; i < row.Size(); i++ {
            switch types[columnNames[i]] {
            case "STRING(MAX)":
                var value spanner.NullString
                decodeValueByType(i, row, &value)
                prepareParquetWriter(&md, "UTF8", columnNames, i)
                vals = append(vals, fmt.Sprintf("%v", value))
            case "TIMESTAMP":
                var value spanner.NullTime
                decodeValueByType(i, row, &value)
                prepareParquetWriter(&md, "TIMESTAMP_MILLIS", columnNames, i)
                vals = append(vals, fmt.Sprintf("%v", value))
            case "INT64":
                var value spanner.NullInt64
                decodeValueByType(i, row, &value)
                prepareParquetWriter(&md, "INT64", columnNames, i)
                vals = append(vals, fmt.Sprintf("%v", value))
            case "BOOL":
                var value spanner.NullBool
                decodeValueByType(i, row, &value)
                prepareParquetWriter(&md, "BOOLEAN", columnNames, i)
                vals = append(vals, fmt.Sprintf("%v", value))
            }
        }
        formattedRows = append(formattedRows, vals)
    }
    log.Println("parquet format: %s", md)
    return formattedRows, md
}

最后,我在二维数组中获取了数据,并在数组中生成了镶木地板配置。

我还没有完成 GCS 的 parquet writer,但我使用 xitongsys/parquet-go 在本地编写文件,如下所示:

fw, err := ParquetFile.NewLocalFileWriter(fmt.Sprintf("dataInParquet/%s_%s.parquet", name, time.Now().Format("20060102150405")))
            if err != nil {
                log.Println("Can't open file", err)
                return
            }
            pw, err := ParquetWriter.NewCSVWriter(md, fw, 4)
            if err != nil {
                log.Println("Can't create csv writer", err)
                return
            }
            for _, row := range formattedRows {
                rec := make([]*string, len(row))
                for i := 0; i < len(row); i++ {
                    rec[i] = &row[i]
                }
                if err = pw.WriteString(rec); err != nil {
                    log.Println("WriteString error", err)
                }
            }
            if err = pw.WriteStop(); err != nil {
                log.Println("WriteStop error", err)
            }
            log.Println("Write Finished")
            fw.Close()

如果有人知道更好的方法,请告诉我。谢谢。 ;-)

顺便说一句,这只是我的实验代码,如果您想使用任何此代码,请进行相应调整。我的生产实现需要支持更多功能,例如使用 goroutine 查询多个数据库、支持 Spanner 和 MySQL、以 Parquet 或 JSON 格式保存数据。如果有人在做类似的事情,希望听到更多的想法。


0
投票

go spanner官网提供了一个根据数据类型动态解码行的例子:

    for i := 0; i < row.Size(); i++ {
        var col spanner.GenericColumnValue
        if err := row.Column(i, &col); err != nil {
            // TODO: Handle error.
        }
        switch col.Type.Code {
        case sppb.TypeCode_INT64:
            var v int64
            if err := col.Decode(&v); err != nil {
                // TODO: Handle error.
            }
            fmt.Println("int", v)
        case sppb.TypeCode_STRING:
            var v string
            if err := col.Decode(&v); err != nil {
                // TODO: Handle error.
            }
            fmt.Println("string", v)
        }
    }

解码后更容易转向 JSON。

© www.soinside.com 2019 - 2024. All rights reserved.