在nodejs中管道从子节点到父节点的数据

问题描述 投票:9回答:3

我有一个nodejs父进程启动另一个nodejs子进程。子进程执行一些逻辑,然后将输出返回给父进程。输出很大,我正在尝试使用管道进行通信,如child.send()方法的文档(其工作正常BTW)所示。

我希望有人建议如何正确构建这个沟通渠道。我希望能够将数据从父节点发送到子节点,并且还能够将数据从子节点发送到父节点。我已经开始了一点,但它是不完整的(仅从父节点向子节点发送消息)并抛出错误。

父文件代码:

var child_process = require('child_process');

var opts = {
    stdio: [process.stdin, process.stdout, process.stderr, 'pipe']
};
var child = child_process.spawn('node', ['./b.js'], opts);

require('streamifier').createReadStream('test 2').pipe(child.stdio[3]);

子文件代码:

var fs =  require('fs');

// read from it
var readable = fs.createReadStream(null, {fd: 3});

var chunks = []; 

readable.on('data', function(chunk) {
    chunks.push(chunk);
});

readable.on('end', function() {
    console.log(chunks.join().toString());
})

上面的代码打印预期输出(“测试2”)以及以下错误:

events.js:85
      throw er; // Unhandled 'error' event
            ^
Error: shutdown ENOTCONN
    at exports._errnoException (util.js:746:11)
    at Socket.onSocketFinish (net.js:232:26)
    at Socket.emit (events.js:129:20)
    at finishMaybe (_stream_writable.js:484:14)
    at afterWrite (_stream_writable.js:362:3)
    at _stream_writable.js:349:9
    at process._tickCallback (node.js:355:11)
    at Function.Module.runMain (module.js:503:11)
    at startup (node.js:129:16)
    at node.js:814:3

完整答案:

家长代码:

var child_process = require('child_process');

var opts = {
    stdio: [process.stdin, process.stdout, process.stderr, 'pipe', 'pipe']
};
var child = child_process.spawn('node', ['./b.js'], opts);

child.stdio[3].write('First message.\n', 'utf8', function() {
    child.stdio[3].write('Second message.\n', 'utf8', function() {

    });
}); 

child.stdio[4].pipe(process.stdout);

孩子的代码:

var fs =  require('fs');

// read from it
var readable = fs.createReadStream(null, {fd: 3});

readable.pipe(process.stdout);
fs.createWriteStream(null, {fd: 4}).write('Sending a message back.');
node.js pipe child-process
3个回答
6
投票

您的代码有效,但是通过使用流化器包从字符串创建读取流,您的通信通道将在传输该字符串后自动关闭,这就是您收到ENOTCONN错误的原因。

为了能够通过流发送多条消息,请考虑在其上使用.write。你可以随意调用它:

child.stdio[3].write('First message.\n');
child.stdio[3].write('Second message.\n');

如果你想使用这种方法发送多个离散消息(我认为这是基于你之前使用child.send()的说法),最好使用一些分隔符号来在读取流时分割消息在孩子身上在上面的例子中,我使用换行符。帮助解决这个分裂的有用方法是event-stream

现在,为了从父级中的子级创建另一个通信通道,只需将另一个“管道”添加到您的stdio。

你可以在孩子写信:

fs.createWriteStream(null, {fd: 4}).write('Sending a message back.');

并从父母那里读取:

child.stdio[4].pipe(process.stdout);

这将打印“发回信息”。到控制台。


2
投票

我遇到了同样的问题并使用{end:false}选项来修复错误。不幸的是,接受的答案仅在处理短数据的离散写入时起作用。如果您有大量数据(而不仅仅是短消息),则需要处理流量控制,并且使用.write()不是最好的。对于这样的场景(大数据传输),最好在代码中使用.pipe()函数来处理流控制。

抛出错误是因为父进程中的可读流正在尝试结束和关闭子进程的可写流输入管道。您应该在父进程管道中使用{end: false}选项:

原始代码: require('streamifier').createReadStream('test 2').pipe(child.stdio[3]);

建议修改: require('streamifier').createReadStream('test 2').pipe(child.stdio[3], {end:false});

请参阅NodeJs文档中的详细信息:https://nodejs.org/dist/latest-v5.x/docs/api/stream.html#stream_readable_pipe_destination_options

希望这可以帮助其他人面对这个问题。


2
投票

你可以用fork()做到这一点

我刚刚为自己解决了这个... fork()是spawn的更高级版本,并且建议一般使用fork()而不是spawn()

如果您使用{silent:true}选项,stdio将通过管道传输到父进程

          const cp = require('child_process');

          const n = cp.fork(<path>, args, {
              cwd: path.resolve(__dirname),
              detached: true,
           });

          n.stdout.setEncoding('utf8');

          // here we can listen to the stream of data coming from the child process:
          n.stdout.on('data', (data) => {
            ee.emit('data',data);
          });

          //you can also listen to other events emitted by the child process
          n.on('error', function (err) {
            console.error(err.stack);
            ee.emit('error', err);
          });

          n.on('message', function (msg) {
            ee.emit('message', msg);
          });

          n.on('uncaughtException', function (err) {
            console.error(err.stack);
            ee.emit('error', err);
          });


          n.once('exit', function (err) {
             console.error(err.stack);
             ee.emit('exit', err);
          });
© www.soinside.com 2019 - 2024. All rights reserved.