Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。
Beam ReadFromKafka `with_metadata=True` 编码错误
使用Python SDK 2.49.0(调用Javaharness)中的ReadFromKafka在使用with_metadata=True时会引发编码错误: java.lang.IllegalArgumentException:无法编码元素'org.apache.beam ....
我需要实现一个具有并行管道的作业数据流(一个用于文件configuration.json中找到的每个实体)。 第一步是从 pub/sub 读取一个事件,通知文件到达
我正在尝试使用 Apache Beam Pardo 访问 Avro Generic Record 中的嵌套字段。 我可以进入第一层,但我不知道如何访问更进一步的字段。 因为如果你...
此管道代码作为 Direct runner 运行良好,但在 Dataflow runner 上运行时出错 'name 'read_inputpath_date' is not defined'
从 apache_beam.options.pipeline_options 导入 PipelineOptions 导入操作系统 导入日志 将 apache_beam 导入为光束 导入 gzip 导入json,io 从 google.cloud 导入存储 os.environ["
Dataflow/ApacheBeam 将输入限制为第一个 X 数量?
我有一个有限的 PCollection,但我只想获得第一个 X 数量的输入并丢弃其余的。有没有办法使用 Dataflow 2.X/ApacheBeam 来做到这一点?
我正在尝试扩展 Google 的 Dataflow 模板以将数据从 BQ 移动到 Cloud Storage 上的 parquet 文件,但我在尝试控制 parquet 文件大小时受阻。 https://cloud.google.com/dataflow/docs/...
Go 中的 Apache Beam IO:sql:未知驱动程序“cloudsql-postgres”(忘记导入?)
按照此处的指南,我正在尝试将数据流作业连接到 Go 中 GCP 上的 Cloud SQL Postgres 实例。 我可以在我的机器上本地运行该作业。我已经确认我的权限是...
GCP Dataflow ReadFromKafka 创建大量连接
我们正在使用 Python 创建数据流作业以从 Kafka(Amazon MSK,6 个代理,5 个分区主题)读取数据。数据流作业部署在具有 Cloud NAT(单个公共 IP)的 VPC 中,此 IP 是
读取 apache beam 数据帧中的压缩 json 文件
beam dataframe 看起来支持读取压缩的 json 文件 apache_beam.dataframe.io.read_json 但是,当我尝试通过代码读取文件时: 从 apache_beam.dataframe.io 导入
导入 org.apache.beam.runners.direct.DirectRunner; 导入 org.apache.beam.sdk.Pipeline; 导入 org.apache.beam.sdk.io.jdbc.JdbcIO; 导入 org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; 进口...
如何使用LoggerFactory在数据流Apache Beam应用中获取日志?
我正在 Google DataFlow 上运行 Beam 应用程序,但我无法在数据流中看到信息日志。 公开课 Abc { private static final Logger LOG = LoggerFactory.getLogger(Abc.cla...
我正在尝试在 BEAM 管道中将 Log4j2 日志记录实现与 Slf4j api 一起使用。在 Maven 中它看起来像这样: org.apache.logging.log4j 我正在尝试在 BEAM 管道中将 Log4j2 日志记录实现与 Slf4j api 一起使用。在 Maven 中它看起来像这样: <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.20.0</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j2-impl</artifactId> <version>2.20.0</version> </dependency> 在本地运行良好。日志记录也适用于我启动管道的容器。但是,在工作人员内部,此设置会导致一些问题: Caused by: org.apache.logging.log4j.LoggingException: log4j-slf4j2-impl cannot be present with log4j-to-slf4j 这个附加库 log4j-to-slf4j 不是我项目的一部分,也没有列在依赖项中(也不在传递依赖项中)。看起来是BEAM sdk自己加的。问题是:如何配置 BEAM 以使用我的 Log4j 版本?
用于将 csv 文件加载到 bigquery 的 python 脚本
我是数据流初学者,使用这个通用脚本使用数据流将 csv 文件加载到 bigquery。 导入 argparse 导入 csv 导入日志 将 apache_beam 导入为光束 来自 apache_beam.options.
我正在尝试使用聚合查询作为 Apache Beam MongoDBIO QueryFn 的一部分。但我没有得到任何结果。 List 文档 = new ArrayList<>(); 文档.add( 新
我目前正在处理 4 个数据流作业,每个作业都在 3 个独立的环境中进行复制。在过去的 3 个月中,所有 12 个实例都已成功运行。但是,我观察到...
如何为 python beam 管道和 apache flink 创建 docker 容器?
我有点费劲地想把我的头围在容器化一个用 python 编写并由 flink 执行的光束管道上。按照官方文档,我认为修改现有容器......
我正在尝试按照本教程在 GCP 上的 python 中运行光束脚本: [https://levelup.gitconnected.com/scaling-scikit-learn-with-apache-beam-251eb6fcf75b][1] 但我不断收到以下电子邮件...
为什么 Apache Beam 似乎在单个 worker 上并行化元素?
我正在使用 Google Cloud Dataflow 运行程序测试一个非常简单的 Apache Beam 管道,该运行程序从 Pub/Sub 读取音频元素,通过 Tensorflow 模型运行元素,并将结果写入 Pub...
这里的医生说 Dataflow runner 的 PubsubIO 实现在消息被第一个融合阶段成功处理后自动确认消息以及该阶段的副作用
Apache Beam - 无界 PCollection 中的计数消息(每个窗口)
我需要一个简单的任务来计算来自无界数据源的固定窗口中的消息数。 步骤是: 从发布/订阅中读取数据 定义窗口固定时间 创建一个(键,值),其中一个键...