如何在 Node 中逐行读取标准输入

问题描述 投票:0回答:9

我正在寻找使用命令行调用处理带有节点的文本文件,例如:

node app.js < input.txt

文件的每一行都需要单独处理,但是一旦处理完输入行就可以忘记了。

使用标准输入的数据侦听器,我将输入流按字节大小分块,因此我进行了设置。

process.stdin.resume();
process.stdin.setEncoding('utf8');

var lingeringLine = "";

process.stdin.on('data', function(chunk) {
    lines = chunk.split("\n");

    lines[0] = lingeringLine + lines[0];
    lingeringLine = lines.pop();

    lines.forEach(processLine);
});

process.stdin.on('end', function() {
    processLine(lingeringLine);
});

但这看起来太草率了。必须围绕 lines 数组的第一个和最后一个项目进行按摩。有没有更优雅的方法来做到这一点?

node.js stdin
9个回答
301
投票

您可以使用 readline 模块逐行读取标准输入:

const readline = require('readline');

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
  terminal: false
});

rl.on('line', (line) => {
    console.log(line);
});

rl.once('close', () => {
     // end of input
 });

134
投票
// Work on POSIX and Windows
var fs = require("fs");
var stdinBuffer = fs.readFileSync(0); // STDIN_FILENO = 0
console.log(stdinBuffer.toString());

69
投票

readline
专门设计用于与终端一起使用(即
process.stdin.isTTY === true
)。有很多模块为通用流提供拆分功能,例如 split。它让事情变得超级简单:

process.stdin.pipe(require('split')()).on('data', processLine)

function processLine (line) {
  console.log(line + '!')
}

23
投票
#!/usr/bin/env node

const EventEmitter = require('events');

function stdinLineByLine() {
  const stdin = new EventEmitter();
  let buff = '';

  process.stdin
    .on('data', data => {
      buff += data;
      lines = buff.split(/\r\n|\n/);
      buff = lines.pop();
      lines.forEach(line => stdin.emit('line', line));
    })
    .on('end', () => {
      if (buff.length > 0) stdin.emit('line', buff);
    });

  return stdin;
}

const stdin = stdinLineByLine();
stdin.on('line', console.log);

0
投票

逐行读取流,应该适合通过管道传输到标准输入的大文件,我的版本:

var n=0;
function on_line(line,cb)
{
    ////one each line
    console.log(n++,"line ",line);
    return cb();
    ////end of one each line
}

var fs = require('fs');
var readStream = fs.createReadStream('all_titles.txt');
//var readStream = process.stdin;
readStream.pause();
readStream.setEncoding('utf8');

var buffer=[];
readStream.on('data', (chunk) => {
    const newlines=/[\r\n]+/;
    var lines=chunk.split(newlines)
    if(lines.length==1)
    {
        buffer.push(lines[0]);
        return;
    }   
    
    buffer.push(lines[0]);
    var str=buffer.join('');
    buffer.length=0;
    readStream.pause();

    on_line(str,()=>{
        var i=1,l=lines.length-1;
        i--;
        function while_next()
        {
            i++;
            if(i<l)
            {
                return on_line(lines[i],while_next);
            }
            else
            {
                buffer.push(lines.pop());
                lines.length=0;
                return readStream.resume();
            }
        }
        while_next();
    });
  }).on('end', ()=>{
      if(buffer.length)
          var str=buffer.join('');
          buffer.length=0;
        on_line(str,()=>{
            ////after end
            console.error('done')
            ////end after end
        });
  });
readStream.resume();

说明:

  • 要在 utf8 字母上正确剪切它,而不是在中间字节中将编码设置为 utf8,它确保它每次都发出完整的多字节字母。
  • 收到数据后,输入暂停。它用于阻塞输入,直到所有行都用完。如果行处理功能比输入慢,它可以防止自助餐溢出。
  • 如果每次都是一行,每次都没有换行。需要为所有调用积累它并且什么也不做,返回。一旦有多于一行,也附加它并使用累积缓冲区。
  • 所有分割线都消耗完后。在最后一行将最后一行推送到缓冲并恢复暂停的流。

es6代码

