Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。
GCP Dataflow 无法从写入该文件的 GCP 存储位置读取“pipeline.pb”文件
我正在尝试使用以下命令运行数据流管道: !python3 ~/pipelines/Beam/pipeline.py \ --project='project_id' \ --region='区域' \ --dataset_id='dataset_id' \ --
我想在 Beam PCollections 中使用旧版 API 中的 Java 类。我专门与 PCollection of Row 合作。我无法访问旧版 API 的源代码,因此无法对其进行注释或声明...
我使用gradle插件从avro模式(com.github.davidmc24.gradle.plugin.avro)生成类,是否可以为前@DefaultSchema(JavaBeanSchema.class)添加任何注释到类级别...
在数据流 2.x 中将 TableRow 转换为 JSON 格式字符串的最简单方法?
缺少编写自己的函数来执行此操作,将数据流 2.x 管道内的 TableRow 对象转换为 JSON 格式的字符串的最简单方法是什么? 我认为下面的代码可以工作,但是...
通过KafkaIO发送到Kafka时如何正确捕获异常? KafkaIO.write() .withBootstrapServers(kafkaBroker) .withTopic(主题) ...
java.lang.NoClassDefFoundError:org/apache/beam/sdk/coders/CoderProviderRegistrar
我在尝试将着色 jar 作为 Spark 作业提交到 dataproc 时收到此错误: java.lang.NoClassDefFoundError: org/apache/beam/sdk/coders/CoderProviderRegistrar 我确信这堂课...
Airflow 任务失败,返回码 Negsignal.SIGKILL
您好 Stack Overflow 社区, 我正在 GCP Cloud Composer 上运行 Airflow(版本 2.5.3)DAG,其中有几个任务将触发基于 java 的数据流作业。任务的代码看起来像...
我们使用 Apache Beam 向 API 发送一些数据。对于单个 API 调用,API 仅接受一定数量的元素。 为了将元素分组为微批次,我们使用了混合触发器,当其中一个元素时就会触发...
Apache Beam BigqueryIO(Java)io.grpc.StatusRuntimeException:INVALID_ARGUMENT:创建 upsert 流需要主集群键
我正在使用apache beam java从一个bigquery表中读取并使用applyRowMutations()写入另一个bigquery表,但它不起作用。 我已经使用
如何使用 Apache Beam Python SDK 读取 MQTT
我正在尝试使用具有多语言支持的 Apache Beam Python SDK v2.50.0 读取 MQTT 主题 - MQTT IO 可通过 Java SDK 获得,但不能通过 Python SDK 获得。这是我的文档
连接两个Pcollection时没有schema时无法调用getSchema
我希望你一切都好。 我有两个 PCollection,我想应用 leftOuterJoin,但是当我执行此操作时,出现此错误,我不明白为什么: java.lang.IllegalStateException: Cannot
在 Python 中使用 Apache Beam 在 GCP Dataflow 中创建模板
任务: 我需要从本地 Oracle 获取数据并将其转储到 BigQuery 中。 我尝试过的选项 有多种方法可以实现相同的目的。 我尝试了第一个选项,即使用 Datastream 但由于
问题回复:FailedInserts PCollection / Beam 2.45 与 2.49 行为确认
上下文:GitHub 问题 26853 我对上述引用的功能请求以及 Beam 2.45 与 2.49 的差异有疑问。 如果 FailedInserts PCollection 未被消耗,BigQueryIO...
我想使用 Apache Beam 将 Transform 应用于侧面输入 PCollection。 应该对基本 PCollection 的每个元素执行侧面输入的转换,以及
Beam 单元测试 `assert_that` 和 `equal_to` 实际上并不比较结果
我注意到Beam的assert_that和equal_to似乎无法比较我的预期。为了进一步测试这一点,我正在使用一个非常简单的代码进行测试: 导入单元测试 导入 apache_b...
我有一个用例,可以在以下条件下从 PubSub 在 GCS 存储桶中创建文件: 窗口大小限制 发布到 PubSub 主题的消息大小超过 500MB 例如, i) 如果我们指定窗口 si...
我已经在 Apache Beam 上工作了几天。我想快速迭代我正在工作的应用程序,并确保我正在构建的管道没有错误。在spark中我们可以使用sc。
我想使用数据流在 GCS 存储桶中的每个文件之间映射函数。我想我已经很接近了,但由于某种原因,结果只是一个空文件。我在下面包含了我的代码,重现了专业版...
如何将 Huggingface 数据集加载到 Apache Beam 管道中?
假设我有一个像“oscar2301”这样非常大的数据集。 我有一个 Apache Beam 管道,我想传递 HuggingFace 数据集。当我尝试将数据集传递给 beam.Create 时,代码
Beam 作业在本地计算机上成功运行,但在数据流运行器上失败
我构建了一个梁作业,其中: 从 pubsub 读取数据(例如 {"user_id":"u1", "event_name":"logout", "region":"US"} 等消息) 从