apache-beam 相关问题

Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。

如何在 Apache Beam (Python SDK) 中使用 `with_outputs` 和 `with_output_types`?

Apache Beam PTransform 可以附加 with_outputs 和 with_output_types。例如, pcoll | beam.CombinePerKey(sum).with_output_types(typing.Tuple[unicode, int]) 和 (词 | 梁.ParDo(

回答 1 投票 0

Dataflow Runner 与 GCP Secret Manager 存在问题

我有一个使用 Java 编码的 Apache Beam Dataflow 项目,其中我使用以下子例程来获取数据库凭据: 私有静态 JsonObject getCredentials(String suffix) { 字符串

回答 1 投票 0

获取匿名调用者在数据流上运行 wordcount 时没有 storage.objects.create 访问错误

我正在使用python运行apache beam包中wordcount模块的数据流快速启动。我能够在我的机器上本地运行它。但是,当我尝试通过指定

回答 2 投票 0

apache beam 和 Big Query TableSchema 中的序列化问题

并感谢您的支持。 我目前正在尝试使用 Apache Beam,以尽可能多地了解它的工作原理。我面临 com.google.api.serv 序列化的问题...

回答 1 投票 0

如何使用BigQueryToPostgresOperator

我是在 GCP 上使用 apache-airflow 的新手,我正在尝试在 Dataproc 无服务器内的 DAG 上使用 BigQueryToPostgresOperator 将表从 Bigquery 发送到 Cloud SQL,特别是发送到

回答 1 投票 0

Apache Beam DirectRunner 与 FlinkRunner 示例

我使用beam yaml(python sdk)构建了最简单的管道,其中读取csv文件并应打印到日志。 使用默认 DirectRunner 运行时: python -m apache_beam.yaml.main --

回答 1 投票 0

将 BigQuery 重复数据类型转换为 parquet

我有一个 BigQuery 表,其中包含由结构化数据 (RECORD) 组成的 REPEATED 字段,该字段仅由两个键组成:KEY 和 VALUE。它看起来像这样: [{“KEY”:“TESTING_FLAG”,“...

回答 1 投票 0

在 Dataflow Python flex 模板中包含另一个文件,ImportError

是否有一个包含多个文件的 Python 数据流 Flex 模板示例,其中脚本导入同一文件夹中包含的其他文件? 我的项目结构是这样的: ├── 管道...

回答 5 投票 0

数据流流错误并显示工作流失败错误消息

运行我的数据流作业时,它很早就失败了(没有数据处理,似乎没有启动工作程序),并显示一条错误消息: 工作流程失败。 我尝试运行基于...的流数据流作业

回答 1 投票 0

数据流工作线程中无法访问环境变量

有一个使用 Apache Beam 和 Dataflow 作为运行器的 Python 应用程序。该应用程序使用非公共 Python 包“uplight-telemetry”,该包是使用“extra_packages”配置的,而 cr...

回答 1 投票 0

AttributeError:“tuple”对象在有状态 DoFn 期间没有属性“encode”

我正在使用有状态且及时的 DoFn 在我正在实现的固定窗口结束后 2 秒处理数据。 我已经在 Apache Beam 游乐场内测试了我的代码的可重现示例...

回答 1 投票 0

相同的 Apache Beam 代码适用于 Direct Runner,但不适用于 Dataflow 运行器

我有一段 apache 束管代码,它从 GCS 存储桶中的文件读取并打印它。它与 DirectRunner 完美配合并打印文件输出,但与 Dataflow 运行器配合...

回答 2 投票 0

Cloud Pub/Sub 中的订购交付是否可以与 Cloud Dataflow 配合使用

我有从 pubsub 读取消息的数据流作业,但是,它以错误的顺序处理元素,我正在使用窗口来获取最近的元素,但有时消息不会被提取...

回答 1 投票 0

在 Confluence Kafka 的 Flex 模板映像中获取信任库文件

我们尝试将 truststore.jks 文件存储在 Flex Template Docker 中,但在管道中使用它时我们无法找到它。 我们尝试拉取图像,我们可以看到文件是 p...

回答 2 投票 0

从 Beam 管道连接 google cloud sql postgres 实例时出现问题

我在连接 Google Cloud SQL 上的 Postgresql 实例时遇到一些问题,想寻求帮助。我不确定解决方案是否是启动连接引擎或其他...

回答 2 投票 0

如何在管道的下一步中从 DB I/O 连接器访问 PCollection

我使用Apache-beam编写了一个小型管道。它使用beam-postgres作为输入连接器从数据库表创建PCollection。 代码如下所示 - 导入 apache_beam 作为光束 来自

回答 2 投票 0

无法传入Dataflow的Requirements.txt

我一直在尝试在 Google Cloud Dataflow 上部署管道。到目前为止,这是一个相当大的挑战。 我面临导入问题,因为我意识到 ParDo 函数需要requirements.txt...

回答 2 投票 0

使用 Beam Dataflow 流式传输 (python) 从 pub/sub 读取并写入 Firestore(本机)

我正在使用 Dataflow 滑动窗口从 pub/sub 读取内容,在创建实体并写入 Firestore 本机之前应用一些转换。我发现 Beam 不支持本机 Firestore...

回答 1 投票 0

ApacheBeam python 统计有多少订单低于 15,有多少订单以上或等于,k = count v = sum

我正在读取 .csv,我拆分并仅获取最后一个值。 我将字符串转换为浮点数 我比较是否小于 15,并附加到 lessthanList 值, 否则,追加到 morethanList 导入阿帕奇...

回答 1 投票 0

如何在下一步中访问先前 Ptransform 的输出

我必须在Python中创建一个Apache数据束,它必须执行以下功能- 从数据库表中读取符合特定条件的条目 为该记录调用第一个 REST API 在响应...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.