Streams—The definitive guide

Learn how to use readable, writable, and transform streams with the Streams API.

Streams API 允许您以编程方式访问通过网络接收或在本地通过任何方式创建的数据流,并使用 JavaScript 处理它们。流式处理涉及将您想要接收、发送或转换的资源分解为小块,然后逐位处理这些块。虽然流式处理是浏览器在接收 HTML 或视频等要在网页上显示的资源时无论如何都会做的事情,但在 2015 年引入带有流的 fetch 之前,JavaScript 从未提供过此功能。

以前,如果您想处理某种资源(无论是视频还是文本文件等),您都必须下载整个文件,等待将其反序列化为合适的格式,然后再进行处理。随着 JavaScript 可以使用流,这一切都发生了变化。您现在可以使用 JavaScript 逐步处理原始数据,只要它在客户端上可用,而无需生成缓冲区、字符串或 Blob。这解锁了许多用例,我在下面列出了一些

  • 视频效果: 通过转换流传输可读视频流,以实时应用效果。
  • 数据(解)压缩: 通过转换流传输文件流,以选择性地(解)压缩它。
  • 图像解码: 通过转换流传输 HTTP 响应流,该转换流将字节解码为位图数据,然后再通过另一个转换流将位图转换为 PNG。如果在服务工作线程的 fetch 处理程序内安装,这将允许您透明地填充新的图像格式,如 AVIF。

Browser support

ReadableStream 和 WritableStream

Browser Support

  • Chrome: 43.
  • Edge: 14.
  • Firefox: 65.
  • Safari: 10.1.

Source

TransformStream

Browser Support

  • Chrome: 67.
  • Edge: 79.
  • Firefox: 102.
  • Safari: 14.1.

Source

Core concepts

在详细介绍各种类型的流之前,请允许我介绍一些核心概念。

Chunks

chunk 是数据的一个单独片段,它被写入流或从流中读取。它可以是任何类型;流甚至可以包含不同类型的 chunk。在大多数情况下,chunk 不会是给定流的最原子数据单元。例如,字节流可能包含由 16 KiB Uint8Array 单元而不是单个字节组成的 chunk。

Readable streams

可读流表示您可以从中读取的数据源。换句话说,数据可读流输出。具体而言,可读流是 ReadableStream 类的实例。

Writable streams

可写流表示您可以向其中写入数据的目标。换句话说,数据进入可写流。具体而言,可写流是 WritableStream 类的实例。

Transform streams

转换流由一对流组成:一个可写流(称为其可写端)和一个可读流(称为其可读端)。对此的真实世界隐喻是同声传译员,他们可以即时地从一种语言翻译成另一种语言。以特定于转换流的方式,写入可写端会导致新数据可用于从可读端读取。具体而言,任何具有 writable 属性和 readable 属性的对象都可以用作转换流。但是,标准的 TransformStream 类使创建这种适当纠缠的对变得更容易。

Pipe chains

流主要通过将它们管道连接到彼此来使用。可读流可以直接管道连接到可写流,使用可读流的 pipeTo() 方法,或者它可以首先通过一个或多个转换流进行管道连接,使用可读流的 pipeThrough() 方法。以这种方式管道连接在一起的一组流称为管道链。

Backpressure

一旦构建了管道链,它将传播有关 chunk 应以多快的速度流过它的信号。如果链中的任何步骤还不能接受 chunk,它将向后通过管道链传播信号,直到最终告知原始源停止如此快速地生成 chunk。这种标准化流量的过程称为背压。

Teeing

可以使用可读流的 tee() 方法来分叉(以大写字母“T”的形状命名)可读流。这将锁定流,即使其不再直接可用;但是,它将创建两个新的流(称为分支),可以独立使用。分叉也很重要,因为流无法倒回或重新启动,稍后将详细介绍。

Diagram of a pipe chain consisting of a readable stream coming from a call to the fetch API that is then piped through a transform stream whose output is teed and then sent to the browser for the first resulting readable stream and to the service worker cache for the second resulting readable stream.
管道链。

可读流的机制

