apache-beam 相关问题

Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。

Python 中 apache Beam 上的 csv 配对出现 UnicodeEncodeError

我正在努力在 apache beam python 中解析 CSV 文件。但是,当 CSV 文件中存在一些 unicode 字符(例如“ş”)时,它无法解析并出现错误 运行时错误:UnicodeEncodeError:'asc...

回答 1 投票 0

如何创建空的PCollection<KV<String, Object>>

我正在尝试创建一个名为 Incident 的自定义对象的空 PCollection 公共类事件实现可序列化{ 私有整数事件ID; 私有字符串appId; 私人长

回答 1 投票 0

Apache Beam / GCP 数据流编码问题

我正在datalab中“玩”apache beam/dataflow。 我正在尝试从 gcs 读取 csv 文件。 当我使用以下命令创建 pcollection 时: 线= p | 'ReadMyFile' >> beam.io.ReadFromText('gs://' +

回答 3 投票 0

Java Apache Beam ProcessElement 方法必须为 void?

在Java Apache Beam中,@ProcessElement方法是否需要为void?或者它可以返回一个 int、string 或 class? 我们正在进行单元测试,并希望验证方法的输出。我知道那里...

回答 1 投票 0

如何将两个 PCollection 中的计数合并到一个对象中

我有一个 apache beam 管道,可以读取 CSV 文件并对 bigquery 表进行查询,我需要计算每个 PCollection 中有多少个寄存器,并基于此创建 FinalResult ob...

回答 1 投票 0

在 Apache Beam 和 Dataflow 中使用 ReadFromKafka 时出错

我正在尝试使用 Apache Beam 的 Python SDK 连接到 Kafka 主题,并将管道作为数据流作业提交。 这是我的代码片段 导入系统 导入 apache_beam 作为光束 来自 apache_beam。

回答 1 投票 0

Apache Beam python fileio.WriteToFiles 过度分片

我在流式Python管道中使用fileio.WriteToFiles。我明确指定了预期的分片计数,如下所示 fileio.WriteToFiles( 路径=..., file_naming=fileio.default_file_na...

回答 1 投票 0

如何在使用 apache beam 编写的流式管道中读取 bigquery

我想运行一个从 Google bigquery 表中连续读取的流管道。现在,我的流管道在从 bigquery 表读取一次后停止。 apache beam 文档...

回答 1 投票 0

如何在java中使用mocks测试doFn?

是否可以将模拟与 doFn 一起使用?我在 doFn 中有一个从 gcs 存储桶读取的处理方法,我想测试一下。 我尝试使用 LocalStorageHelper 但它不支持存储桶获取

回答 1 投票 0

如何在传递给 pardo 时修改数据流运行时值提供程序参数?

我在尝试修改传递给 Apache Beam Dataflow 管道中 RuntimeValueProvider 的参数时遇到问题。这是我的代码的简化版本: 导入 apache_beam...

回答 1 投票 0

python 数据流:GroupByKey 无法应用于具有全局窗口和默认触发器的无界 PCollection

我有一个简单的 python 数据流代码,它使用无界 pcollection 。它只是 从 pubsub 读取 解析为带有输出标签 SUCCESS 和 FAILURE 的 json 使用输出标签 SUCCESS 和 F 验证 json...

回答 1 投票 0

Apache Beam 优化 Firestore 读取 python

我有传感器数据到达 pub/sub (protobuf),它作为 python 字典插入到“pipeline_fstore”中。数据一次到达。 在管道中,在“添加元数据...

回答 1 投票 0

如何检查 BigQuery 表的状态

我有一个写入 BigQuery 表的数据流作业。每个数据流作业都会创建一个新表。 我意识到对 BigQuery 表的写入操作是异步的,即对

回答 1 投票 0

在 apache beam ReadFromKafka 中的一个主题中可以确定分区位置之前的超时

我正在从事 Google 数据流工作,我正在使用 apache beam ReadFromKafka 来消费主题消息。我正在消耗 4 个主题。在我们向我们的 kafka clu 添加新的代理之后,管道曾经工作正常......

回答 1 投票 0

什么时候使用Entity,什么时候使用DTO?

所以我正在使用带有SpringBoot的Apache Beam,并在.query中使用JDBCIO,我正在从表“customer”中检索记录(从records.customer中选择*,其中customer_code =“abc&quo...

回答 3 投票 0

阿帕奇光束|无法安装 apache beam 并显示子进程错误

我已经安装了最新的 pip、python 3.12,但什么都没有..仍然显示此错误: “为收集的包构建轮子:grpcio-tools grpcio-tools 的构建轮子(setup.py):已启动

回答 1 投票 0

如何使用 DataFlow 将数据从 Pub/Sub 流式传输到 Google BigTable?

我想问是否有人可以告诉我,甚至给我展示一个数据流作业模板的示例,最好是用 Python 编写,我可以在其中: 持续从 Pub/Sub 主题读取 JSON 数据 处理这个数据...

回答 1 投票 0

数据流从Kafka读取数据不会丢失?

我们目前是 Dataflow 批处理作业的大用户,并且希望开始使用 Dataflow 流(如果可以可靠地完成)。 这是一个常见的场景:我们需要一个非常大的 Kafka 主题......

回答 1 投票 0

在 GCP Dataflow 上以编程方式部署和运行 Beam 管道

我正在尝试使用 google-cloud-dataflow 以编程方式在 GCP 数据流上部署一些光束管道,但不确定如何做到这一点。 这些管道已经打包为 jar,我的目标是......

回答 1 投票 0

如何预构建worker容器Dataflow? [洞察“SDK Worker容器镜像预构建:可以启用”]

我想知道如何预构建工作容器并同时使用 setup.py 文件来实现多个文件依赖项。 即使当我使用这个官方模板时,我仍然有见解:“SDK

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.