将数十亿条记录加载到 Apache Ignite 的问题

问题描述 投票:0回答:0

在执行 PoC 时,我真的想通过 spark 应用程序从 parquet 启用本机持久性,将大量数据加载到 Apache Ignite。
最近我处理了加载大约 50 亿条记录的测试。最终,将有大约 1000 亿条记录加载到使用更大硬件的生产环境中。
我从这个实验中得出的结论:
加载最初以每秒 150,000 条记录的速度运行。这最初让我很满意。不幸的是,在加载的前几次迭代之后,加载了 2 亿条记录后,评级开始急剧下降到每秒 5,000 条记录。每次后续加载只会变得更长。单次加载迭代包含大约 1 亿条记录。因此,我被迫将数据加载到新的缓存中,例如:READINGS_202204、READINGS_202205 等等。按月统计数据。 我将无法将所有数据加载到一个缓存中。我怀疑原因可能是单个 index.bin 文件未分区,而不是分区的数据文件。 Ignite 增加了在加载后续批次数据期间搜索索引的时间。这是我的猜测。
因此,我不得不将当年的读数加载到我上面写的十二个 READINGS_ 缓存对象中。

问题: 例如,是否可以将 50 亿条记录加载到一个缓存中,以便在加载 2 亿条记录后加载速度不会急剧下降?给出以下三个物理节点。也许我需要更多的物理节点来完成这项任务?

我的集群规格: 3 名工人 每个工人有:16 Core,96 GB RAM,6 x 900GB HDD,网卡 1Gb/s)。是的。我有 HDD 而不是 SDD。

一些重要的设置:

  • 2048个分区
  • 禁用 WAL
  • 启用直接 I/O
  • 检查点页面缓冲区大小 = 4GB
  • 启用本机持久性

我有页面写入限制的问题,必须设置参数“writeThrottlingEnabled”。目前,备份对我来说并不重要。

我为每个节点创建了五个实例 Ignite 服务器。在三个物理工作节点上总共有 15 个 Ignite 服务器实例。

我在下面附上来自一个工作节点的配置文件的相关部分:

<beans>
    <!--
        Alter configuration below as needed.
    -->
    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <property name="gridName" value="an-ignite-grid"/>
        <property name="localHost" value="192.168.100.11"/>
        <property name="failureDetectionTimeout" value="60000"/>
        <property name="clientFailureDetectionTimeout" value="20000"/>
        <property name="communicationSpi">
            <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
                <property name="socketWriteTimeout" value="5000"/>
                <property name="localPort" value="48100"/>
                <property name="usePairedConnections" value="true"/>
                <!--<property name="connectionsPerNode" value="6"/>-->
                <!-- <property name="connectionsPerNode" value="10"/>-->
                <!-- <property name="usePairedConnections" value="true"/>-->
                <!-- <property name="socketReceiveBuffer" value="#{64L * 1024}"/>-->
            </bean>
        </property>
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <!-- Initial local port to listen to. -->
                <property name="localPort" value="48500"/>
                <!-- Changing local port range. This is an optional action. -->
                <property name="localPortRange" value="50"/>
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <value>192.168.100.11:48500..48550</value>
                                <value>192.168.100.12:48500..48550</value>
                                <value>192.168.100.13:48500..48550</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
        <property name="workDirectory" value="/data2/ignite/work"/>
        <property name="dataStorageConfiguration">
            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                <property name="pageSize" value="#{4096 * 2}"/>
                <property name="walMode" value="NONE"/>
                <property name="walSegmentSize" value="#{128 * 1024 * 1024}"/>
                <property name="writeThrottlingEnabled" value="true"/>
                <property name="checkpointThreads" value="6"/>
                <property name="defaultDataRegionConfiguration">
                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                        <property name="persistenceEnabled" value="true"/>
                        <property name="maxSize" value="#{8L * 1024 * 1024 * 1024}"/>
                        <property name="pageEvictionMode" value="RANDOM_2_LRU"/>
                        <property name="checkpointPageBufferSize" value="#{4L * 1024 * 1024 * 1024}"/>
                    </bean>
                </property>
                <property name="dataRegionConfigurations">
                    <list>
                        <!--
                        Large erea memory region with eviction enabled.
                        -->
                        <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                            <property name="name" value="Region_Eviction"/>
                            <!-- Memory region of 512 MB initial size. -->
                            <property name="initialSize" value="#{512 * 1024 * 1024}"/>
                            <!-- Maximum size is 8 GB. -->
                            <property name="maxSize" value="#{8L * 1024 * 1024 * 1024}"/>
                            <!-- Enabling eviction for this memory region. -->
                            <!-- <property name="pageEvictionMode" value="RANDOM_2_LRU"/> -->
                            <property name="persistenceEnabled" value="true"/>
                            <!-- Increasing the buffer size to 4 GB. -->
                            <property name="checkpointPageBufferSize" value="#{4L * 1024 * 1024 * 1024}"/>
                        </bean>
                    </list>
                </property>
            </bean>
        </property>
        <property name="cacheConfiguration">
            <list>
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <!--<property name="groupName" value="group1"/>-->
                    <property name="name" value="READINGS_202204"/>
                    <property name="cacheMode" value="PARTITIONED"/>
                    <property name="atomicityMode" value="ATOMIC"/>
                    <property name="sqlSchema" value="PUBLIC"/>
                    <!-- enable disk page compression for this cache -->
                    <property name="diskPageCompression" value="SNAPPY"/>
                    <property name="dataRegionName" value="Region_Eviction"/>
                    <!-- Number of backup copies -->
                    <property name="backups" value="0"/>
                    <property name="partitionLossPolicy" value="READ_WRITE_SAFE"/>
                    <property name="writeThrough" value="false"/>
                    <property name="affinity">
                        <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
                            <property name="partitions" value="2048"/>
                        </bean>
                    </property>
                    <property name="keyConfiguration">
                        <bean class="org.apache.ignite.cache.CacheKeyConfiguration">
                            <property name="typeName" value="entity.key.AttributeValueKey"/>
                            <property name="affinityKeyFieldName" value="usage_point_code"/>
                        </bean>
                    </property>
                    <property name="queryEntities">
                        <list>
                            <bean class="org.apache.ignite.cache.QueryEntity">
                                <property name="keyType" value="entity.key.AttributeValueKey"/>
                                <property name="valueType" value="READINGS_202204"/>
                                <property name="fields">
                                    <map>
                                        <entry key="balancing_market_party_code" value="java.lang.String"/>
                                        <entry key="customer_code" value="java.lang.String"/>
                                        <entry key="dso_code" value="java.lang.String"/>
                                        <entry key="inbound_file_name" value="java.lang.String"/>
                                        <entry key="inbound_message_orig_id" value="java.lang.String"/>
                                        <entry key="interval_start_day" value="java.sql.Date"/>
                                        <entry key="is_service_delivery_point" value="java.lang.String"/>
                                        <entry key="reading_source_code" value="java.lang.String"/>
                                        <entry key="reading_time_stamp" value="java.sql.Timestamp"/>
                                        <entry key="reading_time_stamp_tz" value="java.lang.String"/>
                                        <entry key="reported_day" value="java.sql.Date"/>
                                        <entry key="reported_date_time" value="java.sql.Timestamp"/>
                                        <entry key="reported_date_time_tz" value="java.lang.String"/>
                                        <entry key="t_creation_date" value="java.sql.Timestamp"/>
                                        <entry key="t_creation_date_tz" value="java.lang.String"/>
                                        <entry key="usage_point_code" value="java.lang.String"/>
                                        <entry key="acquired_date_time" value="java.sql.Timestamp"/>
                                        <entry key="acquired_date_time_tz" value="java.lang.String"/>
                                        <entry key="inbound_message_id" value="java.lang.Long"/>
                                        <entry key="dso_tariff_code" value="java.lang.String"/>
                                        <entry key="forward_active_energy" value="java.lang.Double"/>
                                        <entry key="forward_quality_code" value="java.lang.String"/>
                                        <entry key="reverse_active_energy" value="java.lang.Double"/>
                                        <entry key="reverse_quality_code" value="java.lang.String"/>
                                        <entry key="balance_active_energy" value="java.lang.Double"/>
                                        <entry key="balance_quality_code" value="java.lang.String"/>
                                    </map>
                                </property>
                                <property name="keyFields">
                                    <set>
                                        <value>usage_point_code</value>
                                        <value>reading_time_stamp</value>
                                        <value>reading_time_stamp_tz</value>
                                        <value>inbound_file_name</value>
                                    </set>
                                </property>
                            </bean>
                        </list>
                    </property>
                </bean>
            </list>
        </property>
    </bean>
