Google Cloud Dataflow是一种完全托管的云服务,可用于大规模创建和评估数据处理管道。数据流管道基于Apache Beam编程模型,可以在批处理和流模式下运行。 Cloud Dataflow是Google云端平台的一部分。
Dataflow 目前支持自定义容器,如下页所示, https://cloud.google.com/dataflow/docs/guides/using-custom-containers 我想知道我们是否可以使用我们自己的 VM 映像进行 sta...
我正在将 IoT 数据从 OPC UA 服务器发送到发布/订阅主题。该主题的每条消息都包含大约 100 个传感器的 15 分钟的每分钟数据。数据流从这个发布/订阅中读取...
如何在 GCP 数据流中使用 python 管道代码读取 BigQuery 表
有人可以分享语法以在用 python 为 GCP 数据流编写的管道中读/写 bigquery 表
嗨,我试图找到允许在 flex 模式下运行的数据流的最大大小以及检查大小的方法。 我知道经典模板每个模板有 10MB 的限制。 但是在谷歌云上搜索
此管道代码作为 Direct runner 运行良好,但在 Dataflow runner 上运行时出错 'name 'read_inputpath_date' is not defined'
从 apache_beam.options.pipeline_options 导入 PipelineOptions 导入操作系统 导入日志 将 apache_beam 导入为光束 导入 gzip 导入json,io 从 google.cloud 导入存储 os.environ["
Dataflow/ApacheBeam 将输入限制为第一个 X 数量?
我有一个有限的 PCollection,但我只想获得第一个 X 数量的输入并丢弃其余的。有没有办法使用 Dataflow 2.X/ApacheBeam 来做到这一点?
我正在尝试扩展 Google 的 Dataflow 模板以将数据从 BQ 移动到 Cloud Storage 上的 parquet 文件,但我在尝试控制 parquet 文件大小时受阻。 https://cloud.google.com/dataflow/docs/...
Go 中的 Apache Beam IO:sql:未知驱动程序“cloudsql-postgres”(忘记导入?)
按照此处的指南,我正在尝试将数据流作业连接到 Go 中 GCP 上的 Cloud SQL Postgres 实例。 我可以在我的机器上本地运行该作业。我已经确认我的权限是...
GCP Dataflow ReadFromKafka 创建大量连接
我们正在使用 Python 创建数据流作业以从 Kafka(Amazon MSK,6 个代理,5 个分区主题)读取数据。数据流作业部署在具有 Cloud NAT(单个公共 IP)的 VPC 中,此 IP 是
读取 apache beam 数据帧中的压缩 json 文件
beam dataframe 看起来支持读取压缩的 json 文件 apache_beam.dataframe.io.read_json 但是,当我尝试通过代码读取文件时: 从 apache_beam.dataframe.io 导入
运行任何诗歌可执行文件时出现此错误 追溯(最近一次通话): 中的文件“/root/.local/bin/poetry”,第 5 行 来自 poetry.console.applicat...
将列转换为 Azure Dataflow 中另一列的 JSON 对象
我有以下格式的数据,我正在使用数据流将记录格式化为 JSON 格式,并将其存储到数据的另一列中。 输入 想要使用 Dataflow 转换为以下格式: 输出
如何使用LoggerFactory在数据流Apache Beam应用中获取日志?
我正在 Google DataFlow 上运行 Beam 应用程序,但我无法在数据流中看到信息日志。 公开课 Abc { private static final Logger LOG = LoggerFactory.getLogger(Abc.cla...
将 DataFrame 加载到 BigQuery 表时出错(pyarrow.lib.ArrowTypeError:<class 'str'> 类型的对象无法转换为 int)
我正在尝试将数据从 SQL SERVER 加载到 GCP Bigquery uisng Dataflow。运行管道时出现以下错误: 将 DataFrame 加载到 BigQuery 表时出错(pyarrow.lib.ArrowTypeErr ...
谷歌云数据流错误NoSuchMethodException:没有这样的功能
我正在使用数据流函数将 pubsub 消息以 json 的形式转换为字符串,以提交到具有正确模式的 bigquery 表中。 我使用以下 UDF 函数
我有两个不同的数据源。假设 A 包含有关客户的信息:名称,城市,分区(GUID),一个数据源 B 包含一列 namedivision(GUID)。 我的第一个数据源是
用于将 csv 文件加载到 bigquery 的 python 脚本
我是数据流初学者,使用这个通用脚本使用数据流将 csv 文件加载到 bigquery。 导入 argparse 导入 csv 导入日志 将 apache_beam 导入为光束 来自 apache_beam.options.
使用数据库中的数据源的组合框有问题: 用户选择一个值,然后点击提交按钮将数据保存到数据库中。 用于填充组合框的列表...
我目前正在处理 4 个数据流作业,每个作业都在 3 个独立的环境中进行复制。在过去的 3 个月中,所有 12 个实例都已成功运行。但是,我观察到...
为什么 Apache Beam 似乎在单个 worker 上并行化元素?
我正在使用 Google Cloud Dataflow 运行程序测试一个非常简单的 Apache Beam 管道,该运行程序从 Pub/Sub 读取音频元素,通过 Tensorflow 模型运行元素,并将结果写入 Pub...