apache-beam 相关问题

Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。

使用 Dataflow Apache Beam 下沉到 BigQuery 的正确格式

bcdate好像出错了,我改成正确的格式还是报错,我的代码如下: def transform_pandas(数据): 将熊猫导入为 pd 导入 json 我...

回答 2 投票 0

在 python 中使用 apache beam 从 pubsub 输出中提取列值

我正在尝试从 PubSub 订阅中提取数据,最后,一旦数据被提取,我想做一些转换。目前,它是字节格式。我尝试了多种方法来提取...

回答 1 投票 0

在 Apache Beam 中使用 PubsubIO.writeStrings() 的优势

Apache Beam 提供 PubsubIO.writeStrings() 方法,该方法返回一个 PTransform,可用于直接从 PCollection 将消息写入 Pubsub:示例。 有没有什么明显的优势...

回答 1 投票 0

使用 Beam 从 Pubsub 读取数据时遇到错误

我是 Beam 的新手,我想亲自动手。我正在读取 JSON 文件并将数据发送到发布-订阅主题。下面是代码片段。我的意图是从 pub-s 读取数据...

回答 0 投票 0

在 Apache beam 中启动管道之前进行一些预处理

我需要使用 Java SDK 创建一个 apache beam 管道,它将从 Google Cloud Storage 读取数据。这些文件由上游进程推送,并且可能包含无效文件。例如,文件 ...

回答 0 投票 0

如何读取文件并保留 python beam 中的行顺序

我正在尝试从 GCS 存储桶中读取 python beam 中的文件并对其进行处理,在处理时我想添加一个新字段作为 line_number ,这是来自 fi 的原始行号...

回答 0 投票 0

Google 数据流转换需要 jks

在 GCP 数据流中,作为转换的一部分,我需要从 URL 下载内容。为此,我需要使用包含安全证书的 .jks 文件。我总是找不到证书,除了...

回答 0 投票 0

如何在 Apache Beam on Dataflow 的基于事件的架构中加入/处理数据?

我有一个用例,我必须使用唯一键加入数据,并等待事件在识别数据中的特定字段时最后处理装袋数据。代码应该...

回答 0 投票 0

在 Python 中使用 Apache Beam 读取多行 JSON

我无法正确从 Google Cloud Storage 读取 JSON 文件。 输入的格式在他的基本结构中看起来像这样: [ { "id": "CANT14", “实体……

回答 0 投票 0

将 Apache Beam Python 与 GCP Dataflow 结合使用时,是否具体化 GroupByKey 的结果是否重要?

将 Apache Beam Python 与 GCP Dataflow 结合使用时,具体化 GroupByKey 的结果是否存在缺点,例如,计算元素的数量。例如: def consume_group_by_key(元素)...

回答 1 投票 0

数据流作业成功后如何激活云功能?

我的 Apache Beam 管道的代码如下。我的云函数(Topic-name: projects/wisdomcircle-350611/topics/uat-timestamp-job-trigger)必须在下面的数据流任务为

回答 0 投票 0

连续执行数据流作业?

如何同步运行 apache beam 数据流管道? 这是 apache beam 自定义管道代码(数据流)。 def run_pipeline(jdbc_url, username, password, types, wisgen_table, recruiter_table,

回答 1 投票 0

为什么我们需要在 Apache Beam ParDo 函数中显式返回列表?

我有下面的代码,它对 CSV 文件执行一些操作。 SplitRow 类(beam.DoFn): def 过程(自我,元素): 返回 [element.split(',')] 类 FilterCardioPatients(beam.DoFn): 定义...

回答 1 投票 0

Beam 管道 spark runner 问题

我有一个光束管道,它从运动流中读取,反序列化内部的 protobuf 数据,更改为字节数组并将其写入另一个运动流(只是一个虚拟管道) 该管道执行

回答 0 投票 0

特定 PTransform 中的报告指标会导致性能问题?

我目前正在使用 Apache Beam 开发数据处理管道,我对在特定 PTransform 中报告指标的最佳实践很感兴趣。在这种情况下,PTransform 将提取一个

回答 1 投票 0

Maven 编译漏课

我在我的项目中使用 maven,我正在尝试在 gcloud shell 中运行我的程序。该程序在我的本地机器上使用 Intelij 正常工作,但在 mvn compile 和 mvn package 后的 gcloud 中不是全部

回答 0 投票 0

Apache Beam 无界管道窗口化

我有一个在 Dataflow 中运行的无界 Apache Beam 管道,它执行一组非常简单的指令: 它读取发布订阅消息 (PubsubIO) 它从消息中提取时间戳,提取数据 f...

回答 1 投票 0

带有 Flink runner 的无状态 Beam 管道 - pubsublite 消息在写入 Kafka 之前得到确认

我是 Beam/Flink 的新手。所以不确定这个问题是否与 Beam 或 Flink 有关。我们正在设置使用 Flink runner 运行 Beam 应用程序。 我有一个相当无状态的流媒体

回答 1 投票 0

在数据流(Apache Beam)作业中导入共享 Python 模块

我正在使用 Google Cloud 的数据流工具构建一系列数据管道。 这是我的文件结构: -- 数据流作业 |-- 自述文件.md |-- 清除导出 | |-- __init__.py | ...

回答 3 投票 0

如何在 Google Cloud Platform (apache beam) 中同步运行管道的作业操作

在 Google Cloud Platform(dataflowRunner) 中运行 apache beam pipeline,可能会有这样的情况,即在所有其他步骤完成后才想运行一些代码。 这是我的 python 代码 p = b...

回答 0 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.