Apache Commons exec PumpStreamHandler 连续输入

问题描述 投票:0回答:3

我正在尝试使用 Apache Commons exec 解决与命令行进程的交互。我被以下代码困住了:

ByteArrayOutputStream out = new ByteArrayOutputStream();
ByteArrayOutputStream ins = new ByteArrayOutputStream();
OutputStreamWriter ow = new OutputStreamWriter(ins);
BufferedWriter writer = new BufferedWriter(ow);
ByteArrayInputStream in = new ByteArrayInputStream(ins.toByteArray());
PumpStreamHandler psh = new PumpStreamHandler(out, null, in);
CommandLine cl = CommandLine.parse(initProcess);
DefaultExecutor exec = new DefaultExecutor();
DefaultExecuteResultHandler resultHandler = new DefaultExecuteResultHandler();
exec.setStreamHandler(psh);
try {
    exec.execute(cl, resultHandler);
    int i = 0;
    while (true) {
        String o = out.toString();
        if (!o.trim().isEmpty()) {
            System.out.println(o);
            out.reset();
        }
        // --- PROBLEM start ---
        if (i == 3) {
            writer.write(internalProcessCommand); 
            // string with or without trailing \n, both tested
            writer.flush();
            writer.close();
            // tested even ins.write(internalProcessCommand.getBytes())
        }
        // --- PROBLEM end ---
        Thread.sleep(3000);
        i++;
    }
} catch (ExecuteException e) {
    System.err.println(e.getMessage());
}

我希望我的代码是清楚的。我在清除流的同时连续读取

out
并在3秒后打印它。问题是输入到
in
传递到
PumpStreamHandler
。我需要从代码本身连续、动态地传递进程命令,就像我通过 CLI 与进程交互一样。当我简单地使用
System.in
作为
PumpStreamHandler
参数时,我可以很好地从控制台编写进程命令。我怎样才能从代码中传递字符串以获得相同的结果?

编辑: 我也尝试连接

PipedInputStream
PipedOutputStream
接收数据,但似乎只有在关闭
PipedOutputStream
后才能读取数据,这使得它不可重用,因此我无法实现交互性。

编辑2: 自己解决了。解决方案在下面的答案中。哎呀。 :-)

java inputstream apache-commons outputstream apache-commons-exec
3个回答
1
投票

解决方案是复制 PumpStreamHandlerStreamPumper 的实现,比如说 ImmediatePumpStreamHandler 和 ImmediateStreamPumper,并进行以下两项更改:

  • 在 ImmediateStreamPumper 中第 108 行的
    os.flush();
    之后直接添加
    os.write(buf, 0, length);
  • 将 ImmediatePumpStreamHandler 中第 269 行的
    new StreamPumper(...)
    更改为
    new ImmediateStreamPumper(...)

0
投票

好吧,我使用内置工具而不是外部库解决了这个问题。感谢独立线程,我能够实现我的目标,上面写着“

Process
InputStream

ProcessBuilder builder = new ProcessBuilder(command);
Process process = builder.start();

BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(process.getOutputStream()));
StreamReader outputReader = new StreamReader(process.getInputStream(), System.out);
outputReader.start();
StreamReader err = new StreamReader(process.getErrorStream(), System.err);
err.start();

for (int i = 0; i < 3; i++) {
    Thread.sleep(5000);
    writer.write(internalProcessCommand + "\n");
    writer.flush();
}
writer.write("exit\n");
writer.flush();

while (process.isAlive()) {
    System.out.println("alive?");
    Thread.sleep(100);
}
System.out.println("dead");
outputReader.shutdown();
err.shutdown();

流阅读器:

class StreamReader extends Thread {

    private AtomicBoolean running = new AtomicBoolean(false);
    private InputStream in;
    private OutputStream out;

    public StreamReader(InputStream in, OutputStream out) {
        this.in = in;
        this.out = out;
        running.set(true);
    }

    @Override
    public void run() {
        Scanner scanner = new Scanner(in);
        PrintWriter writer = new PrintWriter(out, true);
        while (running.get()) {
            if (scanner.hasNextLine()) {
                writer.println(scanner.nextLine());
            }
        }
        scanner.close();
    }

    public void shutdown() {
        running.set(false);
    }

}

0
投票

根据@champagniac给出的答案,我创建了一个简单的修复程序,只需替换

PumpStreamHandler

即可引入额外的刷新
public class PumpStreamHandlerFixed extends PumpStreamHandler
{
  public PumpStreamHandlerFixed()
  {
    super();
  }

  public PumpStreamHandlerFixed(OutputStream out, OutputStream err, InputStream input)
  {
    super(out, err, input);
  }

  public PumpStreamHandlerFixed(OutputStream out, OutputStream err)
  {
    super(out, err);
  }

  public PumpStreamHandlerFixed(OutputStream outAndErr)
  {
    super(outAndErr);
  }

  @Override
  protected Thread createPump(InputStream is, OutputStream os, boolean closeWhenExhausted)
  {
    os = new AutoFlushingOutputStream(os);

    final Thread result = new Thread(new StreamPumper(is, os, closeWhenExhausted), "Exec Stream Pumper");
    result.setDaemon(true);
    return result;
  }
}

class AutoFlushingOutputStream extends OutputStream
{
  private final OutputStream decorated;

  public AutoFlushingOutputStream(OutputStream decorated)
  {
    this.decorated = decorated;
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException
  {
    this.decorated.write(b, off, len);
    this.decorated.flush();
  }

  @Override
  public void write(int b) throws IOException
  {
    this.decorated.write(b);
    this.decorated.flush();
  }

  @Override
  public void close() throws IOException
  {
    this.decorated.close();
  }

  @Override
  public void flush() throws IOException
  {
    this.decorated.flush();
  }
}
© www.soinside.com 2019 - 2024. All rights reserved.