Java中如何让线程按照创建的顺序执行

问题描述 投票:0回答:1

我正在使用一个小众数据库。目前它在数据备份方面不支持导出sql文件,只支持dump文件;但需求要求导出sql文件,所以只能自己查询数据并写入sql文件。

由于代码是商业性的,所以我对代码进行了抽象,造成不便,请见谅。

import lombok.AllArgsConstructor;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Exports a table to a SQL file by splitting the primary-key space into
 * fixed-size ranges and querying the ranges concurrently on a thread pool.
 *
 * <p>Each task appends its chunk to the shared writer as soon as its own query
 * has finished, so chunks land in the file in completion order, not in
 * primary-key order.
 */
public class ThreadQueryWrite {

    /**
     * Exports one primary-key range: queries the rows whose id lies between
     * {@code startId} and {@code endId} and appends them to the shared writer.
     */
    @AllArgsConstructor
    class MyRunnable implements Runnable {

        /**
         * database tableName
         */
        String tableName;
        /**
         * The starting primary key of the query range
         */
        long startId;
        /**
         * The ending primary key of the query range
         */
        long endId;

        /**
         * File writing character stream, shared by every task of one export
         */
        Writer writer;

        @Override
        public void run() {
            doQuery();
        }

        /**
         * Placeholder for the real (abstracted-away) query; the intended flow is
         * sketched in the comments below.
         */
        private void doQuery() {
            // resultSet = select * from tableName where id between startId and endId;
            // doWriteFile(resultSet);
        }

        /**
         * Renders the rows of {@code resultSet} into one chunk and appends it to
         * the shared writer.
         *
         * @param resultSet rows of this task's id range
         * @throws RuntimeException wrapping any {@link SQLException} or {@link IOException}
         */
        private void doWriteFile(ResultSet resultSet) {
            StringBuilder sb = new StringBuilder("(");
            try {
                while (resultSet.next()) {
                    // sb.append()
                }
                // The lock keeps one chunk contiguous in the file; it does not
                // impose any order between chunks of different tasks.
                synchronized (writer) {
                    writer.write(sb.toString());
                }
            } catch (SQLException | IOException e) {
                throw new RuntimeException(e);
            }
        }
    }


    public static void main(String[] args) throws IOException {
        // Assuming that the number of rows of database data
        // has been obtained is 10000000 rows
        int count = 10000000;
        String tableName = "test_table";
        String filePath = "c:\\test\\test.sql";
        ThreadQueryWrite queryWrite = new ThreadQueryWrite();
        queryWrite.doExportSql(count, tableName, filePath);
    }

    /**
     * Splits the ids {@code 1..count} into ranges of at most 3000 ids, runs one
     * {@link MyRunnable} per range on a thread pool, and returns once every task
     * has finished and the file has been flushed and closed.
     *
     * @param count     highest primary key to export; nothing is exported when it is below 1
     * @param tableName table to read from
     * @param filePath  destination SQL file
     * @throws IOException if the file cannot be opened, flushed or closed, or
     *                     (as {@link InterruptedIOException}) if the calling
     *                     thread is interrupted while waiting for the tasks
     */
    private void doExportSql(int count, String tableName, String filePath) throws IOException {
        int maxQueryCount = 3000;

        try (Writer writer = new OutputStreamWriter(Files.newOutputStream(Paths.get(filePath)),
                StandardCharsets.UTF_8)) {
            int processors = Runtime.getRuntime().availableProcessors();
            // With an unbounded work queue a ThreadPoolExecutor never grows past
            // its core size, so the maximum below is effectively unused.
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(processors,
                    processors * 2, 5, TimeUnit.MINUTES,
                    new LinkedBlockingQueue<>());
            try {
                // long counter: an int would overflow when count is close to
                // Integer.MAX_VALUE. "<=" so that the last id is not skipped
                // when count % maxQueryCount == 1.
                for (long start = 1; start <= count; start += maxQueryCount) {
                    long end = Math.min(start + maxQueryCount - 1, count);
                    // execute() instead of submit(): a failing task reaches the
                    // thread's uncaught-exception handler instead of being
                    // hidden inside a Future nobody inspects.
                    threadPoolExecutor.execute(new MyRunnable(tableName, start, end, writer));
                }
            } finally {
                threadPoolExecutor.shutdown();
                // The writer must stay open until the last task has written.
                awaitCompletion(threadPoolExecutor);
            }
        }
    }

    /**
     * Blocks until every task already handed to {@code pool} has completed.
     *
     * @throws InterruptedIOException if the calling thread is interrupted while waiting
     */
    private static void awaitCompletion(ThreadPoolExecutor pool) throws InterruptedIOException {
        try {
            pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            InterruptedIOException failure =
                    new InterruptedIOException("interrupted while waiting for export tasks");
            failure.initCause(e);
            throw failure;
        }
    }

}

因为有些表的数据量很大,我想尽可能节省时间,所以我使用了多线程。每个线程根据主键划分一个范围,并进行异步查询。

我想保证写入文件时主键id是连续的,从1到10000000。

insert into table(id) values
    (1),
    ......,
    (3000);
insert into table(id) values
    (3001),
    ......,
    (6000);
......

但是,我根据主键id执行分段查询,并不能保证哪个线程会先执行查询。这样可能会导致文件开头的id范围为3001~6000,结尾处的id为1~3000,如下,

insert into table(id) values
    (3001),
    ......,
    (6000);
insert into table(id) values
    (1),
    ......,
    (3000);
......

我已经尝试过“让线程按照它们创建/启动的顺序运行”这个问题中的方法。

但是这样可能会带来很大的内存压力,因为我把查询数据全部加载到内存中,然后再一一处理线程的回调结果写入SQL文件。

希望sql文件中的主键id是有序的,按照数据库中自增的顺序。

