我有一个特定的使用案例,我需要能够转换我在一个.InputStream上收到的数据,并存储在一个StringBuilder中。
我不想转换为字符串和从字符串,因为我已经有了StringBuilder(但如果它是相同的有一个字符串与内存中的全部内容,无论哪种方式,然后我可以只改变他们的字符串).我不明白的是,当我创建不同的传输类型之间的他们在下面的类测试,我莫名其妙地死锁,即使我已经使每个传输类是单独的线程(作为 Callable<void>
)所以我很困惑,为什么会发生这种情况,我甚至试过用RunnbaleThread替换,并进行 start()
, join()
还是同样的问题,我想这是一些编码逻辑错误,我似乎看不到。
import java.io.*;
import java.util.concurrent.*;
public class Test {
private static String create() {
StringBuilder sb = new StringBuilder();
for (int i=0; i<100; i++)
sb.append(i+".123401234").append(System.lineSeparator());
return sb.toString();
}
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(10);
ByteArrayInputStream initial_in = new ByteArrayInputStream(create().getBytes()); // data
PipedInputStream in = new PipedInputStream(); // intermediary pipe
PipedOutputStream out = new PipedOutputStream(in); // connect OutputStream to pipe InputStream
StringBuilder final_out = new StringBuilder();
NativeToNativeTransfer ntnt = new NativeToNativeTransfer(initial_in, out); // InputStream to OutputStream
NativeToCustomTransfer ntct = new NativeToCustomTransfer(in, final_out); // InputStream to StringBuilder
Future<Void> f1 = executor.submit(ntnt);
Future<Void> f2 = executor.submit(ntct);
f1.get(); f2.get(); // deadlock here ?
System.out.println(final_out);
System.out.println("Done");
}
public static class StreamTransfer implements Callable<Void> {
public static final int BUFFER_SIZE = 1024;
private InputStream in;
private OutputStream out;
public StreamTransfer(InputStream in, OutputStream out) {
this.in = in;
this.out = out;
}
@Override
public Void call() throws Exception {
BufferedInputStream bis = new BufferedInputStream(in);
BufferedOutputStream bos = new BufferedOutputStream(out);
byte[] buffer = new byte[BUFFER_SIZE];
while (bis.read(buffer) != -1)
bos.write(buffer, 0, BUFFER_SIZE);
bos.flush();
return null;
}
}
public static class NativeToCustomTransfer implements Callable<Void> {
private InputStream in;
private StringBuilder sb;
public NativeToCustomTransfer(InputStream in, StringBuilder out) {
this.in = in;
sb = out;
}
@Override
public Void call() throws Exception {
BufferedInputStream bis = new BufferedInputStream(in);
byte[] buffer = new byte[StreamTransfer.BUFFER_SIZE];
while (bis.read(buffer) != -1)
sb.append(new String(buffer));
return null;
}
}
public static class CustomToNativeTransfer extends StreamTransfer {
public CustomToNativeTransfer(StringBuilder in, OutputStream out) {
super(new ByteArrayInputStream(in.toString().getBytes()), out);
}
}
public static class NativeToNativeTransfer extends StreamTransfer {
public NativeToNativeTransfer(InputStream in, OutputStream out) {
super(in, out);
}
}
public static class CustomToCustomTransfer {
public CustomToCustomTransfer(StringBuilder in, StringBuilder out) {
in.chars().forEach(out::append);
}
}
}
在修正了DuncG和Holger指出的所有错误后,代码实际上是按计划工作的,而且不再死锁了。
package test;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
public class Test {
private static String create() {
StringBuilder sb = new StringBuilder();
for (int i=0; i<100; i++)
sb.append(i+".123401234").append(System.lineSeparator());
return sb.toString();
}
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(10);
ByteArrayInputStream initial_in = new ByteArrayInputStream(create().getBytes(StandardCharsets.UTF_8)); // data
PipedInputStream in = new PipedInputStream(); // intermediary pipe
PipedOutputStream out = new PipedOutputStream(in); // connect OutputStream to pipe InputStream
StringBuilder final_out = new StringBuilder();
NativeToNativeTransfer ntnt = new NativeToNativeTransfer(initial_in, out); // InputStream to OutputStream
NativeToCustomTransfer ntct = new NativeToCustomTransfer(in, final_out); // InputStream to StringBuilder
Future<Void> f1 = executor.submit(ntnt);
Future<Void> f2 = executor.submit(ntct);
f1.get(); f2.get(); // no more deadlock
System.out.println(final_out);
System.out.println("Done");
}
public static class NativeToCustomTransfer implements Callable<Void> {
private InputStream in;
private StringBuilder sb;
public NativeToCustomTransfer(InputStream in, StringBuilder out) {
this.in = in;
sb = out;
}
@Override
public Void call() throws Exception {
byte[] buffer = new byte[StreamTransfer.BUFFER_SIZE];
int read = 0;
while ((read = in.read(buffer)) != -1)
sb.append(new String(buffer, 0, read, StandardCharsets.UTF_8));
in.close();
return null;
}
}
public static class CustomToNativeTransfer extends StreamTransfer {
public CustomToNativeTransfer(StringBuilder in, OutputStream out) {
super(new ByteArrayInputStream(in.toString().getBytes()), out);
}
}
public static class NativeToNativeTransfer extends StreamTransfer {
public NativeToNativeTransfer(InputStream in, OutputStream out) {
super(in, out);
}
}
public static class CustomToCustomTransfer {
public CustomToCustomTransfer(StringBuilder in, StringBuilder out) {
in.chars().forEach(out::append);
}
}
public static class StreamTransfer implements Callable<Void> {
public static final int BUFFER_SIZE = 1024;
private InputStream in;
private OutputStream out;
public StreamTransfer(InputStream in, OutputStream out) {
this.in = in;
this.out = out;
}
@Override
public Void call() throws Exception {
byte[] buffer = new byte[BUFFER_SIZE];
int read = 0;
while ((read = in.read(buffer)) != -1)
out.write(buffer, 0, read);
in.close();
out.close();
return null;
}
}
}
1) 你的流没有一个在完成后被关闭,这意味着从管道中读取的数据将等待更多的输入。把flush改成关闭。为了让代码更干净,使用try with resources来保证每次调用()都会关闭inout流。在以后的JRE中(可能是9+?),你只需要在try-with-resources块中引用一个变量就可以让它自动关闭。
try(var try_will_autoclose_this_resource = out)
{
while ((len = in.read(buffer)) != -1)
out.write(buffer, 0, len);
}
2) 你的in.read(buff)的逻辑应该把值存储到本地变量中,这样它就能复制正确的长度。
while ( (len = bis.read(buffer)) != -1) bos.write(buffer, 0, len);
3)你使用new byte[BUFFER_SIZE],所以IO是分块进行的,这意味着你所有的BufferedInput.Output流都是不必要的。
4) 如果你的默认字符集是多字节的,那么使用new String(buffer)可能会失效,因为它可能会导致最终字符的字节集不完整。