var n=0;
async function on_line(line)
{
    ////one each line
    console.log(n++,"line ",line);
    ////end of one each line
}

var fs = require('fs');
var readStream = fs.createReadStream('all_titles.txt');
//var readStream = process.stdin;
readStream.pause();
readStream.setEncoding('utf8');

var buffer=[];
readStream.on('data', async (chunk) => {
    
    const newlines=/[\r\n]+/;
    var lines=chunk.split(newlines)
    if(lines.length==1)
    {
        buffer.push(lines[0]);
        return;
    }
    readStream.pause();

    // let i=0;
    buffer.push(lines[0]); // take first line
    var str=buffer.join('');
    buffer.length=0;//clear array, because consumed
    await on_line(str);
    
    for(let i=1;i<lines.length-1;i++)
       await on_line(lines[i]);
    buffer.push(lines[lines.length-1]);
    lines.length=0; //optional, clear array to hint GC.
    return readStream.resume();
  }).on('end', async ()=>{
      if(buffer.length)
          var str=buffer.join('');
          buffer.length=0;
          await on_line(str);
  });
  readStream.resume();

我没有测试es6代码


0
投票

旧问题的新答案。自 Node 10(2018 年 4 月)起,由于添加了

Symbol.asyncIterator
方法(ReadableStream 文档Symbol.asyncIterator 文档),诸如 process.stdin 之类的 ReadableStreams 支持 for-await-of 循环。

使用它,我们可以创建一个适配器,从遍历数据块到遍历行。这样做的逻辑改编自this answer.

function streamByLines(stream) {
  return {
    async *[Symbol.asyncIterator]() {
      let buffer = ''

      for await (const chunk of stream) {
        buffer += chunk
        const lines = buffer.split(/\r?\n/)
        buffer = lines.pop()
        for (const line of lines) {
          yield line
        }
      }

      // When the stream ends yield whatever remains in buffer
      yield buffer
    }
  }
}

你可以这样使用它(在允许 await 的上下文中)

for await (const line of streamByLines(process.stdin)) {
  console.log('Current line:', line)
}

-1
投票

在我的例子中,程序 (elinks) 返回的行看起来是空的,但实际上有特殊的终端字符、颜色控制代码和退格键,所以

grep
其他答案中的选项对我不起作用。所以我用 Node.js 写了这个小脚本。我将文件命名为
tight
,但这只是一个随机名称。

#!/usr/bin/env node

function visible(a) {
    var R  =  ''
    for (var i = 0; i < a.length; i++) {
        if (a[i] == '\b') {  R -= 1; continue; }  
        if (a[i] == '\u001b') {
            while (a[i] != 'm' && i < a.length) i++
            if (a[i] == undefined) break
        }
        else R += a[i]
    }
    return  R
}

function empty(a) {
    a = visible(a)
    for (var i = 0; i < a.length; i++) {
        if (a[i] != ' ') return false
    }
    return  true
}

var readline = require('readline')
var rl = readline.createInterface({ input: process.stdin, output: process.stdout, terminal: false })

rl.on('line', function(line) {
    if (!empty(line)) console.log(line) 
})

-2
投票

如果要先询问用户行数:

    //array to save line by line 
    let xInputs = [];

    const getInput = async (resolve)=>{
            const readline = require('readline').createInterface({
                input: process.stdin,
                output: process.stdout,
            });
            readline.on('line',(line)=>{
            readline.close();
            xInputs.push(line);
            resolve(line);
            })
    }

    const getMultiInput = (numberOfInputLines,callback)=>{
        let i = 0;
        let p = Promise.resolve(); 
        for (; i < numberOfInputLines; i++) {
            p = p.then(_ => new Promise(resolve => getInput(resolve)));
        }
        p.then(()=>{
            callback();
        });
    }

    //get number of lines 
    const readline = require('readline').createInterface({
        input: process.stdin,
        output: process.stdout,
        terminal: false
    });
    readline.on('line',(line)=>{
        getMultiInput(line,()=>{
           //get here the inputs from xinputs array 
        });
        readline.close();
    })


-8
投票
process.stdin.pipe(process.stdout);
© www.soinside.com 2019 - 2024. All rights reserved.