可读流是一个数据源,在 JavaScript 中由 ReadableStream 对象表示,该对象从底层源流出。ReadableStream() 构造函数从给定的处理程序创建并返回可读流对象。底层源有两种类型

  • 推送源在您访问它们时不断向您推送数据,并且由您来启动、暂停或取消对流的访问。示例包括实时视频流、服务器发送事件或 WebSocket。
  • 拉取源要求您在连接到它们后显式地从它们请求数据。示例包括通过 fetch()XMLHttpRequest 调用进行的 HTTP 操作。

流数据以称为 chunk 的小块顺序读取。放入流中的 chunk 被称为排队。这意味着它们正在队列中等待读取。内部队列跟踪尚未读取的 chunk。

排队策略是一个对象,它确定流应如何根据其内部队列的状态发出背压信号。排队策略为每个 chunk 分配一个大小,并将队列中所有 chunk 的总大小与指定的数字(称为高水位线)进行比较。

流中的 chunk 由读取器读取。此读取器一次检索一个 chunk 的数据,允许您对其执行您想做的任何类型的操作。读取器加上随附的其他处理代码称为消费者

此上下文中的下一个构造称为控制器。每个可读流都有一个关联的控制器,顾名思义,它允许您控制流。

一次只能有一个读取器读取流;当创建读取器并开始读取流时(即,成为活动读取器),它将锁定到该流。如果您希望另一个读取器接管读取您的流,您通常需要释放第一个读取器,然后再执行任何其他操作(尽管您可以分叉流)。

创建可读流

您可以通过调用其构造函数 ReadableStream() 来创建可读流。构造函数有一个可选参数 underlyingSource,它表示一个具有方法和属性的对象,这些方法和属性定义了构造的流实例的行为方式。

underlyingSource

这可以使用以下可选的、由开发人员定义的方法

  • start(controller):在构造对象后立即调用。该方法可以访问流源,并执行设置流功能所需的任何其他操作。如果此过程要异步完成,则该方法可以返回一个 promise 以指示成功或失败。传递给此方法的 controller 参数是 ReadableStreamDefaultController
  • pull(controller):可用于在提取更多 chunk 时控制流。只要流的内部 chunk 队列未满,就会重复调用它,直到队列达到其高水位线。如果调用 pull() 的结果是一个 promise,则在所述 promise 完成之前,不会再次调用 pull()。如果 promise 被拒绝,流将变为错误状态。
  • cancel(reason):当流使用者取消流时调用。
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController 支持以下方法

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

ReadableStream() 构造函数的第二个同样是可选的参数是 queuingStrategy。它是一个对象,可以选择定义流的排队策略,该策略采用两个参数

  • highWaterMark:一个非负数,指示使用此排队策略的流的高水位线。
  • size(chunk):一个函数,用于计算并返回给定 chunk 值的有限非负大小。结果用于确定背压,通过适当的 ReadableStreamDefaultController.desiredSize 属性体现出来。它还控制何时调用底层源的 pull() 方法。
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader()read() 方法

要从可读流中读取,您需要一个读取器,它将是 ReadableStreamDefaultReaderReadableStream 接口的 getReader() 方法创建一个读取器并将流锁定到它。在流被锁定时,在释放此读取器之前,无法获取其他读取器。

ReadableStreamDefaultReader 接口的 read() 方法返回一个 promise,该 promise 提供对流的内部队列中下一个 chunk 的访问权限。它根据流的状态使用结果完成或拒绝。不同的可能性如下

  • 如果 chunk 可用,则 promise 将使用以下形式的对象完成
    { value: chunk, done: false }.
  • 如果流变为关闭状态,则 promise 将使用以下形式的对象完成
    { value: undefined, done: true }.
  • 如果流变为错误状态,则 promise 将被拒绝并返回相关错误。
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked 属性

您可以通过访问可读流的 ReadableStream.locked 属性来检查它是否被锁定。

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可读流代码示例

下面的代码示例显示了所有步骤的实际操作。您首先创建一个 ReadableStream,它在其 underlyingSource 参数(即 TimestampSource 类)中定义了一个 start() 方法。此方法告诉流的 controller 在十秒内每秒 enqueue() 一个时间戳。最后,它告诉控制器 close() 流。您通过 getReader() 方法创建一个读取器并调用 read(),直到流 done 来使用此流。

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Asynchronous iteration

在每次 read() 循环迭代中检查流是否 done 可能不是最方便的 API。幸运的是,很快就会有更好的方法来做到这一点:异步迭代。

