apache-beam 相关问题

Apache Beam是用于批处理和流处理的统一SDK。它允许使用特定于Beam的DSL指定大规模数据处理工作流程。 Beam工作流可以在不同的运行时执行,如Apache Flink,Apache Spark或Google Cloud Dataflow(云服务)。

在 Windows 中使用窗口流数据使用“:”时无法写入文件

我在运行 Java ApacheBeam 代码时遇到问题。当尝试在我的 Windows 系统上创建名称中带有冒号的文件时,程序会抛出错误,指出文件名是

回答 1 投票 0

如何避免在写入固定窗口输出的 Google Dataflow 流管道中进行随机播放

希望实现一个 Dataflow(或 flink)流管道,从 pub-sub 读取数据,将数据转换为 parquet,并每隔几分钟写入输出。 这需要固定窗口吗?如果是的话...

回答 1 投票 0

数据流作业在 CPU 利用率达到 ~100% 后反复停止虚拟机并启动另一个虚拟机,而不是并行工作

我有一个批处理数据流作业,它从 Bigquery 读取一些列,将它们转换为 beam.Row 格式,然后并行地为每列应用 SqlTransform。我将工人数量设置为...

回答 1 投票 0

在 Flink Python UDF 中使用 Numba

我想在依赖于 Numba (>= 0.50) 的 UDF 中使用 Python 库(pyod,最新)。我用 Python 创建了一个聚合 UDF,我对这个概念并不陌生。 我在使用过程中遇到错误...

回答 1 投票 0

GCP 数据流作业中的并发处理

我尝试使用 ThreadPoolExecutor 同时运行三个不同的类。但数据流作业似乎没有同时运行,并且它正在一个接一个地运行每个类。还有...

回答 1 投票 0

测试管道 Apache Beam 时出现断言错误

