Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。
Python 中 apache Beam 上的 csv 配对出现 UnicodeEncodeError
我正在努力在 apache beam python 中解析 CSV 文件。但是,当 CSV 文件中存在一些 unicode 字符(例如“ş”)时,它无法解析并出现错误 运行时错误:UnicodeEncodeError:'asc...
如何创建空的PCollection<KV<String, Object>>
我正在尝试创建一个名为 Incident 的自定义对象的空 PCollection 公共类事件实现可序列化{ 私有整数事件ID; 私有字符串appId; 私人长
我正在datalab中“玩”apache beam/dataflow。 我正在尝试从 gcs 读取 csv 文件。 当我使用以下命令创建 pcollection 时: 线= p | 'ReadMyFile' >> beam.io.ReadFromText('gs://' +
Java Apache Beam ProcessElement 方法必须为 void?
在Java Apache Beam中,@ProcessElement方法是否需要为void?或者它可以返回一个 int、string 或 class? 我们正在进行单元测试,并希望验证方法的输出。我知道那里...
如何将两个 PCollection 中的计数合并到一个对象中
我有一个 apache beam 管道,可以读取 CSV 文件并对 bigquery 表进行查询,我需要计算每个 PCollection 中有多少个寄存器,并基于此创建 FinalResult ob...
在 Apache Beam 和 Dataflow 中使用 ReadFromKafka 时出错
我正在尝试使用 Apache Beam 的 Python SDK 连接到 Kafka 主题,并将管道作为数据流作业提交。 这是我的代码片段 导入系统 导入 apache_beam 作为光束 来自 apache_beam。
Apache Beam python fileio.WriteToFiles 过度分片
我在流式Python管道中使用fileio.WriteToFiles。我明确指定了预期的分片计数,如下所示 fileio.WriteToFiles( 路径=..., file_naming=fileio.default_file_na...
如何在使用 apache beam 编写的流式管道中读取 bigquery
我想运行一个从 Google bigquery 表中连续读取的流管道。现在,我的流管道在从 bigquery 表读取一次后停止。 apache beam 文档...
是否可以将模拟与 doFn 一起使用?我在 doFn 中有一个从 gcs 存储桶读取的处理方法,我想测试一下。 我尝试使用 LocalStorageHelper 但它不支持存储桶获取
如何在传递给 pardo 时修改数据流运行时值提供程序参数?
我在尝试修改传递给 Apache Beam Dataflow 管道中 RuntimeValueProvider 的参数时遇到问题。这是我的代码的简化版本: 导入 apache_beam...
python 数据流:GroupByKey 无法应用于具有全局窗口和默认触发器的无界 PCollection
我有一个简单的 python 数据流代码,它使用无界 pcollection 。它只是 从 pubsub 读取 解析为带有输出标签 SUCCESS 和 FAILURE 的 json 使用输出标签 SUCCESS 和 F 验证 json...
Apache Beam 优化 Firestore 读取 python
我有传感器数据到达 pub/sub (protobuf),它作为 python 字典插入到“pipeline_fstore”中。数据一次到达。 在管道中,在“添加元数据...
我有一个写入 BigQuery 表的数据流作业。每个数据流作业都会创建一个新表。 我意识到对 BigQuery 表的写入操作是异步的,即对
在 apache beam ReadFromKafka 中的一个主题中可以确定分区位置之前的超时
我正在从事 Google 数据流工作,我正在使用 apache beam ReadFromKafka 来消费主题消息。我正在消耗 4 个主题。在我们向我们的 kafka clu 添加新的代理之后,管道曾经工作正常......
所以我正在使用带有SpringBoot的Apache Beam,并在.query中使用JDBCIO,我正在从表“customer”中检索记录(从records.customer中选择*,其中customer_code =“abc&quo...
阿帕奇光束|无法安装 apache beam 并显示子进程错误
我已经安装了最新的 pip、python 3.12,但什么都没有..仍然显示此错误: “为收集的包构建轮子:grpcio-tools grpcio-tools 的构建轮子(setup.py):已启动
如何使用 DataFlow 将数据从 Pub/Sub 流式传输到 Google BigTable?
我想问是否有人可以告诉我,甚至给我展示一个数据流作业模板的示例,最好是用 Python 编写,我可以在其中: 持续从 Pub/Sub 主题读取 JSON 数据 处理这个数据...
我们目前是 Dataflow 批处理作业的大用户,并且希望开始使用 Dataflow 流(如果可以可靠地完成)。 这是一个常见的场景:我们需要一个非常大的 Kafka 主题......
在 GCP Dataflow 上以编程方式部署和运行 Beam 管道
我正在尝试使用 google-cloud-dataflow 以编程方式在 GCP 数据流上部署一些光束管道,但不确定如何做到这一点。 这些管道已经打包为 jar,我的目标是......
如何预构建worker容器Dataflow? [洞察“SDK Worker容器镜像预构建:可以启用”]
我想知道如何预构建工作容器并同时使用 setup.py 文件来实现多个文件依赖项。 即使当我使用这个官方模板时,我仍然有见解:“SDK