apache-beam 相关问题

Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。

Apache Beam Python 管道 - 等待生成第二个 PCollection

我正在读取GCS存储桶并生成一个json文件,稍后我需要将其导入到BQ中。 但我收到错误“OSError:根据文件模式找不到文件” 与光束。管道(选项=

回答 1 投票 0

向管道发出正常关闭信号

我目前正在使用 Beam 2.3.0。我花了两天时间研究如何使用 DirectRunner 优雅地关闭管道。 将 blockOnRun 设置为 false 并调用取消只会杀死

回答 2 投票 0

将 Apache Beam 标记输出(数据流运行程序)写入不同的 BQ 表

我似乎在将标记的 PCollection 写入 BQ 中的多个目标表时遇到问题。管道执行时没有错误,但没有数据写入。 如果我在没有

回答 3 投票 0

如何替换 BigQuery Apache Beam Java 数据流中的现有行

目前,我有一个名为 cdn_daily_user_playback_requests_1MONTH 的 BigQuery 表。其中包含基于日常记录的大量数据。所以会有来自整个的数据......

回答 1 投票 0

带有 CheckStopReadingFn 的 Beam Pipeline 在从 CheckStopReadingFn 返回 true 时抛出 IllegalStateException

我正在使用 Apache Beam 2.29.0 从 Kafka 进行消费。我添加了函数 CheckStopReadingFn 以在测试返回 true 后停止从 Kafka 读取。我的管道创建是这样的: 返回页。

回答 1 投票 0

数据流管道日志显示来自组织外部用户的连接请求

我已经在 Apache Beam Python SDK 中创建了一个数据流管道,并且运行良好。 我正在查看日志,时不时地我会看到来自现实世界 IP 上的随机用户名的连接请求...

回答 1 投票 0

Google Dataflow 工作人员完成度达 99%

我被这个问题困扰了一个多星期了。当我运行以下管道时,我的工作人员会翻阅我的数据 - 直到 99% 标记,他们会无限期地挂在这个标记上。我...

回答 1 投票 0

使用数据流作为 PCollection 从 GCS 存储桶读取 Avro 文件<TableRow>

我想知道如何从 GCS 中读取 Avro 文件的内容作为 PCollection 我正在尝试这样: public static PCollection avroFileReader(Pipeline pipeline,String inputAvroFi...

回答 1 投票 0

DataFlow 代码返回 DoFn 类的空输出

我正在尝试使用数据流运行下面的代码,其中我在类中定义了 2-3 个函数,其中 2 个函数正在工作,但 send_email() 正在工作,也没有抛出任何错误。 请...

回答 1 投票 0

等待两个PubSub数据流后再加入数据

我有两个独立的PubSub数据流。他们每个人都会收到一条通知。我想在出现通知时检索文件并使用 CoGroupByKey 加入这些文件。这两个文件都...

回答 1 投票 0

由于当前可用区资源不足,数据流管道创建失败

我已经设置了一个数据流管道,它从 Pub/Sub 获取消息,转换为字典并打印消息。 这是我写的脚本: 导入 apache_beam 作为光束 导入日志记录 导入

回答 1 投票 0

使用 apache beam java 比较 2 个 csv 文件

我的任务是使用 Apache Beam Java 比较 2 个 csv 文件。我的文件如下 文件 1(ID) 1000672801 1000676083 1000686637 1000680800 文件2(参考图像) gs://productimages/p_10006...

回答 1 投票 0

Python apache beam 从 kafka 读取 - 我无法连接到 kafka

我想做一个python apache beam Kafka客户端,它将从Kafka获取流数据(Kafka返回大量数据,如数百万/数十亿),按键和值过滤数据并返回数据...

回答 1 投票 0

无法安装 apache-beam==2.35.0、apache-beam[gcp]==2.35.0 apache-beam[gcp]==2.47.0 和 apache-beam[gcp]==2.48.0 版本有冲突依赖

我有一个安装 apache-beam==2.35.0 的 kfp 组件。我在 GCP 管道上的 Vertex AI 中运行它。 组件头: @成分( base_image="dockerhub/python:3.7.12", 包裹到我...

回答 1 投票 0

如何使用 Flink runner 将 Beam Python 作业提交到 Kubernetes 上?

我想在 Kubernetes 内的 Flink 运行器上使用 Beam 运行连续流处理作业。我一直在这里关注本教程(https://python.plainenglish.io/apache-beam-flink-cluster-

回答 2 投票 0

Flink 上的 Apache Beam 支持快照启动/停止吗?

流处理应用程序在 Flink 集群上运行,使用原始 Flink 支持拍摄快照,然后通过 Flink REST API 从快照重新启动作业,例如停止与

回答 1 投票 0

如何将 JsonObject 转换为 Avro 通用记录?

我还是这个领域的新手,目前我正在尝试使用 Apache Beam 中的 ParDo 和 DoFn 反序列化 parquet 文件中的数据,对记录进行一些修改,然后更新

回答 0 投票 0

错误“PDone”对象没有属性“窗口”数据流 WriteToPubSub

我有一个梁管道,它从两个 Postgres CloudSQL DB 读取记录,进行一些数据转换,并通过 WriteToPubSub 模块将数据推送到 Google PubSub。 \ 我能够运行这个管道...

回答 0 投票 0

如何将 Spanner 更改数据捕获列名称映射到 BigQuery 中的不同列名称

我创建了Spanner数据库表的变更数据捕获(CDC),但问题是在BigQuery表中,列名不同,导致数据流管道失败。怎么...

回答 0 投票 0

在流管道中获取 EOFExceptions,以使用数据流管道将数据插入到启用了 TLS 的 memoystore Redis 实例中

我正在尝试从 pubsub 读取并将其写入内存存储 redis 实例。我使用 jedispool 因为该进程是多线程的。我能够将来自 pubsub 的数据写入实例...

回答 0 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.