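I am trying to use the PostgreSQL CopyManager copyIn functionality with COPY FROM STDIN, as suggested in the docs, for very fast copying from an InputStream into a database table. I am thinking of using this to continuously stream rows into a table as and when I receive/process them. However, the quick and dirty sample code below seems to get stuck on copyIn and does not write to the table.
Does anyone know what I am missing here, or whether my understanding is wrong?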
import java.sql.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.postgresql.core.BaseConnection;
import org.postgresql.copy.CopyManager;

public class PGConnectTest {
    public static void main(String[] args) {
        try {
            try (Connection connection = DriverManager.getConnection("jdbc:postgresql://XX.XX.XX.XX:9432/somedb", "someadmin", "somepassword");
                 BaseConnection pgcon = (BaseConnection)connection;
                 PipedInputStream is = new PipedInputStream();
                 BufferedReader br = new BufferedReader(new InputStreamReader(is));
                 PipedOutputStream os = new PipedOutputStream(is);
                 BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os));) {
                ExecutorService executorService = Executors.newSingleThreadExecutor();
                Callable callable = () -> {
                    Thread.sleep(3000);
                    String frmtStr = "%s\t{\"id\":%s, \"somefield\":\"%s\"}\n";
                    String row = null;
                    for(int i=1; i<10; i++) {
                        row = String.format(frmtStr, i, i, ("row"+i));
                        System.out.print(row);
                        bw.write(row);
                    }
                    bw.write("\n");
                    bw.flush();
                    System.out.println("WRITTEN!");
                    return true;
                };
                executorService.submit(callable);
                System.out.println(connection);
                CopyManager copyManager = new CopyManager(pgcon);
                String copySql = "COPY dcm.testtbl FROM STDIN";
                executorService.submit(() -> copyManager.copyIn(copySql, br));
                Thread.sleep(10000);
                System.out.println("QUITTING");
            } catch (Exception e) {
                throw e;
            }
        } catch(Exception ex) {
            System.out.println(ex);
        }
    }
}
The schema of the table testtbl is as follows:
create table testtbl (
    id integer primary key,
    jsnclm jsonb
)
The console output is below (the program does not return and has to be killed with CTRL+C):
C:\Users\ml410408\Documents\Useful Lookups\POSTGRESQL>java -cp ".;postgresql-42.2.18.jar" PGConnectTest
org.postgresql.jdbc.PgConnection@41975e01
1 {"id":1, "somefield":"row1"}
2 {"id":2, "somefield":"row2"}
3 {"id":3, "somefield":"row3"}
4 {"id":4, "somefield":"row4"}
5 {"id":5, "somefield":"row5"}
6 {"id":6, "somefield":"row6"}
7 {"id":7, "somefield":"row7"}
8 {"id":8, "somefield":"row8"}
9 {"id":9, "somefield":"row9"}
WRITTEN!
QUITTING
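UPDATE:
Once I changed the format of the COPY SQL command from the default TEXT to CSV and passed in CSV records, it no longer gets stuck, but it does nothing (no records end up in the table), even though it now returns, unlike before.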
import java.sql.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.BufferedWriter;
import java.io.OutputStreamWriter;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.postgresql.core.BaseConnection;
import org.postgresql.copy.CopyManager;

public class PGConnectTest {
    public static void main(String[] args) {
        try {
            try (Connection connection = DriverManager.getConnection("jdbc:postgresql://XX.XX.XX.XX:9432/somedb", "someadmin", "somepassword");
                 BaseConnection pgcon = (BaseConnection)connection;
                 PipedInputStream is = new PipedInputStream();
                 BufferedReader br = new BufferedReader(new InputStreamReader(is));
                 PipedOutputStream os = new PipedOutputStream(is);
                 BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os));) {
                ExecutorService executorService = Executors.newSingleThreadExecutor();
                Callable callable = () -> {
                    Thread.sleep(3000);
                    String frmtStr = "%s,'{\"id\":%s,\"somefield\":\"%s\"}'\n";
                    String row = null;
                    for(int i=1; i<10; i++) {
                        row = String.format(frmtStr, i, i, ("row"+i));
                        System.out.print(row);
                        bw.write(row);
                    }
                    bw.write("\n");
                    bw.write("'\\.'\n");
                    System.out.println("'\\.'\n");
                    bw.flush();
                    os.flush();
                    System.out.println("WRITTEN!");
                    return true;
                };
                executorService.submit(callable);
                System.out.println(connection);
                CopyManager copyManager = new CopyManager(pgcon);
                String copySql = "COPY dcm.testtbl FROM STDIN FORMAT CSV DELIMITER ','";
                executorService.submit(() -> copyManager.copyIn(copySql, br));
                Thread.sleep(5000);
                System.out.println(br.ready());
                while (br.ready()) {
                    System.out.println("LINE : " + br.readLine());
                }
                executorService.shutdown();
                System.out.println("QUITTING");
            } catch (Exception e) {
                throw e;
            }
            System.out.println("QUITTING FINALLY");
        } catch(Exception ex) {
            System.out.println(ex);
        }
    }
}
Thanks
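There seem to be a couple of different issues in there.
- The thread in the ExecutorService keeps the program alive; calling shutdown() after submitting the tasks lets it terminate as expected.
- copyIn() is throwing an exception: the trailing newline in the stream (bw.write("\n")) triggers ERROR: invalid input syntax for integer: "", because the empty line has no value for the id column.
Even then, it still looks subject to race conditions due to the timing of the resource cleanup. The copyIn() call blocks until it reaches the end of its InputStream, and for a PipedInputStream, "the end" is the point where the PipedOutputStream is closed. But once the stream is closed and the copyIn() call unblocks, the input stream and the database connection are closed in quick succession, potentially before the copy has had a chance to finish. At best, it seems to commit to the table successfully but then error out with "Database connection failed when canceling copy operation".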
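To make sure these resources aren't released while they are still in use:
- Wait for the writer to complete
- Close the OutputStream
- Wait for the copier to complete
- Close the InputStream / Connection
Waiting for the tasks to complete has the added benefit of propagating any exceptions to the main thread.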
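As a small sketch of that propagation, the snippet below submits a task that always fails and recovers the failure through Future.get(). The failing task is only a stand-in for a copyIn() call that rejects its input; FuturePropagationDemo, failureSeenByCaller and the exception message are invented for the example. Without the get() call, the exception would stay inside the Future and nothing would be reported.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of the original program.
class FuturePropagationDemo {

    // Submits a task that always fails and returns the message of the
    // exception that Future.get() hands back to the calling thread.
    static String failureSeenByCaller() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for copyManager.copyIn(...) failing on bad input.
            Callable<Long> failingCopy = () -> {
                throw new IllegalStateException("simulated copy failure");
            };
            Future<Long> copy = pool.submit(failingCopy);
            copy.get(); // blocks until the task is done, then rethrows
            return "no failure";
        } catch (ExecutionException e) {
            // The task's own exception is available as the cause.
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            // Without shutdown() the pool's thread would keep the JVM alive.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(failureSeenByCaller()); // prints simulated copy failure
    }
}
```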
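There is also a potential deadlock due to newSingleThreadExecutor(): if the writer thread fills the pipe's buffer, it blocks until the reader starts consuming the data, which never happens if the two tasks execute sequentially. Using newFixedThreadPool(2) should fix this.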
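The difference can be reproduced without PostgreSQL. The sketch below pushes about 10 KB through a pipe (PipedInputStream's default buffer is 1024 bytes) with a writer task and a reader task on the same executor. PipePoolDemo, pumpRows and the two-second timeout are assumptions made for the demonstration, and a line-counting reader stands in for copyIn().

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; not part of the original program.
class PipePoolDemo {

    // Returns the number of lines the reader task received, or -1 if the
    // tasks did not finish within the timeout (the deadlock case).
    static long pumpRows(ExecutorService pool, int rowCount) {
        try (PipedInputStream in = new PipedInputStream();
             PipedOutputStream out = new PipedOutputStream(in)) {
            Future<?> write = pool.submit(() -> {
                try (BufferedWriter w = new BufferedWriter(
                        new OutputStreamWriter(out, StandardCharsets.UTF_8))) {
                    for (int i = 1; i <= rowCount; i++) {
                        w.write(i + "\trow" + i + "\n");
                    }
                }
                return null;
            });
            Future<Long> read = pool.submit(() -> {
                try (BufferedReader r = new BufferedReader(
                        new InputStreamReader(in, StandardCharsets.UTF_8))) {
                    return r.lines().count();
                }
            });
            write.get(2, TimeUnit.SECONDS);
            return read.get(2, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            // The writer filled the pipe and nobody was draining it.
            return -1;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Two threads: the reader drains the pipe while the writer fills it.
        System.out.println(pumpRows(Executors.newFixedThreadPool(2), 1000));
        // One thread: the reader task never starts, so the writer blocks.
        System.out.println(pumpRows(Executors.newSingleThreadExecutor(), 1000));
    }
}
```

With only a handful of short rows, as in the question, everything fits into the pipe's buffer and the single-thread version happens to finish, which is why the problem only shows up once the volume grows.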
With all of that in mind:
// Same imports as in the question, plus: import java.util.concurrent.Future;
public static void main(String[] args) {
    // Two threads, so that the writer and the copier can run concurrently.
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    try {
        try (Connection connection = DriverManager.getConnection("jdbc:postgresql://XX.XX.XX.XX:9432/somedb", "someadmin", "somepassword");
             BaseConnection pgcon = (BaseConnection) connection;
             PipedInputStream is = new PipedInputStream();
             BufferedReader br = new BufferedReader(new InputStreamReader(is))) {
            Future<Boolean> write;
            Future<Long> copy;
            try (PipedOutputStream os = new PipedOutputStream(is);
                 BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os))) {
                write = executorService.submit(() -> {
                    String frmtStr = "%s\t{\"id\":%s, \"somefield\":\"%s\"}\n";
                    String row = null;
                    for (int i = 1; i < 1000; i++) {
                        row = String.format(frmtStr, i, i, ("row" + i));
                        System.out.print(row);
                        bw.write(row);
                    }
                    // No trailing empty line: it would be parsed as a row.
                    bw.flush();
                    System.out.println("WRITTEN!");
                    return true;
                });
                System.out.println(connection);
                CopyManager copyManager = new CopyManager(pgcon);
                String copySql = "COPY dcm.testtbl FROM STDIN";
                copy = executorService.submit(() -> copyManager.copyIn(copySql, br));
                System.out.println("QUITTING");
                // Wait for the writer before the OutputStream is closed.
                write.get();
            }
            // The OutputStream is closed now, so copyIn() sees end-of-stream.
            // Wait for the copier before the InputStream / Connection close.
            copy.get();
        }
    } catch (Exception ex) {
        System.out.println(ex);
    } finally {
        // Let the pool threads exit so the JVM can terminate.
        executorService.shutdown();
    }
}
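In case anyone else stumbles upon this post with a similar problem: my copyIn() got stuck because I had previously run a TRUNCATE on the same table within the same (Spring Boot) transaction. Using DELETE instead solved it for me.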