for await (const chunk of stream) {
  console.log(chunk);
}

今天使用异步迭代的一种解决方法是使用 polyfill 实现该行为。

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

分叉可读流

ReadableStream 接口的 tee() 方法分叉当前可读流,返回一个包含两个结果分支的二元素数组作为新的 ReadableStream 实例。这允许两个读取器同时读取一个流。例如,如果您想从服务器获取响应并将其流式传输到浏览器,但也要将其流式传输到服务工作线程缓存,您可以在服务工作线程中执行此操作。由于响应正文不能多次使用,因此您需要两个副本才能执行此操作。要取消流,您需要取消两个结果分支。分叉流通常会在整个持续时间内锁定它,从而阻止其他读取器锁定它。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

可读字节流

对于表示字节的流,提供了可读流的扩展版本,以有效地处理字节,特别是通过最大限度地减少复制。字节流允许获取自带缓冲区 (BYOB) 读取器。默认实现可以提供一系列不同的输出,例如 WebSocket 情况下的字符串或数组缓冲区,而字节流保证字节输出。此外,BYOB 读取器具有稳定性优势。这是因为如果缓冲区分离,它可以保证不会将内容写入同一个缓冲区两次,从而避免竞争条件。BYOB 读取器可以减少浏览器需要运行垃圾回收的次数,因为它​​可以重用缓冲区。

创建可读字节流

您可以通过将额外的 type 参数传递给 ReadableStream() 构造函数来创建可读字节流。

new ReadableStream({ type: 'bytes' });

underlyingSource

可读字节流的底层源被赋予一个 ReadableByteStreamController 来操作。它的 ReadableByteStreamController.enqueue() 方法接受一个 chunk 参数,该参数的值是 ArrayBufferView。属性 ReadableByteStreamController.byobRequest 返回当前的 BYOB 拉取请求,如果没有,则返回 null。最后,ReadableByteStreamController.desiredSize 属性返回填充受控流的内部队列的所需大小。

queuingStrategy

ReadableStream() 构造函数的第二个同样是可选的参数是 queuingStrategy。它是一个对象,可以选择定义流的排队策略,该策略采用一个参数

  • highWaterMark:一个非负字节数,指示使用此排队策略的流的高水位线。这用于确定背压,通过适当的 ReadableByteStreamController.desiredSize 属性体现出来。它还控制何时调用底层源的 pull() 方法。

getReader()read() 方法

然后,您可以通过相应地设置 mode 参数来访问 ReadableStreamBYOBReaderReadableStream.getReader({ mode: "byob" })。这样可以更精确地控制缓冲区分配,从而避免复制。要从字节流中读取,您需要调用 ReadableStreamBYOBReader.read(view),其中 viewArrayBufferView

可读字节流代码示例

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

以下函数返回可读字节流,该流允许对随机生成的数组进行高效的零复制读取。它不是使用预定的 1,024 chunk 大小,而是尝试填充开发人员提供的缓冲区,从而实现完全控制。

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

可写流的机制

可写流是您可以向其中写入数据的目标,在 JavaScript 中由 WritableStream 对象表示。这充当底层接收器(一个较低级别的 I/O 接收器,原始数据写入其中)之上的抽象。

数据通过写入器一次写入一个 chunk 到流中。chunk 可以采用多种形式,就像读取器中的 chunk 一样。您可以使用您喜欢的任何代码来生成准备写入的 chunk;写入器加上关联的代码称为生产者

当创建写入器并开始写入流时(活动写入器),据说它被锁定到该流。一次只能有一个写入器写入可写流。如果您希望另一个写入器开始写入您的流,您通常需要先释放它,然后再将另一个写入器附加到它。

内部队列跟踪已写入流但尚未由底层接收器处理的 chunk。

排队策略是一个对象,它确定流应如何根据其内部队列的状态发出背压信号。排队策略为每个 chunk 分配一个大小,并将队列中所有 chunk 的总大小与指定的数字(称为高水位线)进行比较。

最后一个构造称为控制器。每个可写流都有一个关联的控制器,它允许您控制流(例如,中止它)。

创建可写流

