如何在Node.js中为一个非常大(> 1GB)的文件的每一行运行异步函数

时间:2021-01-06 20:53:47

Say you have a huge (> 1GB) CSV of record ids:

假设您有一个巨大的(> 1GB)CSV记录ID:

655453
4930285
493029
4930301
493031
...

And for each id you want to make a REST API call to fetch the record data, transform it locally, and insert it into a local database.

对于每个id,您希望进行REST API调用以获取记录数据,在本地转换它,并将其插入本地数据库。

How do you do that with Node.js' Readable Stream?

你如何使用Node.js的可读流做到这一点?

My question is basically this: How do you read a very large file, line-by-line, run an async function for each line, and [optionally] be able to start reading the file from a specific line?

我的问题基本上是这样的:你如何逐行读取一个非常大的文件,为每一行运行一个异步函数,并且[可选]能够从特定的行开始读取文件?

From the following Quora question I'm starting to learn to use fs.createReadStream:

从下面的Quora问题我开始学习使用fs.createReadStream:

http://www.quora.com/What-is-the-best-way-to-read-a-file-line-by-line-in-node-js

var fs = require('fs');
var lazy = require('lazy');

var stream = fs.createReadStream(path, {
  flags: 'r',
  encoding: 'utf-8'
});

new lazy(stream).lines.forEach(function(line) {
  var id = line.toString();
  // pause stream
  stream.pause();
  // make async API call...
  makeAPICall(id, function() {
    // then resume to process next id
    stream.resume();
  });
});

But, that pseudocode doesn't work, because the lazy module forces you to read the whole file (as a stream, but there's no pausing). So that approach doesn't seem like it will work.

但是,伪代码不起作用,因为懒惰模块强制您读取整个文件(作为流,但没有暂停)。所以这种方法似乎不会起作用。

Another thing is, I would like to be able to start processing this file from a specific line. The reason for this is, processing each id (making the api call, cleaning the data, etc.) can take up to a half a second per record so I don't want to have to start from the beginning of the file each time. The naive approach I'm thinking about using is to just capture the line number of the last id processed, and save that. Then when you parse the file again, you stream through all the ids, line by line, until you find the line number you left off at, and then you do the makeAPICall business. Another naive approach is to write small files (say of 100 ids) and process each file one at a time (small enough dataset to do everything in memory without an IO stream). Is there a better way to do this?

另一件事是,我希望能够从特定的行开始处理这个文件。原因是,处理每个id(进行api调用,清理数据等)每个记录可能需要半秒钟,因此我不希望每次都从文件的开头处开始。我正在考虑使用的天真方法是捕获最后处理的id的行号,并保存它。然后,当您再次解析文件时,您将逐行流式传输所有ID,直到找到您停止的行号,然后执行makeAPICall业务。另一种天真的方法是编写小文件(比如100个ID)并一次处理一个文件(足够小的数据集可以在没有IO流的情况下在内存中完成所有操作)。有一个更好的方法吗?

I can see how this gets tricky (and where node-lazy comes in) because the chunk in stream.on('data', function(chunk) {}); may contain only part of a line (if the bufferSize is small, each chunk may be 10 lines but because the id is variable length, it may only be 9.5 lines or whatever). This is why I'm wondering what the best approach is to the above question.

我可以看到这是如何变得棘手(以及node-lazy进来的地方)因为stream.on中的块('data',function(chunk){});可能只包含一行的一部分(如果bufferSize很小,每个块可能是10行,但因为id是可变长度,它可能只有9.5行或其他)。这就是为什么我想知道上述问题的最佳方法是什么。

2 个解决方案

#1


1  

I guess you don't need to use node-lazy. Here's what I found in Node docs:

我猜你不需要使用node-lazy。这是我在Node文档中找到的内容:

Event: data

function (data) { }

The data event emits either a Buffer (by default) or a string if setEncoding() was used.

如果使用setEncoding(),则数据事件将发出缓冲区(默认情况下)或字符串。

So that means that is you call setEncoding() on your stream then your data event callback will accept a string parameter. Then inside this callback you can call use .pause() and .resume() methods.

