Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。
我已经设置了一个数据流管道,它从 Pub/Sub 获取消息,转换为字典并打印消息。 这是我写的脚本: 导入 apache_beam 作为光束 导入日志记录 导入
使用 apache beam java 比较 2 个 csv 文件
我的任务是使用 Apache Beam Java 比较 2 个 csv 文件。我的文件如下 文件 1(ID) 1000672801 1000676083 1000686637 1000680800 文件2(参考图像) gs://productimages/p_10006...
Python apache beam 从 kafka 读取 - 我无法连接到 kafka
我想做一个python apache beam Kafka客户端,它将从Kafka获取流数据(Kafka返回大量数据,如数百万/数十亿),按键和值过滤数据并返回数据...
我有一个安装 apache-beam==2.35.0 的 kfp 组件。我在 GCP 管道上的 Vertex AI 中运行它。 组件头: @成分( base_image="dockerhub/python:3.7.12", 包裹到我...
如何使用 Flink runner 将 Beam Python 作业提交到 Kubernetes 上?
我想在 Kubernetes 内的 Flink 运行器上使用 Beam 运行连续流处理作业。我一直在这里关注本教程(https://python.plainenglish.io/apache-beam-flink-cluster-
Flink 上的 Apache Beam 支持快照启动/停止吗?
流处理应用程序在 Flink 集群上运行,使用原始 Flink 支持拍摄快照,然后通过 Flink REST API 从快照重新启动作业,例如停止与
我还是这个领域的新手,目前我正在尝试使用 Apache Beam 中的 ParDo 和 DoFn 反序列化 parquet 文件中的数据,对记录进行一些修改,然后更新
错误“PDone”对象没有属性“窗口”数据流 WriteToPubSub
我有一个梁管道,它从两个 Postgres CloudSQL DB 读取记录,进行一些数据转换,并通过 WriteToPubSub 模块将数据推送到 Google PubSub。 \ 我能够运行这个管道...
如何将 Spanner 更改数据捕获列名称映射到 BigQuery 中的不同列名称
我创建了Spanner数据库表的变更数据捕获(CDC),但问题是在BigQuery表中,列名不同,导致数据流管道失败。怎么...
在流管道中获取 EOFExceptions,以使用数据流管道将数据插入到启用了 TLS 的 memoystore Redis 实例中
我正在尝试从 pubsub 读取并将其写入内存存储 redis 实例。我使用 jedispool 因为该进程是多线程的。我能够将来自 pubsub 的数据写入实例...
Beam ReadFromKafka `with_metadata=True` 编码错误
使用Python SDK 2.49.0(调用Javaharness)中的ReadFromKafka在使用with_metadata=True时会引发编码错误: java.lang.IllegalArgumentException:无法编码元素'org.apache.beam ....
我需要实现一个具有并行管道的作业数据流(一个用于文件configuration.json中找到的每个实体)。 第一步是从 pub/sub 读取一个事件,通知文件到达
我正在尝试使用 Apache Beam Pardo 访问 Avro Generic Record 中的嵌套字段。 我可以进入第一层,但我不知道如何访问更进一步的字段。 因为如果你...
此管道代码作为 Direct runner 运行良好,但在 Dataflow runner 上运行时出错 'name 'read_inputpath_date' is not defined'
从 apache_beam.options.pipeline_options 导入 PipelineOptions 导入操作系统 导入日志 将 apache_beam 导入为光束 导入 gzip 导入json,io 从 google.cloud 导入存储 os.environ["
Dataflow/ApacheBeam 将输入限制为第一个 X 数量?
我有一个有限的 PCollection,但我只想获得第一个 X 数量的输入并丢弃其余的。有没有办法使用 Dataflow 2.X/ApacheBeam 来做到这一点?
我正在尝试扩展 Google 的 Dataflow 模板以将数据从 BQ 移动到 Cloud Storage 上的 parquet 文件,但我在尝试控制 parquet 文件大小时受阻。 https://cloud.google.com/dataflow/docs/...
Go 中的 Apache Beam IO:sql:未知驱动程序“cloudsql-postgres”(忘记导入?)
按照此处的指南,我正在尝试将数据流作业连接到 Go 中 GCP 上的 Cloud SQL Postgres 实例。 我可以在我的机器上本地运行该作业。我已经确认我的权限是...
GCP Dataflow ReadFromKafka 创建大量连接
我们正在使用 Python 创建数据流作业以从 Kafka(Amazon MSK,6 个代理,5 个分区主题)读取数据。数据流作业部署在具有 Cloud NAT(单个公共 IP)的 VPC 中,此 IP 是
读取 apache beam 数据帧中的压缩 json 文件
beam dataframe 看起来支持读取压缩的 json 文件 apache_beam.dataframe.io.read_json 但是,当我尝试通过代码读取文件时: 从 apache_beam.dataframe.io 导入
导入 org.apache.beam.runners.direct.DirectRunner; 导入 org.apache.beam.sdk.Pipeline; 导入 org.apache.beam.sdk.io.jdbc.JdbcIO; 导入 org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; 进口...