Streams API 的 WritableStream 接口为将流式数据写入目标(称为接收器)提供了一种标准抽象。此对象带有内置的背压和排队功能。您可以通过调用其构造函数 WritableStream() 来创建可写流。它有一个可选的 underlyingSink 参数,该参数表示一个具有方法和属性的对象,这些方法和属性定义了构造的流实例的行为方式。

underlyingSink

underlyingSink 可以包含以下可选的、由开发人员定义的方法。传递给某些方法的 controller 参数是 WritableStreamDefaultController

  • start(controller):此方法在构造对象后立即调用。此方法的内容应旨在获取对底层接收器的访问权限。如果此过程要异步完成,它可以返回一个 promise 以指示成功或失败。
  • write(chunk, controller):当新的数据 chunk(在 chunk 参数中指定)准备好写入底层接收器时,将调用此方法。它可以返回一个 promise 以指示写入操作的成功或失败。仅在前一次写入成功后,并且永远不会在流关闭或中止后调用此方法。
  • close(controller):如果应用程序发出信号表明它已完成向流写入 chunk,则将调用此方法。内容应执行任何必要的操作以完成写入底层接收器,并释放对其的访问权限。如果此过程是异步的,它可以返回一个 promise 以指示成功或失败。仅在所有排队的写入都成功后才会调用此方法。
  • abort(reason):如果应用发出信号表明希望突然关闭流并使其处于错误状态,则将调用此方法。它可以清理任何已占用的资源,非常像 close(),但即使写入操作已排队,也会调用 abort()。这些数据块将被丢弃。如果此过程是异步的,它可以返回一个 Promise 来表示成功或失败。reason 参数包含一个 DOMString,用于描述流被中止的原因。
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Streams API 的 WritableStreamDefaultController 接口表示一个控制器,允许在设置期间、提交更多数据块进行写入时或写入结束时控制 WritableStream 的状态。在构造 WritableStream 时,底层接收器会获得一个相应的 WritableStreamDefaultController 实例来操作。WritableStreamDefaultController 只有一个方法:WritableStreamDefaultController.error(),它会导致与关联流的任何未来交互都出错。WritableStreamDefaultController 还支持一个 signal 属性,该属性返回 AbortSignal 的实例,允许在需要时停止 WritableStream 操作。

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

WritableStream() 构造函数的第二个参数(同样是可选的)是 queuingStrategy。它是一个对象,可以选择性地为流定义排队策略,该策略接受两个参数:

  • highWaterMark:一个非负数,指示使用此排队策略的流的高水位线。
  • size(chunk):一个函数,用于计算并返回给定数据块值的有限非负大小。结果用于确定反压,通过相应的 WritableStreamDefaultWriter.desiredSize 属性体现出来。

getWriter()write() 方法

要写入可写流,您需要一个写入器,它将是一个 WritableStreamDefaultWriterWritableStream 接口的 getWriter() 方法返回 WritableStreamDefaultWriter 的新实例,并将流锁定到该实例。在流被锁定时,在当前写入器释放之前,无法获取其他写入器。

write() 方法(WritableStreamDefaultWriter 接口)将传递的数据块写入 WritableStream 及其底层接收器,然后返回一个 Promise,该 Promise 解析以指示写入操作的成功或失败。请注意,“成功”的含义取决于底层接收器;它可能表示数据块已被接受,但不一定表示它已安全地保存到其最终目的地。

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked 属性

您可以通过访问可写流的 WritableStream.locked 属性来检查它是否被锁定。

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可写流代码示例

下面的代码示例展示了所有步骤的实际操作。

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

将可读流管道传输到可写流

可以通过可读流的 pipeTo() 方法将可读流管道传输到可写流。ReadableStream.pipeTo() 将当前 ReadableStream 管道传输到给定的 WritableStream,并返回一个 Promise,该 Promise 在管道传输过程成功完成时兑现,或者在遇到任何错误时拒绝。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

创建转换流

Streams API 的 TransformStream 接口表示一组可转换的数据。您可以通过调用其构造函数 TransformStream() 来创建转换流,该构造函数从给定的处理程序创建并返回转换流对象。TransformStream() 构造函数接受一个可选的 JavaScript 对象作为其第一个参数,该对象表示 transformer。此类对象可以包含以下任何方法:

