如何使用数据流将数据从Google Pub / Sub批量处理到Cloud Storage?

问题描述 投票:0回答:1

我正在建立一个Change Data Capture管道,该管道从MYSQL数据库读取数据并在BigQuery中创建一个副本。我将在发布/订阅中推送更改,并使用Dataflow将其转移到Google Cloud Storage。我已经能够弄清楚如何流式传输更改,但是我需要对数据库中的一些表运行批处理。

在从无限制的来源(例如Pub / Sub)读取数据时,可以使用Dataflow来运行批处理作业吗?我可以运行此批处理作业以将数据从发布/订阅传输到Cloud Storage,然后再将该数据加载到BigQuery吗?我想要批处理作业,因为流作业的成本更高。

google-bigquery google-cloud-storage batch-processing google-cloud-pubsub dataflow
1个回答
0
投票

谢谢您的精确度。

首先,当您在数据流(Beam框架)中使用PubSub时,仅在streaming mode中可以使用>

[远程执行期间,当前仅在流传输管道中支持Cloud Pub / Sub源和接收器。

如果您的过程不需要实时,则可以跳过数据流并节省金钱。您可以将Cloud Functions或Cloud Run用于我建议的流程(如果需要,也可以使用App Engine,但不是我的第一个建议)。

[在两种情况下,都创建一个由Cloud Scheduler定期(每周?)触发的进程(云运行或云功能)。

解决方案1

  • 将您的进程连接到请求订阅
  • [每次您阅读一条消息(或一条消息,例如1000条)时,都将流写入BigQuery。 ->但是,在大查询中流写入不是免费的($ 0.05每Gb)
  • 循环直到队列为空。将超时设置为最大值(使用Cloud Function为9分钟,对于Cloud Run为15分钟),以防止出现任何超时问题
  • 解决方案2

  • 将您的进程连接到请求订阅
  • 读取大量消息(例如1000条),并将它们写入内存(写入数组)。
  • 循环直到队列为空。将超时设置为最大值(使用Cloud Function为9分钟,对于Cloud Run为15分钟),以防止出现任何超时问题。还要将内存设置为最大值(2Gb),以防止内存不足崩溃。
  • [从您的内存中数据数组创建到BigQuery的加载作业。 ->这里的加载作业是免费的,每天和每张表最多只能有1000个加载作业。
  • 但是,如果您的应用+数据大小大于内存大小,则此解决方案可能会失败。一种替代方法是每隔一百万行(例如,取决于每行的大小和内存占用量)在GCS中创建一个文件。用唯一的前缀命名文件,例如日期(YYYYMMDD-tempFileXX),并在每次创建文件时增加XX。然后,不是从内存中的数据而是通过文件名(gs://myBucket/YYYYMMDD-tempFile*)中带有通配符的GCS中的数据创建加载作业。这样,所有与前缀匹配的文件都会被加载。

推荐

PubSub消息在pubsub订阅中最多保留7天。我建议您至少每3天触发一次过程,以便有时间进行反应和调试,然后再将消息删除到订阅中。

个人经验

写入BigQuery的流对于少量数据而言是廉价的。我花了几分钱,建议您考虑第一种解决方案,您可以为此付出代价。管理和代码更小/更轻松!
© www.soinside.com 2019 - 2024. All rights reserved.