google-cloud-dataflow 相关问题

Google Cloud Dataflow是一种完全托管的云服务,可用于大规模创建和评估数据处理管道。数据流管道基于Apache Beam编程模型,可以在批处理和流模式下运行。 Cloud Dataflow是Google云端平台的一部分。

将相同的用户事件聚合到两个不同的窗口中

我正在尝试使用Apache Beam编写一个简单的管道。假设我正在接受类似以下的用户请求:(国家/地区,user_id,得分,时间戳)我只想对总得分进行总结...

回答 1 投票 0

PCollection to Array-如何将标头动态输入到WriteToText PTransform中?

我正在使用主要在Dataflow运行器上运行的Apache Beam 2.19编写数据流作业。我正在尝试将具有嵌套字段和重复字段的BigQuery输入转换为平坦的CSV。 BQ ...

回答 1 投票 0

如何在Apache Beam Transform中创建循环

我有一个使用多个参数运行的批处理作业。这些参数之一代表清单日期,例如2020-05-01,2020-05-02,2020-05-03,对于每个日期,我都会尝试从GCP存储读取数据。在...

回答 1 投票 0

在Dataflow Javascript UDF中使用performance.now()

我正在尝试获取Google Cloud Dataflow执行的当前时间戳(以微秒为单位),以转换BigQuery记录。对于此问题,我使用以下指令:performance.timing ....

回答 1 投票 0

GCP Dataflow的处理中的步骤s07在步骤s07中停留了至少05m00s,而没有输出或未完成状态结束

我在数据流中遇到此错误:在步骤s07中停留了至少05m00s的过程,而在java.util.concurrent.locks的sun.misc.Unsafe.park(Native Method)上没有完成状态输出或完成...] >

回答 1 投票 0

Dataflow中的Apache梁出现与pardo函数相关的错误,并且梁未定义

[我正在尝试在数据流中创建我的第一个小人,当我使用交互式束流道执行时,我具有相同的代码运行,但是在数据流中,我遇到了各种各样的错误,这些错误并没有太多...

回答 1 投票 0

尽管启用了GCP Dataflow API,但未启用

因此,我正在尝试Dataflow的控制台内教程。它要求我启用我所做的API。现在,当我处于教程中间时,在其中一个步骤中遇到了这个问题:“ error”:{“ code”:400,...

回答 1 投票 0

数据流作业停止随机处理数据

我已经建立了一个管道,用于从pubsub读取数据并将其流式传输到BigQuery。在流传输数据之前,需要对其进行预处理以除去重复项。通过在固定窗口中打开窗口,然后...

回答 1 投票 0

从Google Cloud Dataflow内部写入Firestore

我现在遇到的核心问题是,当我运行部署到Google Cloud Dataflow的数据流管道时,出现错误:java.lang.IllegalStateException:名称为[DEFAULT]的FirebaseApp没有...

回答 1 投票 0

消费者组中的Apache Beam KafkaIO消费者正在读取相同的消息

我正在数据流中使用KafkaIO来读取一个主题的消息。我使用以下代码。 KafkaIO。 read().withReadCommitted()....

回答 1 投票 0

停止数据流流作业而不删除它创建的PubSub订阅

我想停止从PubSub读取的Dataflow流作业,但保留其创建的订阅。有没有办法?

回答 1 投票 0

如何在处理输入文件模式时在Apache Beam中获取DoFn内部的文件名

我正在处理一个目录内的大量文件。我想在已处理数据输出的元数据中添加fileName。因此,如果在处理过程中出现问题,我们可以检查一下...

回答 1 投票 0

Google数据流PCollection连接

我们正在从KAFKA流式传输数据,并将其存储到Google Bigtable。写入bigtable之前,需要根据同一表中的现有值来计算值。当vehcile_id ...

回答 1 投票 0

无法在数据流笔记本上的Jupyter Notebook中导入JsonPickle

我正在python的Apache Beam上构建管道,并在Dataflow上使用笔记本进行原型制作。在尝试加载JSON时,我意识到我使用的JSON编码器(基本上是JSON.loads())...

回答 2 投票 1

是否可以使用KafkaIO.read为单个管道为两个不同的集群指定Kafka引导服务器?

我目前正在使用Google Cloud Dataflow和Apache Beam来消耗来自Kafka主题的消息,该主题存在于两个不同的Kafka集群中,两个集群都包含相同的主题名称,但...

回答 2 投票 0

数据流管道中dynamicWrite FileIO中的动态文件夹名称

我有一个PCollection >。我想将数据按K分组,并将密钥K的所有值写入到名为K的文件夹内的Google存储中的文件中。假设我有2个条目...

回答 1 投票 2

使用GCP Dataflow和Apache Beam Python SDK从GCS读取速度极慢

首先让我开始说一切,首先我要了解使用Beam的Python SDK和GCP Dataflow的方法!问题:我的管道对于几乎所有的管道都运转良好...

回答 1 投票 0

在GCP数据流上运行的Apache Beam如何处理大批SQL表?

我有一个大约1TB数据的SQL表,我想将此表ETL到GCS。我不明白的是Apache Beam如何读取表,是否以块为单位,如果是,则块的大小是多少,...

回答 1 投票 1

Cloud Dataflow作业从一个Bigquery项目读取并写入另一个BigQuery项目

我正在GCP上实施Cloud Dataflow作业,该作业需要处理2个GCP项目。输入和输出都是Bigquery分区表。我现在要解决的问题是我必须读取数据...

回答 1 投票 0

UnsolvableVersionConflictException更新Apache光束版本

当前,我在GCP(数据流)中使用Apache Beam 2.18.0,我想将其更新为2.20.0。问题是,如果我在pom中使用2.20.0而不是2.18.0,则会出现以下错误文件:...

回答 1 投票 1

© www.soinside.com 2019 - 2024. All rights reserved.