transformer

  • start(controller):在构造对象后立即调用此方法。通常,这用于使用 controller.enqueue() 将前缀数据块排队。这些数据块将从可读侧读取,但不依赖于对可写侧的任何写入。如果此初始过程是异步的,例如,因为它需要一些努力来获取前缀数据块,则该函数可以返回一个 Promise 来表示成功或失败;被拒绝的 Promise 将使流出错。任何抛出的异常都将由 TransformStream() 构造函数重新抛出。
  • transform(chunk, controller):当最初写入可写侧的新数据块准备好进行转换时,将调用此方法。流实现保证仅在前一次转换成功后才调用此函数,并且永远不会在 start() 完成或 flush() 被调用之后调用。此函数执行转换流的实际转换工作。它可以使用 controller.enqueue() 将结果排队。这允许写入可写侧的单个数据块在可读侧产生零个或多个数据块,具体取决于调用 controller.enqueue() 的次数。如果转换过程是异步的,则此函数可以返回一个 Promise,以表示转换的成功或失败。被拒绝的 Promise 将使转换流的可读侧和可写侧都出错。如果未提供 transform() 方法,则使用恒等转换,该转换将数据块从可写侧不变地排队到可读侧。
  • flush(controller):在写入可写侧的所有数据块都已通过成功通过 transform() 转换,并且可写侧即将关闭后,将调用此方法。通常,这用于在可读侧也关闭之前,将后缀数据块排队到可读侧。如果刷新过程是异步的,则该函数可以返回一个 Promise 以表示成功或失败;结果将传达给 stream.writable.write() 的调用者。此外,被拒绝的 Promise 将使流的可读侧和可写侧都出错。抛出异常与返回被拒绝的 Promise 的处理方式相同。
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategyreadableStrategy 排队策略

TransformStream() 构造函数的第二个和第三个可选参数是可选的 writableStrategyreadableStrategy 排队策略。它们的定义分别在 可读流可写流 部分中概述。

转换流代码示例

以下代码示例展示了一个简单的转换流的实际操作。

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

通过转换流管道传输可读流

ReadableStream 接口的 pipeThrough() 方法提供了一种链式方式,用于通过转换流或任何其他可写/可读对管道传输当前流。管道传输流通常会在管道传输期间锁定它,从而阻止其他读取器锁定它。

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

下一个代码示例(有点牵强)展示了如何实现“shouting”版本的 fetch(),该版本通过使用返回的响应 Promise 作为流 并逐块大写来将所有文本大写。这种方法的优点是您无需等待整个文档下载完毕,这在处理大型文件时可能会产生巨大的差异。

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

演示

下面的演示展示了可读流、可写流和转换流的实际操作。它还包括 pipeThrough()pipeTo() 管道链的示例,并演示了 tee()。您可以选择在自己的窗口中运行演示或查看源代码

浏览器中可用的实用流

浏览器中内置了许多有用的流。您可以轻松地从 Blob 创建 ReadableStreamBlob 接口的 stream() 方法返回一个 ReadableStream,它在读取时返回 Blob 中包含的数据。还要记住,File 对象是 Blob 的一种特定类型,可以在 Blob 可以使用的任何上下文中使用。

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode()TextEncoder.encode() 的流式变体分别称为 TextDecoderStreamTextEncoderStream

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

使用 CompressionStreamDecompressionStream 转换流可以轻松地压缩或解压缩文件。下面的代码示例展示了如何下载 Streams 规范,在浏览器中直接压缩 (gzip) 它,并将压缩后的文件直接写入磁盘。

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access APIFileSystemWritableFileStream 和实验性的 fetch() 请求流 是野外可写流的示例。

Serial API 大量使用了可读流和可写流。

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

最后,WebSocketStream API 将流与 WebSocket API 集成在一起。

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

实用资源

致谢

本文由 Jake ArchibaldFrançois BeaufortSam DuttonMattias BuelensSurmaJoe MedleyAdam Rice 审阅。Jake Archibald 的博客文章对我在理解流方面帮助很大。一些代码示例的灵感来自 GitHub 用户 @bellbind 的探索,并且部分散文在很大程度上基于 MDN Web Docs on StreamsStreams Standard作者 在编写此规范方面做得非常出色。英雄图片由 Ryan LaraUnsplash 上提供。