我试图从Apache Camel的Informix表中每天处理大约700万行,但我无法弄清楚它是如何实现的。
我第一次使用非常低的数据集(约50k行)尝试使用.split(body()).parallelProcessing()
,如下所示:
from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryData").split(body()).parallelProcessing() // Essentially executes a query on my table and returns a list of MyTable.class
.bean(ProcessTable.class, "processData") // Converts each MyTable object into another type of object (NewData.class) for later processing, storing in them in a synchronized list
.end().to("direct:transform-data");
from("direct:transform-data")
.bean(ProcessNewData.class, "processNewData").split(body()).parallelProcessing() // Obtains list
.bean(AnalyzeData.class, "analyze") // Analyzes the data
.bean(PersistData.class, "persist") // Persists the new data on other tables
.end();
当我在.bean(QueryTable.class, "queryData").split(body()).parallelProcessing()
上使用500k行尝试它时,这当然导致了“OutOfMemory”错误,因为它首先尝试在解析之前缓存查询中的所有数据。我尝试将fetchSize
设置为100,但我得到了相同的错误,并且使用maxRows
只会获得我指定的行数并忽略其余部分。
我的下一次尝试是使用Camel的一个组件,如sql-component和jdbc,并尝试使用Splitter处理单独线程中的每一行,但我得到了同样的问题。
SQL:
from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryDataParams") // Gets the params for my query
.to("sql:SELECT * FROM my_table WHERE date_received BETWEEN :#startDate AND :#endDate?dataSource=dataSourceInformix").split(body()).parallelProcessing()
// The rest would be essentially the same
JDBC:
from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryString") // Gets the query to execute
.to("jdbc:dataSourceInformix").split(body()).parallelProcessing()
我的最后一次尝试是使用maxMessagesPerPoll
用于sql和outputType=StreamList
用于jdbc组件但不幸的是前者一次只处理一行(并且它必须是消费者才能这样使用)而后者给了我一个java.sql.SQLException: Cursor not open
异常。
SQL:
from("sql:" + query +"?dataSource=dataSourceInformix&maxMessagesPerPoll=100") // I need to be able to use the quartz2 component
JDBC:
.to("jdbc:dataSourceInformix?outputType=StreamList").split(body()).streaming() // Throws exception
最终目标是能够处理数百万行而不消耗大量内存,以防止出现“OutOfMemory”错误。如果可能,我的想法是做以下事情:
我知道这个问题类似于this one,但答案并没有真正帮助我的情况。我还注意到,在sql组件的文档中,它为生产者提供了一个outputType=StreamList
选项,但它是在版本2.18.1上实现的2.18版及更高版本。
任何帮助和提示都会非常有帮助!
谢谢。
其他一些信息:Apache Camel版本:2.14.1数据库:Informix
经过相当多的研究,更多的试验和错误以及来自NotaJD的tips,我找到了一个可以解决的问题(仍在测试中)。实际上制作了2个解决方案,但它们的执行类型不同。
为了解释,我将使用以下信息:
AggregationStrategyImpl
通过以下方式扩展AggregationStrategy
:
返回交换体中的List<Object>
当Predicate
时,聚合List<Object> >= 50000
完成
聚合超时设置为30000
毫秒CustomThreadPool
是Camel的ThreadPoolBuilder
类的伪实现:
PoolSize:100
MaxPoolSize:50000
MaxQueueSize:500
TimeUnit:MILLISECONDS
KeepAliveTime:30000from("quartz2://myGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "createQuery")
代码仍将在Quartz cron-timer(每天00:01)上运行,但这次我的QueryTable.class
将获取正确的查询来执行(而不是SELECT *
,我现在指定了我需要的列)并将其设置为交换体。
.to("jdbc:dataSourceInformix?resetAutoCommit=false&outputType=StreamList").split(body()).streaming()
.bean(TransformRecord.class, "process")
Camel jdbc
组件将从交换体中获取查询,将resetAutoCommit
设置为false,因此它不会抛出Cursor not open
错误,将输出设置为streamming并拆分流执行,因此我不会立即查询所有记录但相反,一个接一个。然后,每个获取的记录将通过TransformRecord.class
转换为适当的POJO。
.aggregate(constant(true), aggregationStrategyImpl)
.completionPredicate(aggregationStrategyImpl.getCompletionPredicate())
.completionTimeout(aggregationStrategyImpl.getCompletionTimeout())
.to("direct:start-processing")
.end();
这次我使用aggregate
组件来创建记录列表。 aggregationStrategyImpl
包含聚合逻辑以及完成谓词和超时,因此当我达到一定数量的记录(或发生超时)时,列表将被发送到“direct:start-processing”。
更多关于此Source Allies blog和Apache Camel Aggregate EIP文档中的聚合实现。
from("direct:start-processing")
.split(body()).executorService(customThreadPool.build(getContext()))
.bean(AnalyzeData.class, "analyze")
.bean(PersistData.class, "persist")
.end();
在这里,我分割获得的列表并使用自定义ThreadPool创建N个线程来分别分析和处理每个记录。这样我可以在并行处理中处理我的列表而不是逐个处理。我本可以使用.split(body()).parallelProcessing()
,但默认的ThreadPool设置可能不会在以后最佳。
更多关于Apache Camel Threading Model文档,ThreadPool Configuration笔记和Red Hat Threading Model文档的ThreadPool实现。
对于此解决方案,它基本上是完全相同的执行但具有以下更改:
// .to("direct:start-processing")
.to("seda:start-processing?size=1&blockWhenFull=true")
.end();
// from("direct:start-processing")
from("seda:start-processing?size=1&blockWhenFull=true")
// continues normally
这样做是将列表异步发送到进程,允许最多1个其他列表在内存中排队,并在队列已满时暂停父线程。因此,不是等待处理记录列表,而是父线程将返回并收集另一批记录。这也意味着如果处理路由尚未完成,新记录将不会被抛出,父线程将等待,直到它可以将批处理发送到SEDA内存中队列。
更多关于GitHub及其site中Apache Camel SEDA组件文档的SEDA组件
对于解决方案1,它应该花费更长的时间来完成,因为它在从查询中收集更多记录之前首先处理所有数据,但是由于它在聚合谓词中被控制,因此内存消耗应该更少。
使用解决方案2它应该快得多,因为它在处理前一批时从查询中收集下一批记录,但内存消耗将更大,因为它最多可容纳3个列表:正在处理的列表,一个在SEDA队列中以及父线程收集的最新批次(队列已满时暂停)。
我说我仍然在测试这些解决方案,因为有500k记录它可以工作,但我仍然在为将要实现它的服务器设计最佳的ThreadPool设置。我已经研究了Java中的线程,但它似乎在那里除了系统的架构,RAM和反复试验之外,真的没有那么多。