While running a PoC, I want to load a large amount of data from Parquet into Apache Ignite, with native persistence enabled, using a Spark application.
Recently I have been testing a load of about 5 billion records. Eventually, about 100 billion records will be loaded into a production environment running on larger hardware.
My conclusions from this experiment:
The load initially ran at 150,000 records per second, which I was quite happy with. Unfortunately, after the first few load iterations, once about 200 million records had been loaded, the rate dropped sharply to 5,000 records per second, and every subsequent load only took longer. A single load iteration contains roughly 100 million records. As a result, I was forced to load the data into separate caches, e.g. READINGS_202204, READINGS_202205, and so on: one cache per month of data.
I am unable to load all of the data into a single cache. I suspect the cause may be that the single index.bin file is not partitioned, unlike the data files, so Ignite spends more and more time searching the index while loading each subsequent batch. That is my guess.
So I had to load the year's readings into the twelve READINGS_* caches mentioned above.
Question: Is it possible to load, say, 5 billion records into a single cache without the load rate collapsing after the first 200 million records, given the three physical nodes described below? Or do I perhaps need more physical nodes for this task?
My cluster spec: 3 workers, each with 16 cores, 96 GB RAM, 6 x 900 GB HDD, and a 1 Gb/s NIC. Yes, I have HDDs rather than SSDs.
Some important settings:
I had a problem with page write throttling and had to set the writeThrottlingEnabled parameter. Backups are not important to me at the moment.
I run five Ignite server instances per node, for a total of 15 Ignite server instances across the three physical worker nodes.
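For context, here is a rough back-of-envelope memory budget for one physical worker, assuming each Ignite server instance actually reserves the maxima from the configuration below (8 GB default region, 8 GB Region_Eviction, a 4 GB checkpoint page buffer for each region, 4 GB JVM heap). Region memory is allocated lazily, so this is an upper bound rather than what is resident at startup:

```shell
# Per-instance reservation, in GB, taken from the configuration below:
#   8  default data region maxSize
#   8  Region_Eviction maxSize
#   4  checkpoint page buffer (default region)
#   4  checkpoint page buffer (Region_Eviction)
#   4  JVM heap (-Xmx4g)
per_instance=$((8 + 8 + 4 + 4 + 4))
instances_per_node=5
per_node=$((per_instance * instances_per_node))
echo "per instance: ${per_instance} GB"
echo "per node:     ${per_node} GB on a 96 GB machine"
```

If all five instances were ever to fill their configured regions and buffers, each 96 GB worker would be oversubscribed before the OS page cache gets anything, which may matter when interpreting the write throttling mentioned above.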
Below I attach the relevant part of the configuration file from one worker node:
<beans>
<!--
Alter configuration below as needed.
-->
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="gridName" value="an-ignite-grid"/>
<property name="localHost" value="192.168.100.11"/>
<property name="failureDetectionTimeout" value="60000"/>
<property name="clientFailureDetectionTimeout" value="20000"/>
<property name="communicationSpi">
<bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
<property name="socketWriteTimeout" value="5000"/>
<property name="localPort" value="48100"/>
<property name="usePairedConnections" value="true"/>
<!--<property name="connectionsPerNode" value="6"/>-->
<!-- <property name="connectionsPerNode" value="10"/>-->
<!-- <property name="usePairedConnections" value="true"/>-->
<!-- <property name="socketReceiveBuffer" value="#{64L * 1024}"/>-->
</bean>
</property>
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<!-- Initial local port to listen to. -->
<property name="localPort" value="48500"/>
<!-- Changing local port range. This is an optional action. -->
<property name="localPortRange" value="50"/>
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<value>192.168.100.11:48500..48550</value>
<value>192.168.100.12:48500..48550</value>
<value>192.168.100.13:48500..48550</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
<property name="workDirectory" value="/data2/ignite/work"/>
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<property name="pageSize" value="#{4096 * 2}"/>
<property name="walMode" value="NONE"/>
<property name="walSegmentSize" value="#{128 * 1024 * 1024}"/>
<property name="writeThrottlingEnabled" value="true"/>
<property name="checkpointThreads" value="6"/>
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="persistenceEnabled" value="true"/>
<property name="maxSize" value="#{8L * 1024 * 1024 * 1024}"/>
<property name="pageEvictionMode" value="RANDOM_2_LRU"/>
<property name="checkpointPageBufferSize" value="#{4L * 1024 * 1024 * 1024}"/>
</bean>
</property>
<property name="dataRegionConfigurations">
<list>
<!--
Large memory region (eviction is currently commented out below).
-->
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="name" value="Region_Eviction"/>
<!-- Memory region of 512 MB initial size. -->
<property name="initialSize" value="#{512 * 1024 * 1024}"/>
<!-- Maximum size is 8 GB. -->
<property name="maxSize" value="#{8L * 1024 * 1024 * 1024}"/>
<!-- Enabling eviction for this memory region. -->
<!-- <property name="pageEvictionMode" value="RANDOM_2_LRU"/> -->
<property name="persistenceEnabled" value="true"/>
<!-- Increasing the buffer size to 4 GB. -->
<property name="checkpointPageBufferSize" value="#{4L * 1024 * 1024 * 1024}"/>
</bean>
</list>
</property>
</bean>
</property>
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!--<property name="groupName" value="group1"/>-->
<property name="name" value="READINGS_202204"/>
<property name="cacheMode" value="PARTITIONED"/>
<property name="atomicityMode" value="ATOMIC"/>
<property name="sqlSchema" value="PUBLIC"/>
<!-- enable disk page compression for this cache -->
<property name="diskPageCompression" value="SNAPPY"/>
<property name="dataRegionName" value="Region_Eviction"/>
<!-- Number of backup copies -->
<property name="backups" value="0"/>
<property name="partitionLossPolicy" value="READ_WRITE_SAFE"/>
<property name="writeThrough" value="false"/>
<property name="affinity">
<bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
<property name="partitions" value="2048"/>
</bean>
</property>
<property name="keyConfiguration">
<bean class="org.apache.ignite.cache.CacheKeyConfiguration">
<property name="typeName" value="entity.key.AttributeValueKey"/>
<property name="affinityKeyFieldName" value="usage_point_code"/>
</bean>
</property>
<property name="queryEntities">
<list>
<bean class="org.apache.ignite.cache.QueryEntity">
<property name="keyType" value="entity.key.AttributeValueKey"/>
<property name="valueType" value="READINGS_202204"/>
<property name="fields">
<map>
<entry key="balancing_market_party_code" value="java.lang.String"/>
<entry key="customer_code" value="java.lang.String"/>
<entry key="dso_code" value="java.lang.String"/>
<entry key="inbound_file_name" value="java.lang.String"/>
<entry key="inbound_message_orig_id" value="java.lang.String"/>
<entry key="interval_start_day" value="java.sql.Date"/>
<entry key="is_service_delivery_point" value="java.lang.String"/>
<entry key="reading_source_code" value="java.lang.String"/>
<entry key="reading_time_stamp" value="java.sql.Timestamp"/>
<entry key="reading_time_stamp_tz" value="java.lang.String"/>
<entry key="reported_day" value="java.sql.Date"/>
<entry key="reported_date_time" value="java.sql.Timestamp"/>
<entry key="reported_date_time_tz" value="java.lang.String"/>
<entry key="t_creation_date" value="java.sql.Timestamp"/>
<entry key="t_creation_date_tz" value="java.lang.String"/>
<entry key="usage_point_code" value="java.lang.String"/>
<entry key="acquired_date_time" value="java.sql.Timestamp"/>
<entry key="acquired_date_time_tz" value="java.lang.String"/>
<entry key="inbound_message_id" value="java.lang.Long"/>
<entry key="dso_tariff_code" value="java.lang.String"/>
<entry key="forward_active_energy" value="java.lang.Double"/>
<entry key="forward_quality_code" value="java.lang.String"/>
<entry key="reverse_active_energy" value="java.lang.Double"/>
<entry key="reverse_quality_code" value="java.lang.String"/>
<entry key="balance_active_energy" value="java.lang.Double"/>
<entry key="balance_quality_code" value="java.lang.String"/>
</map>
</property>
<property name="keyFields">
<set>
<value>usage_point_code</value>
<value>reading_time_stamp</value>
<value>reading_time_stamp_tz</value>
<value>inbound_file_name</value>
</set>
</property>
</bean>
</list>
</property>
</bean>
</list>
</property>
</bean>
</beans>
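For reference, the commented-out groupName property in the cache configuration above refers to Ignite's cache groups: caches that share a group store their data in the same partition files, which could reduce the per-cache file and index overhead of keeping twelve monthly caches. A minimal sketch of what that would look like (the group name READINGS_GROUP is only an illustrative choice, not something from my working config):

```xml
<!-- Caches sharing a groupName are stored in the same partition files. -->
<bean class="org.apache.ignite.configuration.CacheConfiguration">
    <property name="groupName" value="READINGS_GROUP"/>
    <property name="name" value="READINGS_202204"/>
    <property name="cacheMode" value="PARTITIONED"/>
    <property name="dataRegionName" value="Region_Eviction"/>
</bean>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
    <property name="groupName" value="READINGS_GROUP"/>
    <property name="name" value="READINGS_202205"/>
    <property name="cacheMode" value="PARTITIONED"/>
    <property name="dataRegionName" value="Region_Eviction"/>
</bean>
```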
After the servers start, CPU load is low until loading begins. Here is the script I use to run an Ignite server instance:
nohup bash $IGNITE_HOME/bin/ignite.sh -v \
-server \
-J-DIGNITE_DIRECT_IO_ENABLED=true \
-J-DIGNITE_WAIT_FOR_BACKUPS_ON_SHUTDOWN=true \
-J-Xloggc:/data2/ignite/work/log/GClog.txt \
-J-Xms4g \
-J-Xmx4g \
-J-XX:+AlwaysPreTouch \
-J-XX:+UseG1GC \
-J-XX:+ScavengeBeforeFullGC \
-J-XX:MaxGCPauseMillis=200 \
-J-XX:InitiatingHeapOccupancyPercent=45 \
-J-XX:+UseCompressedOops \
-J-XX:ParallelGCThreads=4 \
-J-XX:ConcGCThreads=2 \
-J-Djava.net.preferIPv4Stack=true \
/data1/ignite/config/default-config.xml 1> /home/oracle/ignite.log 2>&1 &
And here is the key code fragment from the application that loads data from a Spark DataFrame into the cache:
val CONFIG = "default-config.xml"
sqlDF.write.format(IgniteDataFrameSettings.FORMAT_IGNITE)
.mode(SaveMode.Append)
.option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, CONFIG)
.option(IgniteDataFrameSettings.OPTION_TABLE, "READINGS_202204")
//.option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE, true)
//.option(IgniteDataFrameSettings.OPTION_STREAMER_FLUSH_FREQUENCY, 30000)
//.option(IgniteDataFrameSettings.OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS, 8)
//.option(IgniteDataFrameSettings.OPTION_STREAMER_PER_NODE_BUFFER_SIZE, 4096)
.option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "usage_point_code,reading_time_stamp,reading_time_stamp_tz,inbound_file_name")
.option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS, "template=partitioned,backups=0,CACHE_NAME=READINGS_202204")
.save()
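For completeness, the commented-out streamer options above can be enabled like this; the values are simply the ones I had commented out, not tuned recommendations:

```scala
sqlDF.write.format(IgniteDataFrameSettings.FORMAT_IGNITE)
  .mode(SaveMode.Append)
  .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, CONFIG)
  .option(IgniteDataFrameSettings.OPTION_TABLE, "READINGS_202204")
  // let the underlying IgniteDataStreamer overwrite existing keys
  .option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE, true)
  // flush buffered entries at least every 30 s
  .option(IgniteDataFrameSettings.OPTION_STREAMER_FLUSH_FREQUENCY, 30000)
  // per-node streamer parallelism and buffer size
  .option(IgniteDataFrameSettings.OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS, 8)
  .option(IgniteDataFrameSettings.OPTION_STREAMER_PER_NODE_BUFFER_SIZE, 4096)
  .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "usage_point_code,reading_time_stamp,reading_time_stamp_tz,inbound_file_name")
  .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS, "template=partitioned,backups=0,CACHE_NAME=READINGS_202204")
  .save()
```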
At the moment I have to build the SQL with UNION ALL, which does not look great, for example:
select * from (select * from READINGS_202201
union all
select * from READINGS_202202
union all
select * from READINGS_202203
union all
select * from READINGS_202204
union all
select * from READINGS_202205
union all
select * from READINGS_202206
union all
select * from READINGS_202207
union all
select * from READINGS_202208
union all
select * from READINGS_202209
union all
select * from READINGS_202210
union all
select * from READINGS_202211
union all
select * from READINGS_202212)
where usage_point_code='590243874018422425'
Any answers are welcome.
Best regards, Darius
I have tried loading the data into a single cache with various settings. Whenever about 200-300 million records have been loaded, the rate of loading further records drops significantly. I would like to be able to load 5, or even 20, billion records into a single cache using three worker nodes, each with 96 GB RAM, 16 cores, 6 x 900 GB HDD, and a 1 Gb/s NIC.