我有一个方案,包括以下步骤:
我想问一些关于如何正确处理第3点的建议。我认为向不同的线程提供相同的连接以使并行写入目标表是一个坏主意。我的一般想法是为每个目标表生成一个新线程(在本例中为6)并为每个表创建一个不同的jdbc连接,因此理论上每个写操作可以并行完成并且彼此独立。
这会有用吗?对其他/更好方法的建议?
我的一般想法是为每个目标表生成一个新线程(在本例中为6)并为每个表创建一个不同的jdbc连接,因此理论上每个写操作可以并行完成并且彼此独立。
当然听起来对我来说是个好计划。我会使用连接池,如HikariCP或DBCP来维护与数据库服务器的多个连接。然后,您可以添加多个线程,每个线程都可以请求连接,然后将其返回到池中以便稍后使用。
这会有用吗?对其他/更好方法的建议?
它会工作。需要考虑的一件事是6可能不是正确的数字。您的服务器可能没有足够的带宽来同时处理那么多数据,因此您可能需要考虑减少池中的线程数,直到找到可以提供最大带宽的最佳数量。也就是说,如果有6个表,则6可能确实是正确的数字,具体取决于数据在服务器上的分区方式。
根据您对线程的了解程度,您应该查看docs on thread pooling。
我已经实现了以下使用BlockingQueue和ExecutorService使用生产者/消费者模式的解决方案。主线程(生产者)为每个工作线程(消费者)实例化一个BlockingQueue,并在生成所有数据时向工作线程发信号通知一个布尔的volatile变量“终止”,它们应该终止执行(从while循环中转出,清空队列并在jdbc连接上写入剩余数据)。生产者使用两个BlockingQueue blockingQueue1和blockingQueue2为每个线程生成不同的数据。
这是简化的MainThreadProducer,它只为两个工作线程生成整数数据:
// MainThreadProducer.java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class MainThreadProducer {
public static Logger logger = LogManager.getLogger(MainThreadProducer.class);
public final static BlockingQueue<Integer> blockingQueue1 = new LinkedBlockingDeque<>(100);
public final static BlockingQueue<Integer> blockingQueue2 = new LinkedBlockingDeque<>(100);
/* signal to the worker threads that all data has been generated */
public static volatile boolean terminated = false;
private void run () {
try {
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<Integer> future1 = executor.submit(new WorkerThreadConsumer("1"));
Future<Integer> future2 = executor.submit(new WorkerThreadConsumer("2"));
for (int i = 0; i < 10023; ++i) {
blockingQueue1.put(i);
blockingQueue2.put(i*2);
}
executor.shutdown();
terminated = true;
int res1 = future1.get();
int res2 = future1.get();
logger.info("Total rows written (thread 1): " + res1);
logger.info("Total rows written (thread 2): " + res2);
}
catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
public static void main(String[] args) {
MainThreadProducer instance = new MainThreadProducer();
instance.run();
}
}
这是WorkerThreadConsumer.java类。对于此测试,我创建了两个线程,它们将分别写入表TARGET_1和TARGET_2上的数据库DBTEST。每个线程都使用特定的String类型(1和2)进行实例化,因此它可以知道从哪个BlockingQueue读取数据。
// WorkerThreadConsumer.java
import java.sql.PreparedStatement;
import com.microsoft.sqlserver.jdbc.SQLServerResultSet;
import java.sql.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import Configuration;
public class WorkerThreadConsumer implements Callable<Integer> {
private String type;
public WorkerThreadConsumer (String type) {
this.type = type;
}
@Override
public Integer call() {
String TAG = "[THREAD_" + Thread.currentThread().getId() + "]";
int processed = 0; // number of rows currently processed
int batchSize = 100; // size of the batch we write to the server with the PreparedStatement
try {
// load jdbc driver
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
MainThreadProducer.logger.info(TAG + "\tLoaded com.microsoft.sqlserver.jdbc.SQLServerDriver");
String stub = String.format("INSERT INTO DBTEST.dbo.TARGET_%s (id) VALUES (?);", this.type);
BlockingQueue<Integer> queue;
switch (this.type) {
case "1":
queue = MainThreadProducer.blockingQueue1;
break;
case "2":
queue = MainThreadProducer.blockingQueue2;
break;
default:
queue = MainThreadProducer.blockingQueue1;
}
try (Connection connection = DriverManager.getConnection(Configuration.DWH_DB_CONNECTION_URL);
PreparedStatement stmt = connection.prepareStatement(stub);) {
connection.setAutoCommit(false);
while (!MainThreadProducer.terminated) {
int data = queue.take();
stmt.setInt(1, data);
stmt.addBatch();
processed += 1;
if (processed % batchSize == 0) {
int[] result = stmt.executeBatch();
connection.commit();
MainThreadProducer.logger.info(TAG + "\tWritten rows count: " + result.length);
}
}
// empty queue and write
while (!queue.isEmpty()) {
int data = queue.take();
stmt.setInt(1, data);
stmt.addBatch();
processed += 1;
if (processed % batchSize == 0) {
int[] result = stmt.executeBatch();
connection.commit();
MainThreadProducer.logger.info(TAG + "\tWritten rows count: " + result.length);
}
}
// last write in case queue size > batch size
int[] result = stmt.executeBatch();
connection.commit();
MainThreadProducer.logger.info(TAG + "\tWritten rows count: " + result.length);
}
}
catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
return processed;
}
}
解决方案似乎有效。如果您发现潜在问题,请与我们联系。