Google Cloud Dataflow是一种完全托管的云服务,可用于大规模创建和评估数据处理管道。数据流管道基于Apache Beam编程模型,可以在批处理和流模式下运行。 Cloud Dataflow是Google云端平台的一部分。
有一个使用 Apache Beam 和 Dataflow 作为运行器的 Python 应用程序。该应用程序使用非公共 Python 包“uplight-telemetry”,该包是使用“extra_packages”配置的,而 cr...
我想创建一个分割条件,检查名为“年份”的列中的记录是否为 4 位数字,例如我检查它们是否代表年份(例如 2023 年)。有人可以告诉我如何创建
我需要创建一个表,用于存储数据库中所有表名及其行数的记录。 我尝试创建一个数据流,在其中使用转换选择特定数据......
如何让我的 GCP 工作流程在遇到 404 错误时保持循环?
我正在开发 GCP 工作流程并遇到问题 - 当我遇到 404 错误时循环停止。我尝试查看 GCP 文档,但到目前为止还没有成功。关于如何使循环继续进行的任何想法......
Google Cloud Dataflow 作业失败:发现意外参数
失败注释 当我设置数据流管道并从模板(“云存储上的文本文件到 BigQuery”)创建作业时,我遇到了这个问题。 作业创建失败:无法创建工作流程...
AttributeError:“tuple”对象在有状态 DoFn 期间没有属性“encode”
我正在使用有状态且及时的 DoFn 在我正在实现的固定窗口结束后 2 秒处理数据。 我已经在 Apache Beam 游乐场内测试了我的代码的可重现示例...
TL;DR:如何在 Par.Do 转换中访问在作业创建时传递给作业的参数? 我有两个模板,一个用于开发,一个用于生产,它们都工作正常,除了一个 va...
相同的 Apache Beam 代码适用于 Direct Runner,但不适用于 Dataflow 运行器
我有一段 apache 束管代码,它从 GCS 存储桶中的文件读取并打印它。它与 DirectRunner 完美配合并打印文件输出,但与 Dataflow 运行器配合...
Cloud Pub/Sub 中的订购交付是否可以与 Cloud Dataflow 配合使用
我有从 pubsub 读取消息的数据流作业,但是,它以错误的顺序处理元素,我正在使用窗口来获取最近的元素,但有时消息不会被提取...
在 Confluence Kafka 的 Flex 模板映像中获取信任库文件
我们尝试将 truststore.jks 文件存储在 Flex Template Docker 中,但在管道中使用它时我们无法找到它。 我们尝试拉取图像,我们可以看到文件是 p...
我有一个数据流,其中添加了一个整数参数 pMovType。在数据流源中,我尝试获取它,但不能。当我尝试以 $pMovType 形式使用它时,它只会给出“无效的伪列”。 ...
如何在管道的下一步中从 DB I/O 连接器访问 PCollection
我使用Apache-beam编写了一个小型管道。它使用beam-postgres作为输入连接器从数据库表创建PCollection。 代码如下所示 - 导入 apache_beam 作为光束 来自
我正在尝试使用 Google 提供的模板 PubSub 到 BigQuery 设置数据流作业。但是我在启动时收到此错误: 消息:资源“projects/my-project/global/networks/d...
我一直在尝试在 Google Cloud Dataflow 上部署管道。到目前为止,这是一个相当大的挑战。 我面临导入问题,因为我意识到 ParDo 函数需要requirements.txt...
使用 Beam Dataflow 流式传输 (python) 从 pub/sub 读取并写入 Firestore(本机)
我正在使用 Dataflow 滑动窗口从 pub/sub 读取内容,在创建实体并写入 Firestore 本机之前应用一些转换。我发现 Beam 不支持本机 Firestore...
我必须在Python中创建一个Apache数据束,它必须执行以下功能- 从数据库表中读取符合特定条件的条目 为该记录调用第一个 REST API 在响应...
AvroCoder.isDeterministic 返回 false。 为什么 AvroCoder 不是确定性的? Avro 记录不会总是被编码到相同的字节流中吗? 由于 Avro Coder 不是确定性的 Avro
我在数据流上有一个简单的并行光束Python作业。它使用 1 个 cpu 花费近 20 分钟,然后扩展到数百个,并在另外 20 分钟内完成。有没有办法让它自动缩放...
Apache Beam(数据流):是否可以创建具有多个窗口需求的管道
我正在尝试思考如何构建一些数据管道需求,我只是想知道以下是否可能: 我可以创建一个可以完全实时传输数据的 Apache Beam 管道吗...
我正在尝试将文件从 GCS 存储桶复制到数据流中的 /tmp 位置。为此,我尝试了下面的代码 - 导入 apache_beam 作为光束 .... .... 类copyFile(beam.DoFn): def __init__(se...