apache-beam 相关问题

Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。

是否有 Apache Beam 功能来收集固定数量的元素?

我对 Apache Beam 和 Dataflow 相当陌生,我想从具有 n 个元素的 PCollection 中收集或“批处理”k 个元素。在这种情况下,n 不是固定数字并且...

回答 1 投票 0

Dataflow:在流作业中刷新有界 PCollection

我正在运行一个流数据流作业,从无界的 PCollection 接收事件。 我想将无界 PCollection 与有界 PCollection 结合起来。 有界 PCollection 的内容是

回答 1 投票 0

Google Dataflow:在流式管道中的 BigQuery 中插入 + 更新

主要对象 一个 python 流管道,我在其中读取来自 pub/sub 的输入。 分析输入后,有两个选项可用: 如果 x=1 -> 插入 如果 x=2 -> 更新 测试 这可不...

回答 2 投票 0

如何使用 Flex 模板部署多语言 Google Dataflow 管道?

我有一个使用 Java 外部转换的 Google Dataflow Batch 管道。 当我使用 Java -jar 启动扩展服务并使用 Dataflow Runne 启动管道时,它运行良好...

回答 1 投票 0

如何让 extra_package 选项适用于 Dataflow Flex 模板?

我有一个数据流弹性模板,我正在尝试运行它,它必须安装一个私人存储库。我按照此处的 Beam 文档进行操作,该文档说使用 --extra_package 管道选项来指定 p...

回答 2 投票 0

Dataflow Flex 模板失败,并出现错误“未知的非复合转换瓮”

我正在尝试使用 Google Cloud Dataflow Flex 模板运行 Apache Beam 流,但我创建的模板继续失败,并显示以下错误消息: java.lang.IllegalArgumentException:

回答 1 投票 0

Apache Beam Python SDK - 使用 JDBC io 从 Postgres 读取

我正在寻找有关如何使用 Beam Python SDK 从 Postgres 读取数据/向 Postgres 写入数据的资源。到目前为止,我了解到 apache_beam.io.jdbc 是我们最好的选择(如果有更好的选择,请告诉我) ...

回答 1 投票 0

解释 Apache Beam python 语法

我已经阅读了 Beam 文档,也浏览了 Python 文档,但没有找到对大多数示例 Apache Beam 代码中使用的语法的良好解释。 可以...

回答 2 投票 0

如何防止作业重新启动期间 GCP Dataflow 数据丢失(Flex 模板 SpanenrToBigQuery)

我将变更流数据从 Spanner 传输到 Big Query 中。我使用默认的 --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Spanner_Change_Streams_to_BigQuery flex 模板...

回答 1 投票 0

使用嵌套函数在 Google Dataflow 上运行 Apache Beam 管道时出现名称错误

我正在使用 Python 开发 Apache Beam 管道,在 Google Dataflow 上运行管道时遇到 NameError。该错误特别提到“json_encoder”未定义......

回答 1 投票 0

GCP Dataflow 无法从写入该文件的 GCP 存储位置读取“pipeline.pb”文件

我正在尝试使用以下命令运行数据流管道: !python3 ~/pipelines/Beam/pipeline.py \ --project='project_id' \ --region='区域' \ --dataset_id='dataset_id' \ --

回答 1 投票 0

如何在 PCollection 中使用遗留类?

我想在 Beam PCollections 中使用旧版 API 中的 Java 类。我专门与 PCollection of Row 合作。我无法访问旧版 API 的源代码,因此无法对其进行注释或声明...

回答 1 投票 0

是否可以向从 avro 模式生成的类添加注释?

我使用gradle插件从avro模式(com.github.davidmc24.gradle.plugin.avro)生成类,是否可以为前@DefaultSchema(JavaBeanSchema.class)添加任何注释到类级别...

回答 1 投票 0

在数据流 2.x 中将 TableRow 转换为 JSON 格式字符串的最简单方法?

缺少编写自己的函数来执行此操作,将数据流 2.x 管道内的 TableRow 对象转换为 JSON 格式的字符串的最简单方法是什么? 我认为下面的代码可以工作,但是...

回答 3 投票 0

Apache Beam 写入 Kafka 时的错误处理

通过KafkaIO发送到Kafka时如何正确捕获异常? KafkaIO.write() .withBootstrapServers(kafkaBroker) .withTopic(主题) ...

回答 1 投票 0

java.lang.NoClassDefFoundError:org/apache/beam/sdk/coders/CoderProviderRegistrar

我在尝试将着色 jar 作为 Spark 作业提交到 dataproc 时收到此错误: java.lang.NoClassDefFoundError: org/apache/beam/sdk/coders/CoderProviderRegistrar 我确信这堂课...

回答 1 投票 0

Airflow 任务失败,返回码 Negsignal.SIGKILL

您好 Stack Overflow 社区, 我正在 GCP Cloud Composer 上运行 Airflow(版本 2.5.3)DAG,其中有几个任务将触发基于 java 的数据流作业。任务的代码看起来像...

回答 1 投票 0

Apache Beam 在相同数量的元素后触发窗格

我们使用 Apache Beam 向 API 发送一些数据。对于单个 API 调用,API 仅接受一定数量的元素。 为了将元素分组为微批次,我们使用了混合触发器,当其中一个元素时就会触发...

回答 1 投票 0

Apache Beam BigqueryIO(Java)io.grpc.StatusRuntimeException:INVALID_ARGUMENT:创建 upsert 流需要主集群键

我正在使用apache beam java从一个bigquery表中读取并使用applyRowMutations()写入另一个bigquery表,但它不起作用。 我已经使用

回答 1 投票 0

如何使用 Apache Beam Python SDK 读取 MQTT

我正在尝试使用具有多语言支持的 Apache Beam Python SDK v2.50.0 读取 MQTT 主题 - MQTT IO 可通过 Java SDK 获得,但不能通过 Python SDK 获得。这是我的文档

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.