通过异步迭代简化Node.js流程

来源:岁月联盟 编辑:猪蛋儿 时间:2020-01-29
  'Some text!');
这样,Readable.from()可将其他可迭代器视为字符串,在其代码点上进行迭代。虽然这在迭代性能方面并不理想,但在大多数情况下应该是够用了。我希望Readable.from()要经常与字符串一起使用,所以它将来可能会有被优化。
通过for-await-of读取可读流程
每个可读的流程都是异步可迭代的,这意味着我们可以使用for-await-of循环来读取它的内容:
import * as fs from 'fs';
async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}
const readable = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);
// Output:
// 'This is a test!/n'
在字符串中收集可读流程的内容
下面的函数是我们在这篇文章开头看到的函数的一个更简单的重新实现过程:
import {Readable} from 'stream';
async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}
const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');
注意,在本文的示例中,我们必须使用异步函数,因为我们希望返回一个Promise。
通过异步生成器转换可读流程
异步迭代提供了一个轻松的选择方式,将原本一个整体的流程处理流程分解成多个步骤:
1.输入进程便是一个可读的流程;
2.第一个转换是由异步生成器执行的,异步生成器在可读流程上迭代,并根据需要生成相应的结果。
3.我们可以选择使用更多的异步生成器来进一步转换。
4.最后,我们有几个选项来处理异步迭代返回的最后一个生成器:
4.1我们可以通过readable .from()将其转换为可读的流程(稍后可以通过流水线技术将其转换为可写的流程)。
4.2我们可以使用异步函数来处理它。
在下面的示例中,最后一步由async函数logLines()执行,该函数以可迭代的方式将项目记录到控制台。
/**
 * @param chunkIterable An asynchronous or synchronous iterable
 * over “chunks” (arbitrary strings)
 * @returns An asynchronous iterable over “lines”
 * (strings with at most one newline that always appears at the end)
 */
async function* chunksToLines(chunkIterable) {
  let previous = '';
  for await (const chunk of chunkIterable) {
    previous += chunk;
    while (true) {
      const eolIndex = previous.indexOf('/n');
      if (eolIndex  0) {
    yield previous;
  }
}
async function* numberLines(lineIterable) {
  let lineNumber = 1;
  for await (const line of lineIterable) {
    yield lineNumber + ' ' + line;
    lineNumber++;
  }
}
async function logLines(lineIterable) {
  for await (const line of lineIterable) {
    console.log(line);
  }
}
const chunks = Readable.from(
  'Text with/nmultiple/nlines./n',
  {encoding: 'utf8'});
logLines(numberLines(chunksToLines(chunks)));
// Output:
// '1 Text with/n'
// '2 multiple/n'
// '3 lines./n'
可写的流程
为文件创建可写的流程
我们可以使用fs.createWriteStream()来创建可写的流程:
const writableStream = fs.createWriteStream(
  'tmp/log.txt', {encoding: 'utf8'});
写入可写流程
在本节中,我们将介绍三种将数据写入可写流程的方法:
1.通过.write()方法直接写入可写流程;
2.使用可读流程的.pipe()方法将其导入可写流程;
3.使用函数pipeline ()从模块流程将可读流程导入可写流程;
为了演示这些方法,我们会以三种不同的方式实现了同一个函数writeIterableToFile()。
在异步函数中写入可写流程
import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';
const finished = util.promisify(stream.finished); // (A)
async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) { // (B)
      // Handle backpressure
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // Wait until done. Throws if there are errors.
  await finished(writable);
}
await writeIterableToFile(
  ['One', ' line of text./n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
  'One line of text./n');
stream.finished() 的默认版本是基于回调的,但是可以通过util.promisify()将其转换为基于约定的版本(A行)。
为此,我们使用了以下两种模式:
1.在处理backpressure 时写入可写流程(B行):
if (!writable.write(chunk)) {
  await once(writable, 'drain');
}
2.关闭一个可写流程,并等待写入完成(C行):

上一页  [1] [2] [3]  下一页