apache-beam 相关问题

Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。

Apache Beam 所说的“组合函数不一定被调用一次”是什么意思?

在关于组合函数的 Beam 文档中,它说了以下内容: 当您应用组合变换时,您必须提供包含组合元素的逻辑的函数或

回答 1 投票 0

Apache Beam(数据流):是否可以创建具有多个窗口需求的管道

我正在尝试思考如何构建一些数据管道需求,我只是想知道以下是否可能: 我可以创建一个可以完全实时传输数据的 Apache Beam 管道吗...

回答 1 投票 0

数据流管道运行错误:SDK 断开连接

我正在尝试使用 DataflowRunner 和 --no_use_public_ips 运行测试数据流管道。 它正在从 Bigquery 读取一个小表并将 csv 写入存储中,所有这些都在同一个项目中。 python3 ./数据...

回答 1 投票 0

使用 GCP Dataflow 在 Airflow 中进行 BeamRunJavaPipelineOperator 设置(执行非模板化作业!)

我正在努力让我的 BeamRunJavaPipeline() 在 Airflow 中工作以在 GCP 上运行数据流作业。 我已经在 Google Cloud Storage 中拥有了 jar 文件。 我基本上是在寻找指针和实用的

回答 1 投票 0

尝试让 Airflow 中的 BeamRunJavaPipelineOperator 使用 GCP Dataflow 工作(执行非模板化工作!)

我正在努力让我的 BeamRunJavaPipeline() 在 Airflow 中工作以在 GCP 上运行数据流作业。 我已经在 Google Cloud Storage 中拥有了 jar 文件。 我基本上是在寻找指针和实用的

回答 1 投票 0

Google Cloud Dataflow BigQuery 到 Bigtable 传输 - 限制写入速度?

我有许多数据流模板可将数据从 BigQuery 复制到 Bigtable 表。 其中最大的数据约为 900 万行、22GB。 没有复杂的突变,它只是一个co...

回答 1 投票 0

通过 GCP 数据流和 GCP 功能处理文件

我目前正在使用以下代码来处理放置在输入存储桶中的 csv 文件。我使用数据流处理它们,但我在数据流作业中遇到错误。 我已经列出了代码和错误...

回答 1 投票 0

带有 STORAGE_API_WRITE 的 BigQueryIO 批处理管道不会截断表

我有一个 BATCH 管道,需要写入 BigQuery 来截断表。我正在使用方法 STORAGE_API_WRITE 并且该表不会被截断,而是会附加值。 .申请(

回答 1 投票 0

无法从 Dataflow 将数据插入 BigQuery(使用 Python SDK)

当我尝试从数据流写入 BigQuery 时,我试图找出 BigQuery 所期望的正确架构。 我采用了 Apache Beam 流媒体 pubsub-to-pubsub 示例,只是......

回答 1 投票 0

Apache Beam Streaming 写入/读取 BigQuery

我正在运行一个流管道,我尝试在 BigQuery 中写入,然后从中读取。在阅读之前,有没有办法确保我刚刚写的内容存在? 我正在使用Python:

回答 1 投票 0

是否有 Apache Beam 功能来收集固定数量的元素?

我对 Apache Beam 和 Dataflow 相当陌生,我想从具有 n 个元素的 PCollection 中收集或“批处理”k 个元素。在这种情况下,n 不是固定数字并且...

回答 1 投票 0

Dataflow:在流作业中刷新有界 PCollection

我正在运行一个流数据流作业,从无界的 PCollection 接收事件。 我想将无界 PCollection 与有界 PCollection 结合起来。 有界 PCollection 的内容是

回答 1 投票 0

Google Dataflow:在流式管道中的 BigQuery 中插入 + 更新

主要对象 一个 python 流管道,我在其中读取来自 pub/sub 的输入。 分析输入后,有两个选项可用: 如果 x=1 -> 插入 如果 x=2 -> 更新 测试 这可不...

回答 2 投票 0

如何使用 Flex 模板部署多语言 Google Dataflow 管道?

我有一个使用 Java 外部转换的 Google Dataflow Batch 管道。 当我使用 Java -jar 启动扩展服务并使用 Dataflow Runne 启动管道时,它运行良好...

回答 1 投票 0

如何让 extra_package 选项适用于 Dataflow Flex 模板?

我有一个数据流弹性模板,我正在尝试运行它,它必须安装一个私人存储库。我按照此处的 Beam 文档进行操作,该文档说使用 --extra_package 管道选项来指定 p...

回答 2 投票 0

Dataflow Flex 模板失败,并出现错误“未知的非复合转换瓮”

我正在尝试使用 Google Cloud Dataflow Flex 模板运行 Apache Beam 流,但我创建的模板继续失败,并显示以下错误消息: java.lang.IllegalArgumentException:

回答 1 投票 0

Apache Beam Python SDK - 使用 JDBC io 从 Postgres 读取

我正在寻找有关如何使用 Beam Python SDK 从 Postgres 读取数据/向 Postgres 写入数据的资源。到目前为止,我了解到 apache_beam.io.jdbc 是我们最好的选择(如果有更好的选择,请告诉我) ...

回答 1 投票 0

解释 Apache Beam python 语法

我已经阅读了 Beam 文档,也浏览了 Python 文档,但没有找到对大多数示例 Apache Beam 代码中使用的语法的良好解释。 可以...

回答 2 投票 0

如何防止作业重新启动期间 GCP Dataflow 数据丢失(Flex 模板 SpanenrToBigQuery)

我将变更流数据从 Spanner 传输到 Big Query 中。我使用默认的 --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Spanner_Change_Streams_to_BigQuery flex 模板...

回答 1 投票 0

使用嵌套函数在 Google Dataflow 上运行 Apache Beam 管道时出现名称错误

我正在使用 Python 开发 Apache Beam 管道,在 Google Dataflow 上运行管道时遇到 NameError。该错误特别提到“json_encoder”未定义......

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.