我正在使用一款小众数据库。它在数据备份方面目前不支持导出 SQL 文件,只支持 dump 文件;但需求要求导出 SQL 文件,所以只能自己查询数据并写入 SQL 文件。
由于代码属于商业项目,我对代码做了抽象处理,如有不便,请见谅。
import lombok.AllArgsConstructor;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Exports a table to a SQL file by splitting the primary-key space into fixed-size
 * ranges and running one query per range on a thread pool.
 *
 * <p>Each task appends its chunk to the shared {@link Writer} when its own query
 * finishes, so chunks land in the file in completion order, which is not
 * necessarily primary-key order.
 */
public class ThreadQueryWrite {

    /**
     * Queries one primary-key range of a table and appends the result to the shared writer.
     */
    class MyRunnable implements Runnable {
        /**
         * database tableName
         */
        String tableName;
        /**
         * The starting primary key of the query range
         */
        long startId;
        /**
         * The ending primary key of the query range
         */
        long endId;
        /**
         * File writing character stream, shared by all tasks
         */
        Writer writer;

        /**
         * Explicit all-arguments constructor, so the class compiles without annotation processing.
         *
         * @param tableName database table name
         * @param startId   starting primary key of the query range
         * @param endId     ending primary key of the query range
         * @param writer    shared character stream the result is written to
         */
        public MyRunnable(String tableName, long startId, long endId, Writer writer) {
            this.tableName = tableName;
            this.startId = startId;
            this.endId = endId;
            this.writer = writer;
        }

        @Override
        public void run() {
            doQuery();
        }

        /**
         * Placeholder for the real query; the actual database access was removed from the sample.
         */
        private void doQuery() {
            // resultSet = select * from tableName where id between startId and endId;
            // doWriteFile(resultSet);
        }

