apache-beam 相关问题

Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。

由于当前可用区资源不足,数据流管道创建失败

我已经设置了一个数据流管道,它从 Pub/Sub 获取消息,转换为字典并打印消息。 这是我写的脚本: 导入 apache_beam 作为光束 导入日志记录 导入

回答 1 投票 0

使用 apache beam java 比较 2 个 csv 文件

我的任务是使用 Apache Beam Java 比较 2 个 csv 文件。我的文件如下 文件 1(ID) 1000672801 1000676083 1000686637 1000680800 文件2(参考图像) gs://productimages/p_10006...

回答 1 投票 0

Python apache beam 从 kafka 读取 - 我无法连接到 kafka

我想做一个python apache beam Kafka客户端,它将从Kafka获取流数据(Kafka返回大量数据,如数百万/数十亿),按键和值过滤数据并返回数据...

回答 1 投票 0

无法安装 apache-beam==2.35.0、apache-beam[gcp]==2.35.0 apache-beam[gcp]==2.47.0 和 apache-beam[gcp]==2.48.0 版本有冲突依赖

我有一个安装 apache-beam==2.35.0 的 kfp 组件。我在 GCP 管道上的 Vertex AI 中运行它。 组件头: @成分( base_image="dockerhub/python:3.7.12", 包裹到我...

回答 1 投票 0

如何使用 Flink runner 将 Beam Python 作业提交到 Kubernetes 上?

我想在 Kubernetes 内的 Flink 运行器上使用 Beam 运行连续流处理作业。我一直在这里关注本教程(https://python.plainenglish.io/apache-beam-flink-cluster-

回答 2 投票 0

Flink 上的 Apache Beam 支持快照启动/停止吗?

流处理应用程序在 Flink 集群上运行,使用原始 Flink 支持拍摄快照,然后通过 Flink REST API 从快照重新启动作业,例如停止与

回答 1 投票 0

如何将 JsonObject 转换为 Avro 通用记录?

我还是这个领域的新手,目前我正在尝试使用 Apache Beam 中的 ParDo 和 DoFn 反序列化 parquet 文件中的数据,对记录进行一些修改,然后更新

回答 0 投票 0

错误“PDone”对象没有属性“窗口”数据流 WriteToPubSub

我有一个梁管道,它从两个 Postgres CloudSQL DB 读取记录,进行一些数据转换,并通过 WriteToPubSub 模块将数据推送到 Google PubSub。 \ 我能够运行这个管道...

回答 0 投票 0

如何将 Spanner 更改数据捕获列名称映射到 BigQuery 中的不同列名称

我创建了Spanner数据库表的变更数据捕获(CDC),但问题是在BigQuery表中,列名不同,导致数据流管道失败。怎么...

回答 0 投票 0

在流管道中获取 EOFExceptions,以使用数据流管道将数据插入到启用了 TLS 的 memoystore Redis 实例中

我正在尝试从 pubsub 读取并将其写入内存存储 redis 实例。我使用 jedispool 因为该进程是多线程的。我能够将来自 pubsub 的数据写入实例...

回答 0 投票 0

Beam ReadFromKafka `with_metadata=True` 编码错误

使用Python SDK 2.49.0(调用Javaharness)中的ReadFromKafka在使用with_metadata=True时会引发编码错误: java.lang.IllegalArgumentException:无法编码元素'org.apache.beam ....

回答 0 投票 0

数据流并行管道

我需要实现一个具有并行管道的作业数据流(一个用于文件configuration.json中找到的每个实体)。 第一步是从 pub/sub 读取一个事件,通知文件到达

回答 0 投票 0

如何读取/更新嵌套 Avro 通用记录中的值?

我正在尝试使用 Apache Beam Pardo 访问 Avro Generic Record 中的嵌套字段。 我可以进入第一层,但我不知道如何访问更进一步的字段。 因为如果你...

回答 1 投票 0

此管道代码作为 Direct runner 运行良好,但在 Dataflow runner 上运行时出错 'name 'read_inputpath_date' is not defined'

从 apache_beam.options.pipeline_options 导入 PipelineOptions 导入操作系统 导入日志 将 apache_beam 导入为光束 导入 gzip 导入json,io 从 google.cloud 导入存储 os.environ["

回答 1 投票 0

Dataflow/ApacheBeam 将输入限制为第一个 X 数量?

我有一个有限的 PCollection,但我只想获得第一个 X 数量的输入并丢弃其余的。有没有办法使用 Dataflow 2.X/ApacheBeam 来做到这一点?

回答 3 投票 0

使用 Apache Beam 时指定镶木地板文件大小

我正在尝试扩展 Google 的 Dataflow 模板以将数据从 BQ 移动到 Cloud Storage 上的 parquet 文件,但我在尝试控制 parquet 文件大小时受阻。 https://cloud.google.com/dataflow/docs/...

回答 0 投票 0

Go 中的 Apache Beam IO:sql:未知驱动程序“cloudsql-postgres”(忘记导入?)

按照此处的指南,我正在尝试将数据流作业连接到 Go 中 GCP 上的 Cloud SQL Postgres 实例。 我可以在我的机器上本地运行该作业。我已经确认我的权限是...

回答 0 投票 0

GCP Dataflow ReadFromKafka 创建大量连接

我们正在使用 Python 创建数据流作业以从 Kafka(Amazon MSK,6 个代理,5 个分区主题)读取数据。数据流作业部署在具有 Cloud NAT(单个公共 IP)的 VPC 中,此 IP 是

回答 1 投票 0

读取 apache beam 数据帧中的压缩 json 文件

beam dataframe 看起来支持读取压缩的 json 文件 apache_beam.dataframe.io.read_json 但是,当我尝试通过代码读取文件时: 从 apache_beam.dataframe.io 导入

回答 0 投票 0

Java Apache 光束

导入 org.apache.beam.runners.direct.DirectRunner; 导入 org.apache.beam.sdk.Pipeline; 导入 org.apache.beam.sdk.io.jdbc.JdbcIO; 导入 org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; 进口...

回答 0 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.