在测试管道时,我收到此错误,即使错误日志显示对象是相等的: 公共无效testGenerateUserPageViews()抛出异常{ 最终PC合集 在测试管道时,我收到此错误,甚至错误日志显示对象是相等的: public void testGenerateUserPageViews() throws Exception{ final PCollection<SessionModel> input = p.apply(Create.of(SESSION_MODEL)); final PCollection<UserPageViews> output = input.apply(ParDo.of(new GenerateUserPageViews())); PAssert.that(output).containsInAnyOrder(USER_PAGEVIEWS); p.run().waitUntilFinish(); } java.lang.AssertionError: ParDo(GenerateUserPageViews)/ParMultiDo(GenerateUserPageViews).output: Expected: iterable over [<com.userprofile.models.UserPageViews@e1688b19>] in any order but: Not matched: <com.userprofile.models.UserPageViews@e1688b19> at org.apache.beam.sdk.testing.PAssert$PAssertionSite.capture(PAssert.java:174) at org.apache.beam.sdk.testing.PAssert.that(PAssert.java:416) at org.apache.beam.sdk.testing.PAssert.that(PAssert.java:408) at 我猜com.userprofile.models.UserPageViews没有完全实现的equals或hashCode方法。 解决方案是我将 String 变量替换为“”值代替 null。但我仍然不明白为什么两种情况下对象引用都是相同的。 我遇到了同样的问题,我的对象看起来相同,但断言失败。这是因为 equals() 方法对这些对象返回 false。解决方案是逐一检查对象的每个字段,或者如果 equals() 因此失败,则将有问题的字段设为 null。 选项 1:对各个字段进行断言 data class MyObject( val field1: String, val field2: Exception? //field that returns false on equals() ) @Test fun option1() { val pCollectionActual: PCollection<MyObject> = /** your actual call here */ val expectedElement = MyObject(field1 = "field1", field2 = Exception()) PAssert.that(pCollectionActual).satisfies { collection -> Iterators.size(collection.iterator()) shouldBe 1 val iterator = collection.iterator() val actualElement = iterator.next() actualElement.field1 shouldBe expectedElement.field1 actualElement.field2 shouldBe expectedElement.field2 iterator.hasNext() shouldBe false null } pipeline.run().waitUntilFinish() } 选项 2:将不匹配字段设置为空 @Test fun option2() { val pCollectionActual: PCollection<MyObject> = /** actual call here */ val element = MyObject(field1 = "field1", field2 = null) val expectedList: List<MyObject> = listOf(element) PAssert.that(pCollectionActual).containsInAnyOrder(expectedList) pipeline.run().waitUntilFinish() }

回答 3 投票 0

Apache Beam,在 DoFn 的 @Setup Lifecycle 方法中初始化的模拟外部客户端

我创建了以下 DoFn 类,但我不确定如何模拟 APIClient,因为使用模拟的 APICall 对象在 Junit 中创建 JsonToGenericRecordMapper 的实例正在被覆盖...

回答 2 投票 0

如何组合 PCollections,同时仍保留 TaggedOutputs?

我正在创建一个特定于我公司的管道和用例的包,以简化未来的 Apache Beam(数据流)管道。我想要管理的一件关键事情是捕获和重定向失败......

回答 1 投票 0

从 BigQuery 加载数百万元数据作为缓存,以丰富 Dataflow 中的流事件

我有以下用例:我正在流式传输来自 Kafka 主题的事件。我的计划是阅读这些事件,对于每一个事件,我都需要使用 BigQuery 中存在的一些元数据来丰富它。让我们假设...

回答 1 投票 0

如何在管道中使用 Apache Beam 的 PulsarIO.write(缺少必需属性:clientUrl)

我有一个光束管道,我想将输出写入脉冲星主题。 最后我说 pCollection.apply("发送到 Pulsar", PulsarIO.write().withClientUrl(pulsarClientUrl).withTopic(

回答 1 投票 0

如何在 apache beam 中顺序运行单个管道来写入和读取数据

我使用 apache beam 和 python 在数据流(gcp)上使用它来加载和转换数据巴克斯。 我有一个管道,它分为不同的部分。 第一部分,写入bigquery。

回答 1 投票 0

如何为 Apache Beam/Dataflow 经典模板 (Python) 和数据管道实现 CI/CD 管道

在 Python 中实现 Apache Beam/Dataflow 经典模板和管道的 CI/CD 构建过程的最佳方法是什么?我只找到了有关 Java 的教程,其中包括工件重新...

回答 2 投票 0

如何使用本地运行的 ApacheBeam 写入 BigQuery 表而不运行 Google Cloud CLI 可执行文件?

我可以使用外部Python引擎从BigQuery表中读取数据,但当我使用本地运行的Apache Beam + Python时,我无法进行身份验证。 文档仅提供 CLI 选项“当您运行

回答 1 投票 0

Dataflow Tensorflow Transform 将转换后的数据写入 BigQuery

在 GCP Dataflow 管道中,我尝试将转换组件中的转换数据写入 Bigquery 中,但出现以下错误。首先,如果有人能让我知道是否...

回答 1 投票 0

将 slf4j-api-2.0.0-alpha1.jar 添加到项目结构并将 slf4j 依赖项添加到 pom.xml 后,“找不到 SLF4J 提供程序”

我正在尝试使用 IntelliJ 将 Apache Beam 与 Java 结合使用,我得到了 “SLF4J:未找到 SLF4J 提供商。 SLF4J:默认为无操作(NOP)记录器实现”错误 当我有

回答 4 投票 0

与 Micronaut 和 Apache Beam 的 Maven 项目中的 SLF4J 实现冲突

您好 Stack Overflow 社区, 我正在使用 Micronaut 4 和 Apache Beam 开发 Maven 项目,面临 SLF4J 日志记录问题。尽管我的依赖项中有 logback-classic,但我还是遇到了

回答 1 投票 0

当我想要使用的模式数据是 pcollection 本身的一部分时,如何在 PcollectionTuple 中为 Pcollection 设置编码器

我正在尝试使用动态模式处理数据事件(来自 pubsub 的流数据)。架构可以更改,并且我正在以架构 id 键控的架构注册表中维护架构。 我将通过

回答 2 投票 0

AttributeError:尝试访问写入结果中不存在的属性“0”时出错

**我想使用以下代码将数据提取到bigquery中,它给了我以下错误 ** 我对 apache beam 很陌生,所以请帮助我更好地理解。 文件“C:\Users\Stranger\

回答 1 投票 0

Apache Beam - AttributeError:尝试访问写入结果中不存在的属性“0”时出错

**我想使用以下代码将数据提取到bigquery中,它给了我以下错误 ** 我对 apache beam 很陌生,所以请帮助我更好地理解。 文件“C:\Users\Stranger\

回答 0 投票 0

Google Cloud:等待工作人员更新超时

我使用Apache Beam Python SDK和谷歌云,这里写了一个类,我用它来处理一些业务规则。我运行了一个包含 9 个不同业务规则的管道,这些规则都使用相同的...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.