用Camel并行处理大型SQL表

问题描述 投票:1回答:1

我试图从Apache Camel的Informix表中每天处理大约700万行,但我无法弄清楚它是如何实现的。

我第一次使用非常低的数据集(约50k行)尝试使用.split(body()).parallelProcessing(),如下所示:

from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryData").split(body()).parallelProcessing() // Essentially executes a query on my table and returns a list of MyTable.class
.bean(ProcessTable.class, "processData") // Converts each MyTable object into another type of object (NewData.class) for later processing, storing in them in a synchronized list
.end().to("direct:transform-data");

from("direct:transform-data")
.bean(ProcessNewData.class, "processNewData").split(body()).parallelProcessing() // Obtains list
.bean(AnalyzeData.class, "analyze") // Analyzes the data
.bean(PersistData.class, "persist") // Persists the new data on other tables
.end();

当我在.bean(QueryTable.class, "queryData").split(body()).parallelProcessing()上使用500k行尝试它时,这当然导致了“OutOfMemory”错误,因为它首先尝试在解析之前缓存查询中的所有数据。我尝试将fetchSize设置为100,但我得到了相同的错误,并且使用maxRows只会获得我指定的行数并忽略其余部分。

我的下一次尝试是使用Camel的一个组件,如sql-componentjdbc,并尝试使用Splitter处理单独线程中的每一行,但我得到了同样的问题。

SQL:

from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryDataParams") // Gets the params for my query
.to("sql:SELECT * FROM my_table WHERE date_received BETWEEN :#startDate AND :#endDate?dataSource=dataSourceInformix").split(body()).parallelProcessing()
// The rest would be essentially the same

JDBC:

from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryString") // Gets the query to execute
.to("jdbc:dataSourceInformix").split(body()).parallelProcessing()

我的最后一次尝试是使用maxMessagesPerPoll用于sql和outputType=StreamList用于jdbc组件但不幸的是前者一次只处理一行(并且它必须是消费者才能这样使用)而后者给了我一个java.sql.SQLException: Cursor not open异常。

SQL:

from("sql:" + query +"?dataSource=dataSourceInformix&maxMessagesPerPoll=100") // I need to be able to use the quartz2 component

JDBC:

.to("jdbc:dataSourceInformix?outputType=StreamList").split(body()).streaming() // Throws exception

最终目标是能够处理数百万行而不消耗大量内存,以防止出现“OutOfMemory”错误。如果可能,我的想法是做以下事情:

  1. 在quartz cron-trigger上创建我的查询
  2. 获得并分组N个结果
  3. 发送一组结果进行处理(在另一个线程中),同时获得另一个组
  4. 重复,直到处理完所有数据

我知道这个问题类似于this one,但答案并没有真正帮助我的情况。我还注意到,在sql组件的文档中,它为生产者提供了一个outputType=StreamList选项,但它是在版本2.18.1上实现的2.18版及更高版本。

任何帮助和提示都会非常有帮助!

谢谢。

其他一些信息:Apache Camel版本:2.14.1数据库:Informix

java apache-camel informix
1个回答
0
投票

经过相当多的研究,更多的试验和错误以及来自NotaJD的tips,我找到了一个可以解决的问题(仍在测试中)。实际上制作了2个解决方案,但它们的执行类型不同。

信息:

为了解释,我将使用以下信息:

  • 表有700万条记录(行)
  • AggregationStrategyImpl通过以下方式扩展AggregationStrategy: 返回交换体中的List<Object>Predicate时,聚合List<Object> >= 50000完成 聚合超时设置为30000毫秒
  • CustomThreadPool是Camel的ThreadPoolBuilder类的伪实现: PoolSize:100 MaxPoolSize:50000 MaxQueueSize:500 TimeUnit:MILLISECONDS KeepAliveTime:30000
  • 这两种实现都是自动装配的

解决方案1:

from("quartz2://myGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "createQuery")

代码仍将在Quartz cron-timer(每天00:01)上运行,但这次我的QueryTable.class将获取正确的查询来执行(而不是SELECT *,我现在指定了我需要的列)并将其设置为交换体。

.to("jdbc:dataSourceInformix?resetAutoCommit=false&outputType=StreamList").split(body()).streaming()
.bean(TransformRecord.class, "process")

Camel jdbc组件将从交换体中获取查询,将resetAutoCommit设置为false,因此它不会抛出Cursor not open错误,将输出设置为streamming并拆分流执行,因此我不会立即查询所有记录但相反,一个接一个。然后,每个获取的记录将通过TransformRecord.class转换为适当的POJO。

.aggregate(constant(true), aggregationStrategyImpl)
.completionPredicate(aggregationStrategyImpl.getCompletionPredicate())
.completionTimeout(aggregationStrategyImpl.getCompletionTimeout())
.to("direct:start-processing")
.end();

这次我使用aggregate组件来创建记录列表。 aggregationStrategyImpl包含聚合逻辑以及完成谓词和超时,因此当我达到一定数量的记录(或发生超时)时,列表将被发送到“direct:start-processing”。

更多关于此Source Allies blog和Apache Camel Aggregate EIP文档中的聚合实现。

from("direct:start-processing")
.split(body()).executorService(customThreadPool.build(getContext()))
.bean(AnalyzeData.class, "analyze")
.bean(PersistData.class, "persist")
.end();

在这里,我分割获得的列表并使用自定义ThreadPool创建N个线程来分别分析和处理每个记录。这样我可以在并行处理中处理我的列表而不是逐个处理。我本可以使用.split(body()).parallelProcessing(),但默认的ThreadPool设置可能不会在以后最佳。

更多关于Apache Camel Threading Model文档,ThreadPool Configuration笔记和Red Hat Threading Model文档的ThreadPool实现。

解决方案2:

对于此解决方案,它基本上是完全相同的执行但具有以下更改:

// .to("direct:start-processing")
.to("seda:start-processing?size=1&blockWhenFull=true")
.end();
// from("direct:start-processing")
from("seda:start-processing?size=1&blockWhenFull=true")
// continues normally

这样做是将列表异步发送到进程,允许最多1个其他列表在内存中排队,并在队列已满时暂停父线程。因此,不是等待处理记录列表,而是父线程将返回并收集另一批记录。这也意味着如果处理路由尚未完成,新记录将不会被抛出,父线程将等待,直到它可以将批处理发送到SEDA内存中队列。

更多关于GitHub及其site中Apache Camel SEDA组件文档的SEDA组件

结论:

对于解决方案1,它应该花费更长的时间来完成,因为它在从查询中收集更多记录之前首先处理所有数据,但是由于它在聚合谓词中被控制,因此内存消耗应该更少。

使用解决方案2它应该快得多,因为它在处理前一批时从查询中收集下一批记录,但内存消耗将更大,因为它最多可容纳3个列表:正在处理的列表,一个在SEDA队列中以及父线程收集的最新批次(队列已满时暂停)。

我说我仍然在测试这些解决方案,因为有500k记录它可以工作,但我仍然在为将要实现它的服务器设计最佳的ThreadPool设置。我已经研究了Java中的线程,但它似乎在那里除了系统的架构,RAM和反复试验之外,真的没有那么多。

© www.soinside.com 2019 - 2024. All rights reserved.