apache-beam 相关问题

Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。

如何在apache beam Dataflow python批量作业中设置处理超时?

我目前使用stopit库https:/github.comglenfantstopit来设置批处理作业中每个元素的处理超时。这些作业在直接运行器上工作,我能够超时功能 ...

回答 1 投票 0

在哪里初始化ParDo中的可重用对象?

我的parDo在我的beam作业中的一个例子(用Dataflow runner运行): class StreamEventToJsonConverter : DoFn。 () { @ProcessElement fun processElement(@Element element: ...

回答 1 投票 0

Apache Beam Jupyter Notebook.ImportError:无法从'google.cloud'导入名称'storage'(未知位置)。ImportError: cannot import name 'storage' from 'google.cloud' (unknown location)

我想把存储导入到Apache Beam内核的jupyter笔记本中,但它说未知位置。如果我尝试导入其他google-cloud库,如bigquery或datastore,它的工作, ...

回答 1 投票 0

Apache Beam如何处理中间面板?

我有这样一段简单的代码 def print_windows(element, window=beam.DoFn.WindowParam, timestamp=beam.DoFn.TimestampParam): print(window) print(timestamp) print(element) print('-----------------------...

回答 1 投票 0

我们能不能把avro文件写到动态创建的GCS桶里(基于tenantID)?

基本上,我试图做的是,创建GCS bucket的基础上tenantID(作为事件的一部分),并写这些事件使用FileIO.writeDynamic使用动态文件命名在谷歌数据流作业。在...

回答 1 投票 0

如何为GCP数据流作业建立租户级指标计数器?

目前,我正在尝试使用apache beam Metrics为GCP数据流作业创建自定义指标,并希望检查我们是否可以根据租户跟踪分组计数器。例如,我们有事件产生...

回答 1 投票 0

Bigquery中的WRITE_TRUNCATE。

我试图在Bigquery中使用write_truncate来截断表,但它没有发生,而是像write_append一样工作。它在追加数据,但没有截断表。谁能告诉我,在Bigquery中使用write_truncate,但是没有发生,而是像write_append一样工作。

回答 1 投票 0

Apache Beam - 如何从所有的窗口中通过键对PCollection<KV<String, Int>&gt进行求和。

给定一个PCollection >有了固定时间的窗口,如何将所有窗口的Int按字符串键相加呢? 比如PCollection。 > pc = ...; ...

回答 2 投票 1

如何在Kotlin wApache Beam中扩展DoFn?

我试图在一个使用Apache Beam的应用程序中使用Kotlin,我得到了这样的警告:@ProcessElement processElement(String)。@ProcessElement processElement(String, OutputReceiver),参数类型为DoFn.OutputReceiver。

回答 1 投票 0

Pardo函数在google数据流中不产生任何输出。

我试图在dataflow中创建我的第一个pipleine,我有相同的代码运行时,我执行使用交互式beam runner,但在dataflow上,我得到的所有类型的错误,这是不多......

回答 1 投票 0

引起:java.lang.UnsupportedOperationException。BigQuery源必须在读取前被分割

我试图使用Java BigqueryIO.read方法从bigquery中读取数据,但得到以下错误信息。 public POutput expand(PBegin pBegin) { final String queryOperation = "select query"; ...

回答 1 投票 0

在数据流上触发Apache beam windowing,使用 elementCountAtLeast时没有像预期的那样发射。

我想让apache beam在一定数量的元素到达后发射。Apche beam的版本是2.18.0,我从pubsub消耗,但数据到达的模式是事先知道的......。

回答 1 投票 0

如何在apache beam java sdk中从minIO中读取文件。

我刚刚开始使用minio和apache beam。我在play.min.io上创建了一个bucket,并添加了一些文件(假设存储的文件是1.txt和2.txt)。我想访问存储在那个 ...

回答 1 投票 0

会话窗口化是否已经将数据洗牌到同一个worker中?

我的流水线的简单伪代码:流水线 .apply(KVPair) .apply(sessionWindow) .apply(groupByKey) 我的问题是:要想实现会话窗口化,beam是否已经将所有数据与 ...

回答 1 投票 0

Apache Beam:使用MongoDbIO.read()刷新我从MongoDB读取的一个侧输入。

不知道这个GenerateSequence对我有什么帮助,因为我必须每小时或每天定期从Mongo中读取值,创建了一个读取MongoDB的ParDo,还添加了窗口到......

回答 2 投票 0

Apache Beam是正确的特征预处理工具吗?

所以这是一个有点奇怪的问题,因为它与如何使用工具无关,而更多的是关于为什么要使用它。我正在部署一个模型,想用Apache-beam来运行特征处理 ...

回答 1 投票 0

使用可序列化函数进行大查询读取 - 如何从GenericRecord中获取NUMERIC类型。

你好,我正在使用Beam从BQ表中读取,看到使用SerializableFunction的read()比readTableRows()性能更好。按照https:/beam.apache.orgreleases的例子......

回答 1 投票 0

数据流的附加参数进入光束管道

我正在研究Dataflow,我已经通过Python SDK建立了我的自定义管道。我想在Dataflow用户界面上添加参数到我的自定义管道中,使用附加参数。...

回答 1 投票 0

如何在Apache beam中读取带有起始日期的pubsub信息?

我有一个简单的工作是从pubsub读取历史数据,以日期为界,例如我想从日期2020-04-10开始读取消息,2020-04-20,然后每天将消息保存在google bucket中......。

回答 1 投票 0

我们可以在apache-beam中使用FTP导入数据吗?

我正在使用Apache-Beam编写一个数据流工作,它需要使用FTPS [from ftplib import FTP_TLS] 服务器在BigQuery中导入数据。但是当我尝试导入FTPS类[ftps = FTP_TLS('...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.