目前没有解决方案,任何回复将不胜感激。

java multithreading synchronization java-threads
1个回答
0
投票

我们可以使用并行流(`parallel`)并行执行中间操作,同时在终端操作中通过 `forEachOrdered` 强制按遇到顺序(encounter order)处理结果,如此处和此处所述。我没有找到任何办法把 ForkJoin 池的队列换成优先级队列,从而用 startId 来决定优先级。

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Exports a table to a SQL file by querying primary-key ranges in parallel while
 * writing the resulting chunks strictly in ascending range order.
 *
 * <p>The ordering comes from the stream pipeline in {@link #exportTo}: the
 * {@code map} stage may run on several threads, and {@code forEachOrdered}
 * hands the results to the writer in the encounter order of the task list.
 */
public class ThreadQueryWrite {

    /**
     * One unit of export work covering the primary keys {@code startId..endId}.
     * The constructor is written out explicitly so that this snippet does not
     * depend on Lombok.
     */
    class MyRunnable implements Runnable {

        /**
         * database tableName
         */
        String tableName;
        /**
         * The starting primary key of the query range
         */
        long startId;
        /**
         * The ending primary key of the query range
         */
        long endId;

        /**
         * File writing character stream
         */
        Writer writer;

        /**
         * @param tableName table to read from
         * @param startId   first primary key of the range
         * @param endId     last primary key of the range
         * @param writer    destination character stream
         */
        public MyRunnable(String tableName, long startId, long endId, Writer writer) {
            this.tableName = tableName;
            this.startId = startId;
            this.endId = endId;
            this.writer = writer;
        }

        public long getStartId() {
            return startId;
        }

        @Override
        public void run() {
            doQuery();
        }

        /**
         * Placeholder for the query-then-write flow sketched in the comments
         * below; {@link #exportTo} does not use it.
         */
        private void doQuery() {
            // resultSet = select * from tableName where id between startId and endId;
            // doWriteFile(resultSet);
        }

        /**
         * Runs the query for this range and returns the rendered chunk without
         * touching the writer, so it is safe to call from several threads.
         *
         * <p>The query itself is abstracted away; the chunk currently consists
         * of the range's {@code startId} only.
         *
         * @return the text to append to the SQL file for this range
         */
        public StringBuilder queryAndGetResult() {
            System.out.println(startId+" Querying for startId ");
            // resultSet = select * from tableName where id between startId and endId;
            StringBuilder chunk = new StringBuilder(String.valueOf(startId));
            // while (resultSet.next()) { chunk.append(...); }
            System.out.println(startId+" Query Result ");
            return chunk;
        }

        /**
         * Renders the rows of {@code resultSet} and appends them to the writer.
         * Not called by the ordered export.
         *
         * @throws RuntimeException wrapping any {@link SQLException} or {@link IOException}
         */
        private void doWriteFile(ResultSet resultSet) {
            StringBuilder sb = new StringBuilder("(");
            try {
                while (resultSet.next()) {
                    // sb.append()
                }
                synchronized (writer) {
                    writer.write(sb.toString());
                }
            } catch (SQLException | IOException e) {
                throw new RuntimeException(e);
            }
        }
    }


    public static void main(String[] args) throws IOException {
        // A small row count keeps the console output of the demo readable.
        int count = 100;
        String tableName = "test_table";
        String filePath = "c:\\test\\test.sql";
        ThreadQueryWrite queryWrite = new ThreadQueryWrite();
        queryWrite.doExportSql(count, tableName, filePath);
    }

    /**
     * Exports the ids {@code 1..count} of {@code tableName} to {@code filePath}.
     *
     * @param count     highest primary key to export
     * @param tableName table to read from
     * @param filePath  destination SQL file
     * @throws IOException if the file cannot be opened, flushed or closed
     */
    private void doExportSql(int count, String tableName, String filePath) throws IOException {
        // try-with-resources: OutputStreamWriter buffers, so without the close
        // the tail of the export (or all of a small one) never reaches the file.
        try (Writer writer = new OutputStreamWriter(Files.newOutputStream(Paths.get(filePath)),
                StandardCharsets.UTF_8)) {
            exportTo(count, tableName, writer);
        }
    }

    /**
     * Queries the ranges covering {@code 1..count} in parallel and writes the
     * chunks to {@code writer} in ascending range order. The writer is neither
     * flushed nor closed here.
     *
     * <p>NOTE(review): a chunk that finishes before its predecessors is
     * presumably held in memory until its turn; verify heap usage for large
     * chunk sizes.
     *
     * @param count     highest primary key to export; nothing is written when it is below 1
     * @param tableName table to read from
     * @param writer    destination character stream
     * @throws UncheckedIOException if writing a chunk fails
     */
    void exportTo(int count, String tableName, Writer writer) {
        int maxQueryCount = 3;//for increasing readibility of output

        List<MyRunnable> tasks = new ArrayList<>();
        // "<=" so the last id is exported when count % maxQueryCount == 1
        // (e.g. id 100 for count = 100); long counter avoids int overflow.
        for (long start = 1; start <= count; start += maxQueryCount) {
            long end = Math.min(start + maxQueryCount - 1, count);
            tasks.add(new MyRunnable(tableName, start, end, writer));
        }

        tasks.stream()
                .parallel()
                .map(MyRunnable::queryAndGetResult)
                .forEachOrdered(chunk -> writeToFile(writer, chunk));
    }

    /**
     * Appends one chunk to the writer.
     *
     * @throws UncheckedIOException if the write fails
     */
    private static void writeToFile(Writer writer, StringBuilder resultSetString) {
        try {
            System.out.println(resultSetString+" File out");
            writer.write(resultSetString.toString());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

}
© www.soinside.com 2019 - 2024. All rights reserved.