Inserting ten million records into a DB using a task executor in Java

Question (votes: 1, answers: 1)

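My requirement is to encrypt the personally identifiable columns of a table. For this I wrote a small piece of code that selects the data in batches and inserts it into a new table with a few extra columns.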

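The problem is that when I run my code it works well at the start, but then it stops inserting records into the DB. It does not print any exception either.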

This is my connection pool configuration.

private BlockingQueue<EntityManager>  getConnectionPool(int poolSize) throws InterruptedException {
   // List<EntityManager> list = new ArrayList<>();
    BlockingQueue<EntityManager> queue = new ArrayBlockingQueue<>(poolSize);
    int i = poolSize;
    do
    {


        EntityManager entityManager = connectionService.getEm();
        queue.put(entityManager);
        //list.add(entityManager);
        i--;
    }
    while (i != 0);

    return queue;
}

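This is the entry point for everything. It computes the total number of batches and, for each batch, calls a method that hands the work to the executor service.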

public void insertData() throws InterruptedException {

    key = hash(key);
    EntityManager entityManager = connectionService.getEm();
    EntityTransaction entityTransaction = entityManager.getTransaction();
    BlockingQueue<EntityManager> queue = getConnectionPool( 200);
    try {
        int batchSize= 1000;
        BigInteger totalResults = partnerRepository.getCountCustomerLedger(entityManager);

        double totalPages = Math.ceil(totalResults.longValue() / batchSize);

        int maxResult = batchSize;
        CountDownLatch latch = new CountDownLatch(((Double)totalPages).intValue());
        for(int i =1 ; i <= totalPages; i++) {

            int firstResult = (i - 1) * batchSize;
            if (i == totalPages)
            {
                batchSize = totalResults.intValue() - firstResult;

            }
            exectueTask(queue, firstResult, batchSize, latch, i);

        }
        System.out.println("waiting for latch to finish");
        latch.await();
        System.out.println("latch exited");
    }catch (Exception e) {
        e.printStackTrace();
        if (entityTransaction.isActive()) {
            entityTransaction.rollback();
        }
        entityManager.close();
    }
    finally {
        int i = poolSize;
        do
        {
            queue.take().close();
            i--;
        }
        while (i != 0);
    }
    entityManager.close();

}
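
A note on the batch arithmetic in insertData above: `totalResults.longValue() / batchSize` is an integer division, so the quotient is already truncated before `Math.ceil` sees it. The last-iteration adjustment then folds the remainder into the final batch, but a table with fewer rows than one batch yields zero pages. Below is a minimal sketch of an alternative, assuming plain ceiling division is what was intended. The `BatchPlanner` class and its method names are hypothetical and not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchPlanner {

    /** Number of batches needed to cover totalRows, rounding up. */
    static int batchCount(long totalRows, int batchSize) {
        // Integer ceiling division: no floating point, no truncation surprise.
        return (int) ((totalRows + batchSize - 1) / batchSize);
    }

    /** Returns {firstResult, size} pairs that together cover rows [0, totalRows). */
    static List<int[]> plan(long totalRows, int batchSize) {
        List<int[]> batches = new ArrayList<>();
        int count = batchCount(totalRows, batchSize);
        for (int i = 0; i < count; i++) {
            long first = (long) i * batchSize;
            // Only the last batch can be smaller than batchSize.
            int size = (int) Math.min(batchSize, totalRows - first);
            batches.add(new int[] {(int) first, size});
        }
        return batches;
    }

    public static void main(String[] args) {
        for (int[] batch : plan(2500, 1000)) {
            System.out.println("firstResult=" + batch[0] + " size=" + batch[1]);
        }
    }
}
```

Computing each batch size from its own offset also avoids mutating `batchSize` inside the loop.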

This one submits the work to the executor:

private void exectueTask(BlockingQueue<EntityManager> queue, int firstResult, int batchSize, CountDownLatch latch, int batchNumber) {
    taskExecutor.execute(() -> {
        try {
            try {
                run(queue, firstResult, batchSize, latch, batchNumber);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}

Here I run the query for one batch and insert the data into the DB:

private void run(BlockingQueue<EntityManager> queue, int firstResult, int batchSize, CountDownLatch latch, int batchNumber) throws InterruptedException, IOException {

        logger.info("batchNumber " + batchNumber + " batchNumber called " + " at " + new Date());
        EntityManager entityManager = queue.take();
        logger.info("batchNumber " + batchNumber + " batchNumber took  " + " following time to get entitymanager " + new Date());
        EntityTransaction entityTransaction = entityManager.getTransaction();

        List<CustomerLedger> customerLedgerList = partnerRepository.getAllCustomerLedger(entityManager,firstResult, batchSize);
        //List<Object[]> customerLedgerList = partnerRepository.getAllCustomerLedgerNative(entityManager,firstResult, batchSize);

        entityTransaction.begin();
        for (CustomerLedger old :customerLedgerList) {
            CustomerLedgerNew ledgerNew = new CustomerLedgerNew();
            String customerLedgerJson = objectMapper.writeValueAsString(old);
            ledgerNew = customerLedgerToCustomerLedgerNew(customerLedgerJson);

            ledgerNew.setFirstName(convertToDatabaseColumn(old.getFirstName(),key));
            ledgerNew.setMiddleName(convertToDatabaseColumn(old.getMiddleName(),key));
            ledgerNew.setLastName(convertToDatabaseColumn(old.getLastName(),key));
            ledgerNew.setAddressLine1(convertToDatabaseColumn(old.getAddressLine1(),key));
            ledgerNew.setAddressLine2(convertToDatabaseColumn(old.getAddressLine2(),key));
            ledgerNew.setAddressLine3(convertToDatabaseColumn(old.getAddressLine3(),key));
            ledgerNew.setAddressLine4(convertToDatabaseColumn(old.getAddressLine4(),key));
            ledgerNew.setHomePhone(convertToDatabaseColumn(old.getHomePhone(),key));
            ledgerNew.setWorkPhone(convertToDatabaseColumn(old.getWorkPhone(),key));
            ledgerNew.setEmail1(convertToDatabaseColumn(old.getEmail1(),key));
            ledgerNew.setMobile(convertToDatabaseColumn(old.getMobile(),key));
            ledgerNew.setMobileSha(sha256Hash(old.getMobile()));
            ledgerNew.setMobileChecksum(getMD5Hash(old.getMobile()));
            ledgerNew.setEmailSha(sha256Hash(old.getEmail1()));
            ledgerNew.setEmailChecksum(getMD5Hash(old.getEmail1()));
            //ledgerNew.setChannel(old.getChannel());
            //ledgerNew.setUniqueCustomerId(old.getUniqueCustomerId());
            //ledgerNew.setLastModifiedDate(old.getLastModifiedDate());
            entityManager.persist(ledgerNew);

        }
        //System.out.println("commited");
        logger.info("batchNumber " + batchNumber + " batchNumber started commiting data at   "  + new Date());
        entityTransaction.commit();
        logger.info("batchNumber " + batchNumber + " batchNumber finished commiting data at   "  + new Date());
        queue.put(entityManager);
        latch.countDown();
        logger.info("batchNumber " + batchNumber + " latch count   "  + latch.getCount());
}

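One thing I noticed in the logs is that only this line gets printed:

**batchNumber 615 batchNumber started commiting data at Wed Dec 11 17:22:54 IST 201**

The next line, the one logged after the commit has finished, is never printed. I really cannot understand the reason for this.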
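
Whatever is blocking the commit, `run` as posted only returns the EntityManager to the queue and counts down the latch on the success path. If anything throws between `queue.take()` and `queue.put(...)`, that EntityManager is lost and `latch.await()` never returns. The sketch below shows the usual try/finally shape, with a generic resource standing in for the EntityManager. `PooledBatchRunner`, `BatchWork`, and `demo` are hypothetical names, and real code would also roll back the transaction in the failure branch.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PooledBatchRunner {

    /** Hypothetical stand-in for the per-batch work done with a pooled resource. */
    interface BatchWork<R> {
        void apply(R resource, int batchNumber) throws Exception;
    }

    /** Runs all batches and returns how many of them failed. */
    static <R> int runAll(BlockingQueue<R> pool, int batches, int threads, BatchWork<R> work)
            throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        CountDownLatch latch = new CountDownLatch(batches);
        AtomicInteger failures = new AtomicInteger();
        for (int i = 1; i <= batches; i++) {
            final int batchNumber = i;
            executor.execute(() -> {
                R resource = null;
                try {
                    resource = pool.take();
                    work.apply(resource, batchNumber);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    failures.incrementAndGet();
                } catch (Exception e) {
                    // Real code would log the exception and roll back here.
                    failures.incrementAndGet();
                } finally {
                    // Always hand the resource back and release the latch,
                    // whether the batch succeeded or not.
                    if (resource != null) {
                        pool.offer(resource);
                    }
                    latch.countDown();
                }
            });
        }
        try {
            // A bounded wait turns a silent hang into a visible error.
            if (!latch.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("batches did not finish in time");
            }
        } finally {
            executor.shutdown();
        }
        return failures.get();
    }

    /** Demo: batch 3 fails, yet the pool is full again. Returns {failures, poolSize}. */
    static int[] demo() {
        BlockingQueue<String> pool = new ArrayBlockingQueue<>(2);
        pool.add("em-1");
        pool.add("em-2");
        try {
            int failures = runAll(pool, 10, 4, (resource, batchNumber) -> {
                if (batchNumber == 3) {
                    throw new IllegalStateException("simulated failure");
                }
            });
            return new int[] {failures, pool.size()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new int[] {-1, pool.size()};
        }
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("failures=" + result[0] + " pooled=" + result[1]);
    }
}
```

The same reasoning applies to the cleanup loop in insertData: calling `queue.take()` once per pool slot blocks forever if even one EntityManager was never put back.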

Thread pool configuration class

@Configuration
public class ThreadPoolConfiguration {

private final static org.slf4j.Logger LOGGER = org.slf4j.LoggerFactory.getLogger(ThreadPoolConfiguration.class);
private final int defaultCorePoolSize = 200;
private final int defaultMaxPoolSize = 300;
private final int defaultQueueCapacity = 20000;
private final int defaultKeepAlive = 10;


@Bean
@Qualifier("TaskExecutor")
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(defaultCorePoolSize);
    executor.setMaxPoolSize(defaultMaxPoolSize);
    executor.setQueueCapacity(defaultQueueCapacity);
    executor.setKeepAliveSeconds(defaultKeepAlive);
    executor.setAllowCoreThreadTimeOut(true);
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.setThreadNamePrefix("encryption-DEFAULT-");
    executor.initialize();
    return executor;
}
}
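
One detail of this configuration may be worth knowing: ThreadPoolTaskExecutor delegates to java.util.concurrent.ThreadPoolExecutor, which starts threads beyond the core size only once the work queue is full. With a queue capacity of 20000 and roughly 10000 batches (ten million rows at 1000 per batch), the pool would be expected to stay at 200 threads, so the maximum of 300 would not come into play. The sketch below demonstrates the rule with small numbers on a plain ThreadPoolExecutor. `PoolGrowthDemo` is a hypothetical name.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolGrowthDemo {

    /** Submits blocking tasks and reports how many threads the pool created. */
    static int poolSizeAfterSubmitting(int core, int max, int queueCapacity, int tasks) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                core, max, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                executor.execute(() -> {
                    try {
                        // Keep every worker busy until the measurement is taken.
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return executor.getPoolSize();
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Large queue: the extra tasks wait in the queue, the pool stays at core size.
        System.out.println(poolSizeAfterSubmitting(2, 4, 100, 10));
        // Small queue: once it is full, the pool grows towards the maximum.
        System.out.println(poolSizeAfterSubmitting(2, 4, 3, 7));
    }
}
```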

Please forgive me if I have not framed the question properly.


java mysql spring java-threads blockingqueue
1 Answer
0 votes

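Your problem may have several sources:
* a deadlock on MySQL table locks
* MySQL running out of connections (check the MySQL logs)
* an out-of-memory condition caused by too much data buffered in the EntityManager
* a deadlock inside the EntityManager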
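
To narrow down which of these applies, it may help to make a stalled batch visible instead of waiting on it indefinitely. The following is a minimal sketch, not the asker's code: it submits each batch as a `Callable`, waits on each `Future` with a timeout, and reports the batch numbers that did not finish. `StuckBatchDetector`, `findStuck`, and `demo` are hypothetical names, and the sleeping task merely simulates a commit that never returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class StuckBatchDetector {

    /** Returns the 1-based numbers of batches that did not finish within the timeout. */
    static List<Integer> findStuck(List<Callable<Void>> batches, int threads, long timeoutMillis) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        List<Future<Void>> futures = new ArrayList<>();
        for (Callable<Void> batch : batches) {
            futures.add(executor.submit(batch));
        }
        List<Integer> stuck = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            try {
                futures.get(i).get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // The batch is still running: record it and ask it to stop.
                stuck.add(i + 1);
                futures.get(i).cancel(true);
            } catch (ExecutionException e) {
                // The batch threw: report the cause rather than losing it.
                System.err.println("batch " + (i + 1) + " failed: " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        executor.shutdownNow();
        return stuck;
    }

    /** Demo with three batches, the second of which simulates a hanging commit. */
    static List<Integer> demo() {
        List<Callable<Void>> batches = new ArrayList<>();
        batches.add(() -> null);
        batches.add(() -> {
            Thread.sleep(5_000); // stands in for a commit that does not return
            return null;
        });
        batches.add(() -> null);
        return findStuck(batches, 3, 500);
    }

    public static void main(String[] args) {
        System.out.println("stuck batches: " + demo());
    }
}
```

A thread dump (for example with `jstack`) taken while a batch is reported as stuck would show whether the worker is waiting inside the JDBC driver or on a lock in the application. Note that `cancel(true)` only interrupts the thread, and a blocked database call may not react to the interrupt.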
