将输入流转换为StringBuilder,将StringBuilder转换为输出流。

问题描述 投票:0回答:1

我有一个特定的使用案例,我需要能够转换我在一个.InputStream上收到的数据,并存储在一个StringBuilder中。

  • 输入流并存储在一个StringBuilder
  • StringBuilder并写入输出流

我不想转换为字符串和从字符串,因为我已经有了StringBuilder(但如果它是相同的有一个字符串与内存中的全部内容,无论哪种方式,然后我可以只改变他们的字符串).我不明白的是,当我创建不同的传输类型之间的他们在下面的类测试,我莫名其妙地死锁,即使我已经使每个传输类是单独的线程(作为 Callable<void>)所以我很困惑,为什么会发生这种情况,我甚至试过用RunnbaleThread替换,并进行 start(), join() 还是同样的问题,我想这是一些编码逻辑错误,我似乎看不到。

import java.io.*;
import java.util.concurrent.*;

public class Test {
    
    private static String create() {
        StringBuilder sb = new StringBuilder();
        for (int i=0; i<100; i++)
            sb.append(i+".123401234").append(System.lineSeparator());
        return sb.toString();
    }
    
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        ByteArrayInputStream initial_in = new ByteArrayInputStream(create().getBytes());    // data
        PipedInputStream in = new PipedInputStream();                               // intermediary pipe
        PipedOutputStream out = new PipedOutputStream(in);                          // connect OutputStream to pipe InputStream 
        StringBuilder final_out = new StringBuilder();
        NativeToNativeTransfer ntnt = new NativeToNativeTransfer(initial_in, out); // InputStream to OutputStream
        NativeToCustomTransfer ntct = new NativeToCustomTransfer(in, final_out);   // InputStream to StringBuilder 
        Future<Void> f1 = executor.submit(ntnt);
        Future<Void> f2 = executor.submit(ntct);
        f1.get(); f2.get();     // deadlock here ? 
        System.out.println(final_out);
        System.out.println("Done");
    }

    public static class StreamTransfer implements Callable<Void> {
        public static final int BUFFER_SIZE = 1024;
        private InputStream in;
        private OutputStream out;
        
        public StreamTransfer(InputStream in, OutputStream out) {
            this.in = in;
            this.out = out;
        }
        
        @Override
        public Void call() throws Exception {
            BufferedInputStream bis = new BufferedInputStream(in);
            BufferedOutputStream bos = new BufferedOutputStream(out);
            byte[] buffer = new byte[BUFFER_SIZE];
            while (bis.read(buffer) != -1) 
                bos.write(buffer, 0, BUFFER_SIZE);
            bos.flush();
            return null;
        }
    }

    public static class NativeToCustomTransfer implements Callable<Void> {
        private InputStream in;
        private StringBuilder sb;
        
        public NativeToCustomTransfer(InputStream in, StringBuilder out) {
            this.in = in;
            sb = out;
        }
        
        @Override
        public Void call() throws Exception {
            BufferedInputStream bis = new BufferedInputStream(in);
            byte[] buffer = new byte[StreamTransfer.BUFFER_SIZE];
            while (bis.read(buffer) != -1) 
                sb.append(new String(buffer));
            return null;
        }
    }
    
    public static class CustomToNativeTransfer extends StreamTransfer { 
        public CustomToNativeTransfer(StringBuilder in, OutputStream out) {
            super(new ByteArrayInputStream(in.toString().getBytes()), out);
        }
    }
    
    public static class NativeToNativeTransfer extends StreamTransfer {
        public NativeToNativeTransfer(InputStream in, OutputStream out) {
            super(in, out);
        }
    }
    
    public static class CustomToCustomTransfer {
        public CustomToCustomTransfer(StringBuilder in, StringBuilder out) {
            in.chars().forEach(out::append);
        } 
    }
}

编辑:在修复了DuncG和Holger指出的所有错误后,代码实际上可以按计划工作,而不是像以前一样。

