google-cloud-dataflow 相关问题

Google Cloud Dataflow是一种完全托管的云服务,可用于大规模创建和评估数据处理管道。数据流管道基于Apache Beam编程模型,可以在批处理和流模式下运行。 Cloud Dataflow是Google云端平台的一部分。

Apache Beam Combine与GroupByKey的对比

所以,我面临着这个似乎很经典的问题,使用Apache Beam(Flink为引擎),为非绑定的流提取时间框架的topop。假设输入的是 sites+hits tuples: {"aaa.com", 1001}, {"bbb......"。

回答 1 投票 0

云端Datalab权限 - 共享访问时,VM URL上有403个。

我已经成功创建并托管了一个 Cloud Datalab 虚拟机。我可以从我的账户(项目所有者)访问虚拟机的 URL,但我的合作者无法访问 *.blogspot.com URL (HTTP 403) ,除非我给 ...

回答 1 投票 1

CombineFn中的任务没有正确地最终完成Apache光束

我使用Python 3.7 SDK和apache beam 2.17.0做数据流。代码在本地运行,但我从pubsub收集数据。我尝试按键组合,一切都很顺利,直到管道调用"..."。

回答 1 投票 1

从Spring控制器执行Google Cloud Dataflow管道

我将如何使用Spring执行Apache Beam管道到Google Cloud Dataflow?这个问题类似于在Spring Boot项目中运行Apache Beam pipeline到Google Data Flow,但是...

回答 1 投票 0

Apache beam在Dataflow中得到与生成器对象不可下标相关的错误。

我试图在数据流中创建我的第一条流水线,当我使用交互式光束运行器执行时,我有相同的代码运行,但在数据流中,我得到了所有类型的错误,这并没有使很多......

回答 1 投票 0

POutput不能转换为WriteResult。

我试图按照KafkaToBigQuery的DataflowTemplate来处理BigQuery在我代码中的错误。PCollection convertedTableRows = pipline .apply("ReadFromKafka", ...

回答 1 投票 0

在Python的Apache Beam中使用OrFinally定义自定义触发器的正确语法?

我试图为一个滑动窗口定义一个自定义的触发器,这个触发器可以重复触发每一个元素,但最后也可以在水印结束时触发。我看了周围的文档,对于 ...

回答 1 投票 0

在多个DataFlow作业线之间共享实例--可以吗?

是否可以使用DataflowPipelineRunnerrunner(async)提交X个独立的作业线,并让作业共享工人池,而不是拆分单个作业?

回答 2 投票 4

谷歌云安全账户上下文在Intellij Application Runner中无法使用。

当我在Intellij Idea中运行我的Dataflow Beam应用程序时,我无法对其进行认证。最近,我曾一度成功,但现在不行了。认证失败,显示403 forbidden'"Access ...

回答 1 投票 0

数据流批量作业步骤完成后的运行功能。

我有一个Dataflow作业,它有一个扇形的步骤,每个步骤都将结果写入GCS上的不同文件夹。在一个批处理作业的执行过程中,每个文件夹会写入数百个文件。我想...

回答 1 投票 0

如何在beam SQL中从输入数据中选择一组字段作为重复字段数组?

問題說明:我有一個包含以下字段的輸入pcollection。{ firstname_1, lastname_1, dob, firstname_2, lastname_2, firstname_3, lastname_3, } 然后 ....

回答 1 投票 0

Apache beam从GCS读取Avro文件并写入BigQuery。

运行一个java作业来读取Avro文件,并一直得到错误。寻求帮助 - 以下是代码 - / 获取Avro Schema String schemaJson = getSchema(options.getAvroSchema()); Schema ...

回答 1 投票 0

每秒处理35万个请求,并将数据保存到谷歌云存储中。

我需要实现微服务,它在逻辑和架构方面相当简单,但需要每秒处理大约305k个请求。它要做的就是摄取JSON数据,验证 ...

回答 1 投票 -1

如何在apache beam Dataflow python批量作业中设置处理超时?

我目前使用stopit库https:/github.comglenfantstopit来设置批处理作业中每个元素的处理超时。这些作业在直接运行器上工作,我能够超时功能 ...

回答 1 投票 0

在哪里初始化ParDo中的可重用对象?

我的parDo在我的beam作业中的一个例子(用Dataflow runner运行): class StreamEventToJsonConverter : DoFn。 () { @ProcessElement fun processElement(@Element element: ...

回答 1 投票 0

Apache Beam如何处理中间面板?

我有这样一段简单的代码 def print_windows(element, window=beam.DoFn.WindowParam, timestamp=beam.DoFn.TimestampParam): print(window) print(timestamp) print(element) print('-----------------------...

回答 1 投票 0

我们能不能把avro文件写到动态创建的GCS桶里(基于tenantID)?

基本上,我试图做的是,创建GCS bucket的基础上tenantID(作为事件的一部分),并写这些事件使用FileIO.writeDynamic使用动态文件命名在谷歌数据流作业。在...

回答 1 投票 0

如何为GCP数据流作业建立租户级指标计数器?

目前,我正在尝试使用apache beam Metrics为GCP数据流作业创建自定义指标,并希望检查我们是否可以根据租户跟踪分组计数器。例如,我们有事件产生...

回答 1 投票 0

Bigquery中的WRITE_TRUNCATE。

我试图在Bigquery中使用write_truncate来截断表,但它没有发生,而是像write_append一样工作。它在追加数据,但没有截断表。谁能告诉我,在Bigquery中使用write_truncate,但是没有发生,而是像write_append一样工作。

回答 1 投票 0

Apache Beam - 如何从所有的窗口中通过键对PCollection<KV<String, Int>&gt进行求和。

给定一个PCollection >有了固定时间的窗口,如何将所有窗口的Int按字符串键相加呢? 比如PCollection。 > pc = ...; ...

回答 2 投票 1

© www.soinside.com 2019 - 2024. All rights reserved.