</beans>

enter image description here

服务器启动后的低 CPU 负载系数尚未加载 我附加到脚本以运行 ignite 实例服务器:

nohup bash $IGNITE_HOME/bin/ignite.sh -v \
-server \
-J-DIGNITE_DIRECT_IO_ENABLED=true \
-J- DIGNITE_WAIT_FOR_BACKUPS_ON_SHUTDOWN=true \
-J-Xloggc:/data2/ignite/work/log/GClog.txt \
-J-Xms4g \
-J-Xmx4g \
-J-XX:+AlwaysPreTouch \
-J-XX:+UseG1GC \
-J-XX:+ScavengeBeforeFullGC \
-J-XX:MaxGCPauseMillis=200 \
-J-XX:InitiatingHeapOccupancyPercent=45 \
-J-XX:+UseCompressedOops \
-J-XX:ParallelGCThreads=4 \
-J-XX:ConcGCThreads=2 \
-J-Djava.net.preferIPv4Stack=true \
/data1/ignite/config/default-config.xml 1> /home/oracle/ignite.log 2>&1 &

以及将数据从 spark 数据帧加载到缓存的应用程序的关键片段代码:

val CONFIG = "default-config.xml"


sqlDF.write.format(IgniteDataFrameSettings.FORMAT_IGNITE)
  .mode(SaveMode.Append)
  .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE, CONFIG)
  .option(IgniteDataFrameSettings.OPTION_TABLE, "READINGS_202204")
  //.option(IgniteDataFrameSettings.OPTION_STREAMER_ALLOW_OVERWRITE, true)
  //.option(IgniteDataFrameSettings.OPTION_STREAMER_FLUSH_FREQUENCY, 30000)
  //.option(IgniteDataFrameSettings.OPTION_STREAMER_PER_NODE_PARALLEL_OPERATIONS, 8)
  //.option(IgniteDataFrameSettings.OPTION_STREAMER_PER_NODE_BUFFER_SIZE, 4096)
  .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PRIMARY_KEY_FIELDS, "usage_point_code,reading_time_stamp,reading_time_stamp_tz,inbound_file_name")
  .option(IgniteDataFrameSettings.OPTION_CREATE_TABLE_PARAMETERS, "template=partitioned,backups=0,CACHE_NAME=READINGS_202204")
  .save()

目前我必须使用 UNION ALL 创建 SQL,这看起来不太好,例如:

select * from (select * from READINGS_202201 
               union all 
               select * from READINGS_202202
               union all 
               select * from READINGS_202203 
               union all 
               select * from READINGS_202204 
               union all 
               select * from READINGS_202205 
               union all 
               select * from READINGS_202206
               union all 
               select * from READINGS_202207
               union all 
               select * from READINGS_202208 
               union all 
               select * from READINGS_202209
               union all 
               select * from READINGS_202210
               union all 
               select * from READINGS_202211
               union all 
               select * from READINGS_202212) 
where usage_point_code='590243874018422425'

欢迎任何答案

最好的问候 大流士

我尝试使用不同的设置将数据加载到单个缓存中。每当加载 200-3 亿条记录时,加载更多记录的速度就会显着降低。我希望能够使用三个工作节点将 5 或 200 亿条记录加载到一个缓存中。每个节点都有(96GB RAM,16 核,6 x 900 GB HDD,网卡 1Gb/s)

ignite
© www.soinside.com 2019 - 2024. All rights reserved.