在修正了DuncG和Holger指出的所有错误后,代码实际上是按计划工作的,而且不再死锁了。

package test;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;

public class Test {
    
    private static String create() {
        StringBuilder sb = new StringBuilder();
        for (int i=0; i<100; i++)
            sb.append(i+".123401234").append(System.lineSeparator());
        return sb.toString();
    }
    
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        ByteArrayInputStream initial_in = new ByteArrayInputStream(create().getBytes(StandardCharsets.UTF_8));  // data
        PipedInputStream in = new PipedInputStream();                               // intermediary pipe
        PipedOutputStream out = new PipedOutputStream(in);                          // connect OutputStream to pipe InputStream 
        StringBuilder final_out = new StringBuilder();
        NativeToNativeTransfer ntnt = new NativeToNativeTransfer(initial_in, out); // InputStream to OutputStream
        NativeToCustomTransfer ntct = new NativeToCustomTransfer(in, final_out);   // InputStream to StringBuilder 
        Future<Void> f1 = executor.submit(ntnt);
        Future<Void> f2 = executor.submit(ntct);
        f1.get(); f2.get();     // no more deadlock
        System.out.println(final_out);
        System.out.println("Done");
    }
    
    public static class NativeToCustomTransfer implements Callable<Void> {
        private InputStream in;
        private StringBuilder sb;
        
        public NativeToCustomTransfer(InputStream in, StringBuilder out) {
            this.in = in;
            sb = out;
        }
        
        @Override
        public Void call() throws Exception {
            byte[] buffer = new byte[StreamTransfer.BUFFER_SIZE];
            int read = 0;
            while ((read = in.read(buffer)) != -1) 
                sb.append(new String(buffer, 0, read, StandardCharsets.UTF_8));
            in.close();
            return null;
        }
    }
    
    public static class CustomToNativeTransfer extends StreamTransfer { 
        public CustomToNativeTransfer(StringBuilder in, OutputStream out) {
            super(new ByteArrayInputStream(in.toString().getBytes()), out);
        }
    }
    
    public static class NativeToNativeTransfer extends StreamTransfer {
        public NativeToNativeTransfer(InputStream in, OutputStream out) {
            super(in, out);
        }
    }
    
    public static class CustomToCustomTransfer {
        public CustomToCustomTransfer(StringBuilder in, StringBuilder out) {
            in.chars().forEach(out::append);
        } 
    }
    
    public static class StreamTransfer implements Callable<Void> {
        public static final int BUFFER_SIZE = 1024;
        private InputStream in;
        private OutputStream out;
        
        public StreamTransfer(InputStream in, OutputStream out) {
            this.in = in;
            this.out = out;
        }
        
        @Override
        public Void call() throws Exception {
            byte[] buffer = new byte[BUFFER_SIZE];
            int read = 0;
            while ((read = in.read(buffer)) != -1) 
                out.write(buffer, 0, read);
            in.close();
            out.close();
            return null;
        }
    }
}
java concurrency inputstream stringbuilder outputstream
1个回答
2
投票

1) 你的流没有一个在完成后被关闭,这意味着从管道中读取的数据将等待更多的输入。把flush改成关闭。为了让代码更干净,使用try with resources来保证每次调用()都会关闭inout流。在以后的JRE中(可能是9+?),你只需要在try-with-resources块中引用一个变量就可以让它自动关闭。

try(var try_will_autoclose_this_resource = out)
{
    while ((len = in.read(buffer)) != -1)
        out.write(buffer, 0, len);
}

2) 你的in.read(buff)的逻辑应该把值存储到本地变量中,这样它就能复制正确的长度。

while ( (len = bis.read(buffer)) != -1) bos.write(buffer, 0, len); 

3)你使用new byte[BUFFER_SIZE],所以IO是分块进行的,这意味着你所有的BufferedInput.Output流都是不必要的。

4) 如果你的默认字符集是多字节的,那么使用new String(buffer)可能会失效,因为它可能会导致最终字符的字节集不完整。

© www.soinside.com 2019 - 2024. All rights reserved.