这意味着你在流上调用setEncoding()然后你的数据事件回调将接受一个字符串参数。然后在这个回调中你可以调用use .pause()和.resume()方法。

The pseudo code should look like this:

伪代码应如下所示:

stream.setEncoding('utf8');
stream.addListener('data', function (line) {
    // pause stream
    stream.pause();
    // make async API call...
    makeAPICall(line, function() {
        // then resume to process next line
        stream.resume();
    });
})

Although the docs don't explicitly specify that stream is read line by line I assume that that's the case for file streams. At least in other languages and platforms text streams work that way and I see no reason for Node streams to differ.

尽管文档没有明确指定逐行读取流,但我认为文件流就是这种情况。至少在其他语言和平台中,文本流以这种方式工作,我认为没有理由使节点流不同。

#2


0  

Related to Andrew Андрей Листочкин's answer:

与AndrewАндрейЛисточкин相关的答案:

You can use a module like byline to get a separate data event for each line. It's a transform stream around the original filestream, which produces a data event for each chunk. This lets you pause after each line.

您可以使用类似byline的模块为每一行获取单独的数据事件。它是围绕原始文件流的转换流,它为每个块生成数据事件。这可以让你在每一行后暂停。

byline won't read the entire file into memory like lazy apparently does.

byline不会像懒惰那样将整个文件读入内存。

var fs = require('fs');
var byline = require('byline');

var stream = fs.createReadStream('bigFile.txt');
stream.setEncoding('utf8');

// Comment out this line to see what the transform stream changes.
stream = byline.createStream(stream); 

// Write each line to the console with a delay.
stream.on('data', function(line) {
  // Pause until we're done processing this line.
  stream.pause();

  setTimeout(() => {
      console.log(line);

      // Resume processing.
      stream.resume();
  }, 200);
});

#1


1  

I guess you don't need to use node-lazy. Here's what I found in Node docs:

我猜你不需要使用node-lazy。这是我在Node文档中找到的内容:

Event: data

function (data) { }

The data event emits either a Buffer (by default) or a string if setEncoding() was used.

如果使用setEncoding(),则数据事件将发出缓冲区(默认情况下)或字符串。

So that means that is you call setEncoding() on your stream then your data event callback will accept a string parameter. Then inside this callback you can call use .pause() and .resume() methods.

这意味着你在流上调用setEncoding()然后你的数据事件回调将接受一个字符串参数。然后在这个回调中你可以调用use .pause()和.resume()方法。

The pseudo code should look like this:

伪代码应如下所示:

stream.setEncoding('utf8');
stream.addListener('data', function (line) {
    // pause stream
    stream.pause();
    // make async API call...
    makeAPICall(line, function() {
        // then resume to process next line
        stream.resume();
    });
})

Although the docs don't explicitly specify that stream is read line by line I assume that that's the case for file streams. At least in other languages and platforms text streams work that way and I see no reason for Node streams to differ.

尽管文档没有明确指定逐行读取流,但我认为文件流就是这种情况。至少在其他语言和平台中,文本流以这种方式工作,我认为没有理由使节点流不同。

#2


0  

Related to Andrew Андрей Листочкин's answer:

与AndrewАндрейЛисточкин相关的答案:

You can use a module like byline to get a separate data event for each line. It's a transform stream around the original filestream, which produces a data event for each chunk. This lets you pause after each line.

您可以使用类似byline的模块为每一行获取单独的数据事件。它是围绕原始文件流的转换流,它为每个块生成数据事件。这可以让你在每一行后暂停。

byline won't read the entire file into memory like lazy apparently does.

byline不会像懒惰那样将整个文件读入内存。

var fs = require('fs');
var byline = require('byline');

var stream = fs.createReadStream('bigFile.txt');
stream.setEncoding('utf8');

// Comment out this line to see what the transform stream changes.
stream = byline.createStream(stream); 

// Write each line to the console with a delay.
stream.on('data', function(line) {
  // Pause until we're done processing this line.
  stream.pause();

  setTimeout(() => {
      console.log(line);

      // Resume processing.
      stream.resume();
  }, 200);
});