MongoDB 到 BigQuery

问题描述 投票:0回答:4

将 mlab 托管的 MongoDB 数据导出到 google bigquery 的最佳方式是什么?

最初,我尝试从 MongoDB 一次性加载到 BigQuery,后来我考虑使用 Pub/Sub 将实时数据流传输到 BigQuery。

我需要有关从 mongodb 到 bigquery 的第一次加载的帮助。

mongodb google-bigquery
4个回答
22
投票

在我看来,最佳实践是构建自己的提取器。这可以使用您选择的语言来完成,并且可以提取为 CSV 或 JSON。

但是,如果您寻求一种快速的方法,并且您的数据不是很大并且可以容纳在一台服务器中,那么我建议使用

mongoexport
。假设您有一个简单的文档结构,如下所示:

{
    "_id" : "tdfMXH0En5of2rZXSQ2wpzVhZ",
    "statuses" : [ 
        {
            "status" : "dc9e5511-466c-4146-888a-574918cc2534",
            "score" : 53.24388894
        }
    ],
    "stored_at" : ISODate("2017-04-12T07:04:23.545Z")
}

然后您需要定义您的 BigQuery 架构 (

mongodb_schema.json
),例如:

$ cat > mongodb_schema.json <<EOF
[
    { "name":"_id", "type": "STRING" },
    { "name":"stored_at", "type": "record", "fields": [
        { "name":"date", "type": "STRING" }
    ]},
    { "name":"statuses", "type": "record", "mode": "repeated", "fields": [
        { "name":"status", "type": "STRING" },
        { "name":"score", "type": "FLOAT" }
    ]}
]
EOF

现在,有趣的部分开始了:-) 从 MongoDB 中以 JSON 形式提取数据。假设您有一个副本集名称为

statuses
的集群,您的数据库为
sample
,您的集合为
status

mongoexport \
    --host statuses/db-01:27017,db-02:27017,db-03:27017 \
    -vv \
    --db "sample" \
    --collection "status" \
    --type "json" \
    --limit 100000 \
    --out ~/sample.json

正如您在上面看到的,我将输出限制为 100k 条记录,因为我建议您在对所有数据执行此操作之前运行示例并加载到 BigQuery。运行上述命令后,您应该在

sample.json
中获得示例数据,但有一个字段
$date
会导致您加载到 BigQuery 时出错。要解决这个问题,我们可以使用
sed
将它们替换为简单的字段名称:

# Fix Date field to make it compatible with BQ
sed -i 's/"\$date"/"date"/g' sample.json

现在您可以使用以下命令压缩、上传到 Google Cloud Storage (GCS),然后加载到 BigQuery:

# Compress for faster load
gzip sample.json

# Move to GCloud
gsutil mv ./sample.json.gz gs://your-bucket/sample/sample.json.gz

# Load to BQ
bq load \
    --source_format=NEWLINE_DELIMITED_JSON \
    --max_bad_records=999999 \
    --ignore_unknown_values=true \
    --encoding=UTF-8 \
    --replace \
    "YOUR_DATASET.mongodb_sample" \
    "gs://your-bucket/sample/*.json.gz" \
    "mongodb_schema.json"

如果一切正常,则返回并从

--limit 100000
命令中删除
mongoexport
,然后再次重新运行上述命令以加载所有内容而不是 100k 样本。

替代解决方案:

如果您想要更大的灵活性并且性能不是您所关心的,那么您也可以使用

mongo
CLI 工具。这样,您就可以在 JavaScript 中编写提取逻辑并针对您的数据执行它,然后将输出发送到 BigQuery。以下是我对相同流程所做的操作,但使用 JavaScript 以 CSV 格式输出,以便我可以更轻松地将其加载到 BigQuery:

# Export Logic in JavaScript
cat > export-csv.js <<EOF
var size = 100000;
var maxCount = 1;
for (x = 0; x < maxCount; x = x + 1) {
    var recToSkip = x * size;
    db.entities.find().skip(recToSkip).limit(size).forEach(function(record) {
        var row = record._id + "," + record.stored_at.toISOString();;
        record.statuses.forEach(function (l) {
            print(row + "," + l.status + "," + l.score)
        });
    });
}
EOF

# Execute on Mongo CLI
_MONGO_HOSTS="db-01:27017,db-02:27017,db-03:27017/sample?replicaSet=statuses"
mongo --quiet \
    "${_MONGO_HOSTS}" \
    export-csv.js \
    | split -l 500000 --filter='gzip > $FILE.csv.gz' - sample_

# Load all Splitted Files to Google Cloud Storage
gsutil -m mv ./sample_* gs://your-bucket/sample/

# Load files to BigQuery
bq load \
    --source_format=CSV \
    --max_bad_records=999999 \
    --ignore_unknown_values=true \
    --encoding=UTF-8 \
    --replace \
    "YOUR_DATASET.mongodb_sample" \
    "gs://your-bucket/sample/sample_*.csv.gz" \
    "ID,StoredDate:DATETIME,Status,Score:FLOAT"

提示:在上面的脚本中,我通过管道输出做了一些小技巧,以便能够使用

sample_
前缀将输出拆分为多个文件。此外,在分割期间,它会对输出进行 GZip 压缩,以便您可以更轻松地将其加载到 GCS。


2
投票

从 MongoDB 文档的基本阅读来看,听起来您可以使用

mongoexport
将数据库转储为 JSON。完成此操作后,请参阅BigQuery 加载数据主题,了解如何在将 JSON 文件复制到 GCS 后从 JSON 文件创建表。


1
投票

您可以从 MongoDB 读取数据并将其流式传输到 BigQuery。您可以在 NodeJS here 中找到示例。

这是链接示例的扩展,可防止重复记录(只要它们仍在流式缓冲区中):

    const { BigQuery } = require('@google-cloud/bigquery');
    const bigqueryClient = new BigQuery();

    ...

    const jsonData = // Array of documents from MongoDB

    const inputRows = jsonData.map(row => ({
      insertId: row._id,
      json: row
    }));
    const insertOptions = {
      raw: true
    };

    await bigqueryClient
      .dataset(datasetId)
      .table(tableId)
      .insert(inputRows, insertOptions);


0
投票

如果您不想编写自己的自定义提取/加载代码,我建议您查看更改数据捕获(CDC)和实现它的工具。 CDC 可帮助您将每条新数据从 MongoDB 复制到 BigQuery。例如,Estuary Flow 拥有两个数据存储的托管连接器。您无需运行任何自定义软件,只需几分钟设置连接配置,即可开始同步两个数据库。

以下是有关设置 MongoDB 捕获连接器的详细指南:https://docs.estuary.dev/getting-started/tutorials/real_time_cdc_with_mongodb/

© www.soinside.com 2019 - 2024. All rights reserved.