google-cloud-dataflow 相关问题

Google Cloud Dataflow是一种完全托管的云服务,可用于大规模创建和评估数据处理管道。数据流管道基于Apache Beam编程模型,可以在批处理和流模式下运行。 Cloud Dataflow是Google云端平台的一部分。

根据消息内容从Dataflow作业写入动态PubSub主题

我想根据字段的内容动态地将PCollection的不同元素路由到不同的PubSub主题。这些主题并不持久,但假设它们存在......

回答 1 投票 1

Apache Beam和avro:创建没有架构的数据流管道

我正在使用Apache beam构建数据流管道。下面是伪代码:PCollection rows = pipeline.apply(“从PubSub读取Json”, ).apply(“转换......

回答 1 投票 0

重新启动数据流管道后重置度量标准

我在云数据流中有一个流媒体管道,我在其中设置Metrics.counter如下。 class SomeDoFn扩展DoFn {val validIdCounter = Metrics.counter(“user-type”,“valid_ids”)val ...

回答 1 投票 0

FileBasedSource匹配多个模式

假设我在Google云端存储中有以下结构:gs:// bucket / aaa / file1.txt file2.txt ... aab / file1.txt file2.txt ... ... baa / file1.txt file2.txt。 ..

回答 1 投票 0

检测键控状态更改

我是Dataflow编程模型的新手,在我认为应该是一个简单的用例时遇到一些麻烦:我有一个管道读取Pub / Sub的实时数据,这个数据包含...

回答 1 投票 1

将消息发布到Dataflow中的Pubsub主题

将消息发布到Dataflow中的Pub / Sub主题的推荐方法是什么?我使用过客户端API但不认为这是在Dataflow中处理此问题的最佳方法。 PublishResponse回复......

回答 1 投票 0

Python:FileSystems.create()中的自动分片,用于Dataflow中的写入通道

我在ParDo中使用FileSystems能够写入数据存储中的动态目标。但是,我无法像Text.IO那样使用通配符进行文件名自动分片?是......

回答 1 投票 0

从云功能启动数据流管道时出错

我正在尝试从谷歌云功能运行管道。不幸的是,我无法通过我发现的所有例子的第一行。我安装了google apis:npm install googleapis --save ...

回答 1 投票 0

Apache Beam - 数据流 - 序列化和状态共享

在我的一个pipelin的DoFn中,我正在下载需要由另一个DoFn处理的二进制文件。现在一旦下载了二进制文件,我也将它存储在GCS中并输出...的位置

回答 1 投票 0

Google Dataflow - 由GoogleSheets支持的BigQuery工作

我有一个用Java编写的Google Dataflow批处理作业。这个Java代码访问Bigquery并执行一些转换,然后输出回Bigquery。此代码可以访问Bigquery表...

回答 2 投票 4

如何在apache梁中使用熊猫?

如何在Apache beam中实现Pandas?我无法在多列上执行左连接,而Pcollections不支持sql查询。甚至Apache Beam文档都没有正确构图。我检查了 ...

回答 2 投票 3

Apache Beam Dataflow作业在本地执行什么操作?

我遇到了Apache Beam Python SDK定义的Dataflow的一些问题。如果我单步执行我的代码,它会到达pipeline.run()步骤,我认为这意味着执行图表已成功...

回答 1 投票 1

由于未知原因,数据流作业的水印滞后很多

我们正在运行一个使用Kafka的Dataflow工作流程,并使用apache beam AvroIO write API将snappy avro文件写入gcs。我们最多配置了13名工人,应该处理50k ...

回答 1 投票 0

多个表的Pcollection

我有两个bigquery表。表A c_id count_c_id p_id表B id c_name p_type c_id基于表A中的列,我需要使用DF管道从表B中查找详细信息。 PCollection&...

回答 1 投票 0

从Google Cloud Dataflow输出排序的文本文件

我有一个PCollection 在Google Cloud DataFlow中,我通过TextIO.Write.to将其输出到文本文件:PCollection lines = ...; lines.apply(TextIO.Write.to(“GS://桶/ ...

回答 1 投票 1

TextIO.read()。watchForNewFiles()阻止写入BigQuery

我正在尝试创建一个管道,等待GCS文件夹中的新csv文件来处理它们并将输出写入BigQuery。我编写了以下代码:public static void main(String [] args){...

回答 2 投票 0

数据流:我可以使用批处理作业连续写/流写入BigQuery吗?

我似乎无法找到任何关于此的文档。我有一个apache-beam管道,它接收一些信息,将其格式化为TableRows,然后写入BigQuery。 [+]问题:行是......

回答 2 投票 1

TableRow.get上的Google Cloud Dataflow,BigQueryIO和NullPointerException

我是GC Dataflow的新手,并没有找到相关的答案。如果我发现这已经得到回答,请道歉。我正在尝试使用v2.0 SDK创建一个简单的管道,但我遇到了麻烦......

回答 3 投票 0

通过Dataflow管道写入Cloud SQL非常慢

我设法通过JDBCIO DataSourceConfiguration.create连接到云sql(“com.mysql.jdbc.Driver”,“jdbc:mysql:// google /?cloudSqlInstance = ::&socketFactory = com.google.cloud.sql.mysql .. ..

回答 1 投票 0

输出具有空值的TableRow时出现NullPointerException

我正在尝试构建一个TableRow对象,最终将其写入BigQuery表,但如果我在行中包含空值,则会出现NullPointerException。这是完整的堆栈跟踪:例外...

回答 3 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.