        /**
         * Builds the text for one chunk and writes it to the shared writer in a single call.
         *
         * @param resultSet rows of this task's primary-key range
         * @throws RuntimeException wrapping the {@link SQLException} or {@link IOException} that occurred
         */
        private void doWriteFile(ResultSet resultSet) {
            StringBuilder sb = new StringBuilder("(");
            try {
                while (resultSet.next()) {
                    // sb.append()
                }
                // The whole chunk is written under the writer's monitor so that chunks of
                // different tasks are never interleaved.
                synchronized (writer) {
                    writer.write(sb.toString());
                }
            } catch (SQLException | IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Runs the export with sample values.
     *
     * @param args unused
     * @throws IOException if the output file cannot be opened or written
     */
    public static void main(String[] args) throws IOException {
        // Assuming that the number of rows of database data
        // has been obtained is 10000000 rows
        int count = 10000000;
        String tableName = "test_table";
        String filePath = "c:\\test\\test.sql";
        ThreadQueryWrite queryWrite = new ThreadQueryWrite();
        queryWrite.doExportSql(count, tableName, filePath);
    }

    /**
     * Schedules one task per primary-key range, waits for all of them and closes the file.
     *
     * @param count     number of rows; ranges are generated for ids {@code 1..count}
     * @param tableName table to export
     * @param filePath  path of the SQL file to create
     * @throws IOException if the file cannot be opened or closed, or if the calling thread is
     *                     interrupted while waiting for the tasks
     */
    private void doExportSql(int count, String tableName, String filePath) throws IOException {
        int maxQueryCount = 3000;
        // try-with-resources flushes and closes the writer; without it buffered output can be lost.
        try (Writer writer = new OutputStreamWriter(Files.newOutputStream(Paths.get(filePath)),
                StandardCharsets.UTF_8)) {
            int cores = Runtime.getRuntime().availableProcessors();
            // NOTE: with an unbounded queue the pool never grows past its core size,
            // so the maximum pool size below has no practical effect.
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(cores, cores * 2,
                    5, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
            try {
                // "<=" so that the last id is covered even when count == 1 (mod maxQueryCount);
                // a long index cannot overflow for any int count.
                for (long startId = 1; startId <= count; startId += maxQueryCount) {
                    MyRunnable myRunnable =
                            new MyRunnable(tableName, startId, startId + maxQueryCount - 1, writer);
                    // execute() instead of submit(): a failing task reaches the thread's uncaught
                    // exception handler instead of being hidden in a Future nobody reads.
                    threadPoolExecutor.execute(myRunnable);
                }
            } finally {
                threadPoolExecutor.shutdown();
            }
            // The writer must stay open until every task has finished writing.
            awaitCompletion(threadPoolExecutor);
        }
    }

    /**
     * Blocks until all tasks of an already shut-down pool have completed.
     *
     * @param pool the pool to wait for
     * @throws InterruptedIOException if the calling thread is interrupted while waiting
     */
    private static void awaitCompletion(ThreadPoolExecutor pool) throws InterruptedIOException {
        try {
            while (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                // keep waiting; large tables can take a long time
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            InterruptedIOException failure =
                    new InterruptedIOException("Interrupted while waiting for export tasks");
            failure.initCause(e);
            throw failure;
        }
    }
}
因为有些表的数据量很大,我想尽可能节省时间,所以我使用了多线程。每个线程根据主键划分一个范围,并进行异步查询。
我想保证写入文件时主键id是连续的,从1到10000000。
insert into table(id) values
(1),
......,
(3000);
insert into table(id) values
(3001),
......,
(6000);
......
但是,我根据主键id执行分段查询,并不能保证哪个线程会先执行查询。这样可能会导致文件开头的id范围为3001~6000,结尾处的id为1~3000,如下,
insert into table(id) values
(3001),
......,
(6000);
insert into table(id) values
(1),
......,
(3000);
......
我已经尝试过“让线程按照它们创建/启动的顺序运行”这种方法,
但是这样可能会带来很大的内存压力,因为需要先把查询到的数据全部加载到内存中,然后再逐个处理各线程的回调结果并写入 SQL 文件。
希望sql文件中的主键id是有序的,按照数据库中自增的顺序。
目前没有解决方案,任何回复将不胜感激。
我们可以使用流的 parallel() 和 forEachOrdered():中间操作并行执行,而终端操作强制按照遇到顺序(encounter order)处理,如 here 和 here 所述。
我没有找到任何办法可以把 ForkJoinPool 的任务队列换成优先级队列,从而用 startId 来决定优先级。
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Exports a table to a SQL file in primary-key order while still querying in parallel.
 *
 * <p>The primary-key space is split into fixed-size ranges. The ranges are queried through a
 * parallel stream, and {@code forEachOrdered} writes the results to the file in the order the
 * ranges were created.
 */
public class ThreadQueryWrite {

    /**
     * Queries one primary-key range of a table.
     */
    class MyRunnable implements Runnable {
        /**
         * database tableName
         */
        String tableName;
        /**
         * The starting primary key of the query range
         */
        long startId;
        /**
         * The ending primary key of the query range
         */
        long endId;
        /**
         * File writing character stream
         */
        Writer writer;

        /**
         * Explicit all-arguments constructor; this snippet's import list has no Lombok import.
         *
         * @param tableName database table name
         * @param startId   starting primary key of the query range
         * @param endId     ending primary key of the query range
         * @param writer    character stream of the output file
         */
        public MyRunnable(String tableName, long startId, long endId, Writer writer) {
            this.tableName = tableName;
            this.startId = startId;
            this.endId = endId;
            this.writer = writer;
        }

        /**
         * @return the starting primary key of the query range
         */
        public long getStartId() {
            return startId;
        }

        @Override
        public void run() {
            doQuery();
        }

        /**
         * Placeholder for the real query; the actual database access was removed from the sample.
         */
        private void doQuery() {
            // resultSet = select * from tableName where id between startId and endId;
            // doWriteFile(resultSet);
        }

        /**
         * Runs the query for this range and returns the text to be written for it.
         *
         * <p>In this sample the query is only a placeholder, so the returned text is just the
         * starting id. Progress is logged to standard output before and after the query.
         *
         * @return the text for this range
         */
        public StringBuilder queryAndGetResult() {
            System.out.println(startId+" Querying for startId ");
            // resultSet = select * from tableName where id between startId and endId;
            StringBuilder sb = new StringBuilder(""+startId);
            // while(resultSet.next()){
            //     sb.append()
            // }
            System.out.println(startId+" Query Result ");
            return sb;
        }

        /**
         * Builds the text for one chunk and writes it to the writer in a single call.
         *
         * @param resultSet rows of this task's primary-key range
         * @throws RuntimeException wrapping the {@link SQLException} or {@link IOException} that occurred
         */
        private void doWriteFile(ResultSet resultSet) {
            StringBuilder sb = new StringBuilder("(");
            try {
                while (resultSet.next()) {
                    // sb.append()
                }
                synchronized (writer) {
                    writer.write(sb.toString());
                }
            } catch (SQLException | IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Runs the export with sample values.
     *
     * @param args unused
     * @throws IOException if the output file cannot be opened or closed
     */
    public static void main(String[] args) throws IOException {
        // Assuming that the number of rows of database data
        // has been obtained is 10000000 rows
        int count = 100;
        String tableName = "test_table";
        String filePath = "c:\\test\\test.sql";
        ThreadQueryWrite queryWrite = new ThreadQueryWrite();
        queryWrite.doExportSql(count, tableName, filePath);
    }

    /**
     * Queries all primary-key ranges in parallel and writes the results in range order.
     *
     * @param count     number of rows; ranges are generated for ids {@code 1..count}
     * @param tableName table to export
     * @param filePath  path of the SQL file to create
     * @throws IOException          if the file cannot be opened or closed
     * @throws UncheckedIOException if writing a chunk fails
     */
    private void doExportSql(int count, String tableName, String filePath) throws IOException {
        int maxQueryCount = 3;//for increasing readibility of output
        // try-with-resources flushes and closes the writer; without it the buffered
        // output of a small export never reaches the file.
        try (Writer writer = new OutputStreamWriter(Files.newOutputStream(Paths.get(filePath)),
                StandardCharsets.UTF_8)) {
            List<MyRunnable> myRunnables = new ArrayList<>();
            // "<=" so that the last id is covered even when count == 1 (mod maxQueryCount),
            // e.g. id 100 for count = 100; a long index cannot overflow for any int count.
            for (long startId = 1; startId <= count; startId += maxQueryCount) {
                myRunnables.add(
                        new MyRunnable(tableName, startId, startId + maxQueryCount - 1, writer));
            }
            // map() may run on several threads at once; forEachOrdered() hands the results to
            // the writer strictly in list order.
            myRunnables.stream()
                    .parallel()
                    .map(MyRunnable::queryAndGetResult)
                    .forEachOrdered(chunk -> writeToFile(writer, chunk));
        }
    }

    /**
     * Writes one chunk to the file and logs it to standard output.
     *
     * @param writer          character stream of the output file
     * @param resultSetString text of the chunk
     * @throws UncheckedIOException if the write fails
     */
    private static void writeToFile(Writer writer, StringBuilder resultSetString) {
        try {
            System.out.println(resultSetString+" File out");
            writer.write(resultSetString.toString());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}