Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。
如何在 Apache Beam (Python SDK) 中使用 `with_outputs` 和 `with_output_types`?
Apache Beam PTransform 可以附加 with_outputs 和 with_output_types。例如, pcoll | beam.CombinePerKey(sum).with_output_types(typing.Tuple[unicode, int]) 和 (词 | 梁.ParDo(
Dataflow Runner 与 GCP Secret Manager 存在问题
我有一个使用 Java 编码的 Apache Beam Dataflow 项目,其中我使用以下子例程来获取数据库凭据: 私有静态 JsonObject getCredentials(String suffix) { 字符串
获取匿名调用者在数据流上运行 wordcount 时没有 storage.objects.create 访问错误
我正在使用python运行apache beam包中wordcount模块的数据流快速启动。我能够在我的机器上本地运行它。但是,当我尝试通过指定
apache beam 和 Big Query TableSchema 中的序列化问题
并感谢您的支持。 我目前正在尝试使用 Apache Beam,以尽可能多地了解它的工作原理。我面临 com.google.api.serv 序列化的问题...
如何使用BigQueryToPostgresOperator
我是在 GCP 上使用 apache-airflow 的新手,我正在尝试在 Dataproc 无服务器内的 DAG 上使用 BigQueryToPostgresOperator 将表从 Bigquery 发送到 Cloud SQL,特别是发送到
Apache Beam DirectRunner 与 FlinkRunner 示例
我使用beam yaml(python sdk)构建了最简单的管道,其中读取csv文件并应打印到日志。 使用默认 DirectRunner 运行时: python -m apache_beam.yaml.main --
我有一个 BigQuery 表,其中包含由结构化数据 (RECORD) 组成的 REPEATED 字段,该字段仅由两个键组成:KEY 和 VALUE。它看起来像这样: [{“KEY”:“TESTING_FLAG”,“...
在 Dataflow Python flex 模板中包含另一个文件,ImportError
是否有一个包含多个文件的 Python 数据流 Flex 模板示例,其中脚本导入同一文件夹中包含的其他文件? 我的项目结构是这样的: ├── 管道...
运行我的数据流作业时,它很早就失败了(没有数据处理,似乎没有启动工作程序),并显示一条错误消息: 工作流程失败。 我尝试运行基于...的流数据流作业
有一个使用 Apache Beam 和 Dataflow 作为运行器的 Python 应用程序。该应用程序使用非公共 Python 包“uplight-telemetry”,该包是使用“extra_packages”配置的,而 cr...
AttributeError:“tuple”对象在有状态 DoFn 期间没有属性“encode”
我正在使用有状态且及时的 DoFn 在我正在实现的固定窗口结束后 2 秒处理数据。 我已经在 Apache Beam 游乐场内测试了我的代码的可重现示例...
相同的 Apache Beam 代码适用于 Direct Runner,但不适用于 Dataflow 运行器
我有一段 apache 束管代码,它从 GCS 存储桶中的文件读取并打印它。它与 DirectRunner 完美配合并打印文件输出,但与 Dataflow 运行器配合...
Cloud Pub/Sub 中的订购交付是否可以与 Cloud Dataflow 配合使用
我有从 pubsub 读取消息的数据流作业,但是,它以错误的顺序处理元素,我正在使用窗口来获取最近的元素,但有时消息不会被提取...
在 Confluence Kafka 的 Flex 模板映像中获取信任库文件
我们尝试将 truststore.jks 文件存储在 Flex Template Docker 中,但在管道中使用它时我们无法找到它。 我们尝试拉取图像,我们可以看到文件是 p...
从 Beam 管道连接 google cloud sql postgres 实例时出现问题
我在连接 Google Cloud SQL 上的 Postgresql 实例时遇到一些问题,想寻求帮助。我不确定解决方案是否是启动连接引擎或其他...
如何在管道的下一步中从 DB I/O 连接器访问 PCollection
我使用Apache-beam编写了一个小型管道。它使用beam-postgres作为输入连接器从数据库表创建PCollection。 代码如下所示 - 导入 apache_beam 作为光束 来自
我一直在尝试在 Google Cloud Dataflow 上部署管道。到目前为止,这是一个相当大的挑战。 我面临导入问题,因为我意识到 ParDo 函数需要requirements.txt...
使用 Beam Dataflow 流式传输 (python) 从 pub/sub 读取并写入 Firestore(本机)
我正在使用 Dataflow 滑动窗口从 pub/sub 读取内容,在创建实体并写入 Firestore 本机之前应用一些转换。我发现 Beam 不支持本机 Firestore...
ApacheBeam python 统计有多少订单低于 15,有多少订单以上或等于,k = count v = sum
我正在读取 .csv,我拆分并仅获取最后一个值。 我将字符串转换为浮点数 我比较是否小于 15,并附加到 lessthanList 值, 否则,追加到 morethanList 导入阿帕奇...
我必须在Python中创建一个Apache数据束,它必须执行以下功能- 从数据库表中读取符合特定条件的条目 为该记录调用第一个 REST API 在响应...