流(Streams)

现行标准 — 最后更新

参与:
GitHub whatwg/streams新建议题所有议题
Matrix 聊天
提交记录:
GitHub whatwg/streams/commits
当前提交快照
@streamsstandard
测试:
web-platform-tests streams/进行中的工作
翻译 (非规范性)
日本語
简体中文
演示:
streams.spec.whatwg.org/demos

摘要

本规范提供了用于创建、组合和消费数据流的 API,这些数据流能够高效地映射到底层 I/O 原语。

1. 简介

本节为非规范性内容。

Web 平台的大部分内容都建立在流数据之上:也就是说,数据以增量的方式被创建、处理和消费,无需将所有数据一次性读取到内存中。Streams 标准为创建和交互此类流数据提供了一套通用 API,体现在可读流可写流转换流中。

这些 API 被设计为能够高效地映射到底层的 I/O 原语,包括在合适情况下对字节流的专门支持。它们允许轻松地将多个流组合为管道链,或者通过读取器写入器直接使用。最后,它们还被设计为自动提供背压和排队特性。

本标准为其他 Web 平台的组成部分提供了基础流原语,以便它们用于暴露其流式数据。例如,[FETCH]Response 的 body 暴露为ReadableStream 实例。更一般而言,平台中充满了等待被表达为流的流式抽象:多媒体流、文件流、跨全局通信等,都因能够增量处理数据而无需全部缓冲到内存后再处理而受益。通过为这些流向开发者暴露提供基础,Streams 标准使如下用例成为可能:

Web 开发者也可以使用此处描述的 API 创建自己的流,其 API 与平台提供的流一致。其他开发者随后可以透明地将平台流与库提供的流进行组合。通过这种方式,这里的 API 提供了所有流的统一抽象,促进了围绕这些共享且可组合接口的生态系统发展。

2. 模型

数据块(chunk)是写入或从流中读取的单个数据片段。它可以是任意类型;流甚至可以包含不同类型的数据块。对于给定流而言,数据块往往不是最原子的单位;例如,字节流中的数据块可能由 16 KiB 大小的 Uint8Array 组成,而不是单个字节。

2.1. 可读流

可读流(readable stream) 代表一个数据源,你可以从中读取数据。换句话说,数据可读流流出。具体来说,可读流是 ReadableStream 类的一个实例。

尽管可读流可以用任意行为来创建,但大多数可读流会包装一个较低层次的 I/O 源,这被称为底层源(underlying source)。底层源有两种类型:推送源(push sources)和拉取源(pull sources)。

推送源会主动向你推送数据,无论你是否在监听。它们通常还会提供暂停和恢复数据流的机制。推送源的一个例子是 TCP 套接字,数据会持续从操作系统层面推送过来,可以通过调整 TCP 窗口大小来控制速率。

拉取源则需要你主动请求数据。数据可能是同步可用的,例如被操作系统缓存在内存中的缓冲区,也可能是异步的,比如从磁盘读取。拉取源的一个例子是文件句柄,你可以定位到特定位置并读取指定数量的数据。

可读流被设计为通过统一的接口包装这两种类型的源。对于 Web 开发者自定义的流,底层源的实现细节由一个包含特定方法和属性的对象提供,并传递给 ReadableStream() 构造函数。

数据块(chunk)由流的底层源入队至流中。它们随后可以通过流的公共接口逐个读取,特别是通过使用流的 可读流读取器,该读取器可通过流的 getReader() 方法获取。

使用流的公共接口从可读流读取数据的代码被称为消费者(consumer)

消费者还可以通过 cancel() 方法取消可读流。这表示消费者不再关注该流,会立即关闭流、丢弃队列中的所有数据块,并执行底层源的任何取消机制。

消费者还可以通过流的 tee() 方法分叉(tee)可读流。这会锁定该流,使其无法直接使用,但会创建两个新的流,称为分支(branches),可分别消费。

对于表示字节的流,提供了扩展版本的可读流,以高效处理字节,尤其是减少拷贝。此类可读流的底层源被称为底层字节源(underlying byte source)。底层源为底层字节源的可读流有时被称为可读字节流(readable byte stream)。可读字节流的消费者可以通过流的 getReader() 方法获取BYOB 读取器

2.2. 可写流

可写流(writable stream) 代表一个数据目标,你可以向其中写入数据。换句话说,数据流入可写流。具体来说,可写流是 WritableStream 类的一个实例。

类似于可读流,大多数可写流会包装一个较低层次的 I/O 目标,称为底层接收器(underlying sink)。 可写流通过排队后续写入并逐个交付给底层接收器,来抽象底层接收器的一些复杂性。

数据块(chunk)通过流的公共接口写入流中,并依次传递给流的底层接收器。 对于 Web 开发者自定义的流,底层接收器的实现细节由一个包含特定方法的对象提供,并传递给 WritableStream() 构造函数。

使用流的公共接口向可写流写入数据的代码被称为生产者(producer)

生产者还可以通过 abort() 方法中止可写流, 这表示生产者认为出现了问题,后续写入应当被终止。即使没有来自底层接收器的信号,这也会使流进入错误状态,并丢弃流内部队列中的所有写入。

2.3. 转换流

转换流(transform stream) 由一对流组成:一个可写流,称为其可写端(writable side),以及一个可读流,称为其可读端(readable side)。对于特定的转换流,写入可写端的数据会以特定方式产生可以从可读端读取的新数据。

具体来说,任何包含 writable 属性和 readable 属性的对象都可以作为转换流。然而,标准的TransformStream 类可以更容易地创建一对正确关联的流。它包装了一个转换器(transformer),定义了要执行的具体转换算法。对于 Web 开发者自定义的流,转换器的实现细节由一个包含特定方法和属性的对象提供,并传递给 TransformStream() 构造函数。其他规范可能会使用 GenericTransformStream 混入来创建具有相同 writable/readable 属性对,但上层带有自定义 API 的类。

恒等转换流(identity transform stream)是一种特殊的转换流,会将写入其可写端的所有数据块原样转发到其可读端,不做任何更改。这在多种场景下很有用。当传递给 TransformStream 构造函数的transform() 方法不存在于转换器对象上时,默认会创建一个恒等转换流。

一些可能的转换流示例包括:

2.4. 管道链与背压

流主要通过管道(piping)方式相互连接使用。可读流可以直接通过其 pipeTo() 方法连接到可写流,也可以先通过一个或多个转换流连接,使用其 pipeThrough() 方法。

这样连接在一起的一组流被称为管道链(pipe chain)。在管道链中,原始源(original source)是链中第一个可读流的底层源最终接收器(ultimate sink)是链中最后一个可写流的底层接收器

一旦管道链被构建,关于数据块流经速度的信号就会在整个链上传播。如果链中的某一环节无法立即接收数据块,它会向管道链反向传播信号,最终告知原始源不要太快地产生数据块。这种根据链条处理速度来规范原始源输出速度的机制被称为背压(backpressure)

具体来说,原始源会获取 controller.desiredSize (或 byteController.desiredSize) 的值,并据此调整自身数据流速率。这个值来源于与最终接收器对应的 writer.desiredSize ,该值会随着最终接收器完成写入数据块而更新。用于构建管道链的 pipeTo() 方法会自动确保这些信息能够在管道链中向后传播。

当对可读流进行分叉(tee)时,其两个分支背压信号会聚合;如果两个分支都没有被读取,则会向原始流的底层源发出背压信号。

管道操作会锁定可读流和可写流,在管道操作期间无法再对它们进行其他操作。这允许实现进行重要的优化,比如在底层源与底层接收器之间直接传递数据,绕过许多中间队列。

2.5. 内部队列与排队策略

可读流和可写流都维护着内部队列(internal queues),它们的用途基本类似。对于可读流,内部队列包含由数据块底层源入队但尚未被消费者读取的数据。对于可写流,内部队列包含由生产者写入但尚未被底层接收器处理和确认的数据块。

排队策略(queuing strategy)是一个对象,根据流的内部队列状态,决定如何向流发出背压信号。排队策略会为每个数据块分配一个大小,并将队列中所有数据块的总大小与一个特定值(高水位线(high water mark))进行比较。两者的差值(高水位线减去当前总大小)被用来确定队列需要填充的期望大小

对于可读流,底层源可以将这个期望大小作为背压信号,放慢数据块的生成速度,以尽量保持队列期望大小大于或等于零。对于可写流,生产者也可以类似地避免写入导致期望大小变为负数的数据。

具体来说,Web 开发者自定义流的排队策略由任何具有 highWaterMark 属性的 JavaScript 对象给出。对于字节流,highWaterMark 的单位始终为字节。对于其他流,默认单位为数据块,但也可以在策略对象中包含一个 size() 函数,为每个数据块返回其大小。这允许 highWaterMark 以任意浮点单位表示。

一个简单的排队策略示例是:为每个数据块分配大小为 1,高水位线为 3。这样在可读流中最多可以入队 3 个数据块,在可写流中最多可以写入 3 个数据块,超过后流就会施加背压。

在 JavaScript 中,这样的策略可手动写为 { highWaterMark: 3, size() { return 1; }}, 或用内置的 CountQueuingStrategy 类写为 new CountQueuingStrategy({ highWaterMark: 3 })

2.6. 锁定

可读流读取器(reader/readable stream reader),简称 reader,是一个允许直接从可读流读取数据块的对象。如果没有读取器,消费者只能对可读流执行高级操作:取消流或管道到可写流。读取器可通过流的 getReader() 方法获取。

可读字节流可以提供两种读取器:默认读取器(default reader)BYOB 读取器(BYOB reader)。BYOB("bring your own buffer")读取器允许读取到开发者自定义的缓冲区,从而减少拷贝。非字节流的可读流只能提供默认读取器。默认读取器是 ReadableStreamDefaultReader 类的实例,BYOB 读取器是 ReadableStreamBYOBReader 类的实例。

类似地,可写流写入器(writer/writable stream writer),简称 writer,是一个允许直接向可写流写入数据块的对象。如果没有写入器,生产者只能执行高级操作,如中止流或管道可读流到可写流。写入器由 WritableStreamDefaultWriter 类表示。

在底层,这些高级操作其实内部也会用到读取器或写入器。

每个可读流或可写流在同一时刻最多只能有一个读取器或写入器。此时称该流被锁定(locked),且该读取器或写入器为活动(active)。可通过 readableStream.lockedwritableStream.locked 属性判断流是否锁定。

读取器或写入器还可以释放其锁(release its lock),此时其不再为活动状态,允许获取新的读取器或写入器。可通过 defaultReader.releaseLock()byobReader.releaseLock()、 或 writer.releaseLock() 方法实现。

3. 约定

本规范依赖于 Infra 标准。[INFRA]

本规范在内部算法中使用了 JavaScript 规范中的抽象操作(abstract operation)概念。这包括将其返回值视为完成记录(completion records),并使用 ! 和 ? 前缀来解包这些完成记录。[ECMASCRIPT]

本规范还使用了 JavaScript 规范中的内部插槽(internal slot)概念和记法。(不过,这些内部插槽是用在 Web IDL 的平台对象上的,而不是 JavaScript 对象上。)

采用这些外来的 JavaScript 规范约定主要是历史原因。我们建议你在编写自己的 Web 规范时避免效仿我们的做法。

在本规范中,所有数字都表示为双精度 64 位 IEEE 754 浮点值(如 JavaScript 的Number 类型或 Web IDL 的 unrestricted double 类型),并且所有算术操作都必须按此标准方式执行。这对于§ 8.1 队列含大小中描述的数据结构尤为重要。[IEEE-754]

4. 可读流

4.1. 使用可读流

消费可读流的最简单方式就是将其管道到一个可写流。这样可以保证背压被正确处理,且写入或读取的任何错误都会沿链路传播:
readableStream.pipeTo(writableStream)
  .then(() => console.log("所有数据已成功写入!"))
  .catch(e => console.error("发生错误!", e));
如果你只是想获知每个新数据块,可以创建一个自定义的可写流并将可读流管道进去:
readableStream.pipeTo(new WritableStream({
  write(chunk) {
    console.log("收到数据块", chunk);
  },
  close() {
    console.log("所有数据已成功读取!");
  },
  abort(e) {
    console.error("发生错误!", e);
  }
}));

通过让 write() 返回 Promise,你可以向可读流发出背压信号。

虽然可读流通常通过管道到可写流使用,但你也可以获取读取器并用其 read() 方法直接读取每一个数据块。例如,下面的代码会打印流中的下一个数据块(如有):
const reader = readableStream.getReader();

reader.read().then(
  ({ value, done }) => {
    if (done) {
      console.log("流已关闭!");
    } else {
      console.log(value);
    }
  },
  e => console.error("流发生错误,无法读取!", e)
);

这种更为手动的读取方式主要适用于为流开发新型高级操作的库作者,超越了已有的管道分叉操作。

上述示例用的是可读流的默认读取器。如果该流是可读字节流,你还可以获取BYOB 读取器,以便对缓冲区分配有更高的控制权,避免拷贝。例如,下面代码会将流中的前 1024 字节读取到一块内存缓冲区中:
const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1024);
const buffer = await readInto(startingAB);
console.log("前 1024 字节: ", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
     await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

需要注意的是,最终的 buffer 变量与 startingAB 不同,但它(以及所有中间缓冲区)实际上共享同一块底层内存。在每一步,buffer 都会转移给一个新的 ArrayBuffer 对象。view 是读取结果中解构出来的 Uint8Array,其 buffer 属性指向 ArrayBuffer 对象,byteOffset 表示字节写入的起始偏移,byteLength 表示写入的字节数。

需要说明的是,这个示例主要用于教学。实际应用中,min 选项可以更简单直接地读取指定字节数:

const reader = readableStream.getReader({ mode: "byob" });
const { value: view, done } = await reader.read(new Uint8Array(1024), { min: 1024 });
console.log("前 1024 字节: ", view);

4.2. ReadableStream

ReadableStream 类是通用可读流概念的具体实现。它适用于任何数据块类型,并维护一个内部队列,用于跟踪由底层源提供但尚未被消费者读取的数据。

4.2.1. 接口定义

ReadableStream 类的 Web IDL 定义如下:

[Exposed=*, Transferable]
interface ReadableStream {
  constructor(optional object underlyingSource, optional QueuingStrategy strategy = {});

  static ReadableStream from(any asyncIterable);

  readonly attribute boolean locked;

  Promise<undefined> cancel(optional any reason);
  ReadableStreamReader getReader(optional ReadableStreamGetReaderOptions options = {});
  ReadableStream pipeThrough(ReadableWritablePair transform, optional StreamPipeOptions options = {});
  Promise<undefined> pipeTo(WritableStream destination, optional StreamPipeOptions options = {});
  sequence<ReadableStream> tee();

  async iterable<any>(optional ReadableStreamIteratorOptions options = {});
};

typedef (ReadableStreamDefaultReader or ReadableStreamBYOBReader) ReadableStreamReader;

enum ReadableStreamReaderMode { "byob" };

dictionary ReadableStreamGetReaderOptions {
  ReadableStreamReaderMode mode;
};

dictionary ReadableStreamIteratorOptions {
  boolean preventCancel = false;
};

dictionary ReadableWritablePair {
  required ReadableStream readable;
  required WritableStream writable;
};

dictionary StreamPipeOptions {
  boolean preventClose = false;
  boolean preventAbort = false;
  boolean preventCancel = false;
  AbortSignal signal;
};

4.2.2. 内部插槽

ReadableStream 的实例具有下表所述的内部插槽:

内部插槽 描述(非规范性
[[controller]] 一个 ReadableStreamDefaultControllerReadableByteStreamController ,用于控制该流的状态和队列
[[Detached]] 当流被转移(transferred)时为 true 的布尔值标志
[[disturbed]] 当流已被读取或已被取消时为 true 的布尔值标志
[[reader]] 如果流被读取器锁定,则为 ReadableStreamDefaultReaderReadableStreamBYOBReader 实例,否则为 undefined
[[state]] 包含流当前状态的字符串,仅供内部使用;取值为 "readable"、"closed" 或 "errored"
[[storedError]] 指示流如何失败的值,用作操作出错流时的失败原因或异常

4.2.3. 底层源 API

ReadableStream() 构造函数的第一个参数是一个 JavaScript 对象,表示底层源(underlying source)。 这样的对象可以包含以下任意属性:

dictionary UnderlyingSource {
  UnderlyingSourceStartCallback start;
  UnderlyingSourcePullCallback pull;
  UnderlyingSourceCancelCallback cancel;
  ReadableStreamType type;
  [EnforceRange] unsigned long long autoAllocateChunkSize;
};

typedef (ReadableStreamDefaultController or ReadableByteStreamController) ReadableStreamController;

callback UnderlyingSourceStartCallback = any (ReadableStreamController controller);
callback UnderlyingSourcePullCallback = Promise<undefined> (ReadableStreamController controller);
callback UnderlyingSourceCancelCallback = Promise<undefined> (optional any reason);

enum ReadableStreamType { "bytes" };
start(controller), 类型为 UnderlyingSourceStartCallback

在创建 ReadableStream 实例时立即调用的函数。

通常用于适配推送源,比如设置相关事件监听(见§ 10.1 一个没有背压支持的底层推送源的可读流示例),或者用于获取拉取源的访问权(见§ 10.4 一个底层拉取源的可读流示例)。

如果初始化过程是异步的,可以返回一个 promise 以通知成功或失败;被拒绝的 promise 会使流变为错误状态。抛出的异常将由 ReadableStream() 构造函数重新抛出。

pull(controller), 类型为 UnderlyingSourcePullCallback

当流的内部队列中数据块未满时调用(即队列的期望大小为正时)。通常会反复调用,直到队列达到高水位线(即期望大小≤0)。

对于推送源,可用于恢复暂停的数据流(见§ 10.2 支持背压的底层推送源可读流);对于拉取源,用于获取新的数据块并入队(见§ 10.4 一个底层拉取源的可读流示例)。

只有在 start() 成功完成后才会被调用。并且只有在 pull() 实现中入队了至少一个数据块或满足了 BYOB 请求时才会持续被调用;无操作的 pull() 不会持续被调用。

如果该函数返回一个 promise,则只有该 promise 履约后才会再次调用 pull()。(若 promise 被拒绝,流会进入错误状态。)这主要用于拉取源,返回的 promise 表示获取新数据块的过程。抛出异常等同于返回被拒绝的 promise。

cancel(reason), 类型为 UnderlyingSourceCancelCallback

消费者通过 stream.cancel()reader.cancel() 取消流时调用。其参数与消费者传递给这些方法的值相同。

可读流在管道操作中也可能被取消(见 pipeTo() 方法的定义)。

通常用于释放底层资源的访问权(见§ 10.1 一个没有背压支持的底层推送源的可读流示例)。

如果取消过程是异步的,可以返回 promise 通知成功或失败;结果会通过调用 cancel() 方法的返回值反馈。抛出异常等同于返回被拒绝的 promise。

即使取消过程失败,流也会关闭,不会进入错误状态。因为一旦消费者通过 cancel 表达了不再关心该流,取消过程的失败对消费者视角已无影响,失败结果只通知给相应方法的直接调用者。

这不同于 closeabort 这类 WritableStream底层接收器选项,后者失败时会令对应 WritableStream 进入错误状态。因为这些操作对应生产者请求的特定动作,失败时意味着有更严重的问题。

type(仅字节流), 类型为 ReadableStreamType

可设置为 "bytes",表示构造得到的 ReadableStream可读字节流。这保证流可通过 getReader() 方法成功获取 BYOB 读取器。同时会影响传递给 start()pull() 方法的 controller 参数类型(见下文)。

如何设置一个可读字节流,包括使用不同的控制器接口,可参考 § 10.3 一个没有背压支持的字节推送源可读字节流

设置为 "bytes" 或 undefined 之外的任何值都会导致 ReadableStream() 构造函数抛出异常。

autoAllocateChunkSize (仅字节流), 类型为 unsigned long long

可设为一个正整数,使实现自动为底层源分配缓冲区。当消费者使用 默认读取器 时,流实现会自动分配指定大小的 ArrayBuffer ,这样 controller.byobRequest 始终存在,仿佛消费者使用的是 BYOB 读取器

这样可以减少处理默认读取器消费者时所需代码量(可比较 § 10.3 一个没有背压支持的字节推送源可读字节流§ 10.5 一个带自动分配的字节拉取源可读字节流)。

传递给 start()pull() 方法的 controller 参数类型取决于 type 选项的值。如果 type 为 undefined(包括未设置),则 controllerReadableStreamDefaultController。 如果为 "bytes", 则 controllerReadableByteStreamController

4.2.4. 构造函数、方法和属性

stream = new ReadableStream(underlyingSource[, strategy])

创建一个新的 ReadableStream,包装所提供的底层源。更多关于 underlyingSource 参数的信息见 § 4.2.3 底层源 API

strategy 参数表示流的排队策略,详见 § 7.1 排队策略 API。如果未提供该参数,则默认行为等同于 CountQueuingStrategy,其高水位线为 1。

stream = ReadableStream.from(asyncIterable)

创建一个新的 ReadableStream,包装所提供的可迭代对象异步可迭代对象

这可以用于将各种对象适配为可读流,如数组异步生成器,或 Node.js readable stream 等。

isLocked = stream.locked

返回该可读流是否被某个读取器锁定

await stream.cancel([ reason ])

取消流,表示消费者不再关心该流。提供的 reason 参数会传递给底层源的 cancel() 方法,具体是否使用由底层源决定。

返回的 promise 在流被成功关闭时履约,在底层源关闭失败时拒绝。如果流当前被锁定,则会直接拒绝并抛出 TypeError,不会尝试取消流。

reader = stream.getReader()

创建一个 ReadableStreamDefaultReader锁定该流。流被锁定期间,不能获取其他读取器,直到释放该读取器(release)。

该功能尤其适合希望独占消费整个流的抽象对象。通过为流获取读取器,可以确保不会有其他人与你并发读取或取消流,从而影响你的抽象。

reader = stream.getReader({ mode: "byob" })

创建一个 ReadableStreamBYOBReader锁定该流。

该调用与无参数版本行为相同,但只适用于可读字节流,即专门支持“自带缓冲区”读取的流。返回的BYOB 读取器允许直接通过 read() 方法将数据读入开发者自定义的缓冲区,实现对分配的精细控制。

readable = stream.pipeThrough({ writable, readable }[, { preventClose, preventAbort, preventCancel, signal }])

为该可读流提供一个便捷的链式管道方式,通过一个转换流(或任意 { writable, readable } 对)进行处理。它会将当前流管道到指定对的 writable 端,并返回 readable 端以便继续使用。

执行管道操作会在整个管道操作期间锁定该流,防止其他消费者获取读取器。

await stream.pipeTo(destination[, { preventClose, preventAbort, preventCancel, signal }])

将该可读流管道到给定的可写流destination。通过传递参数可以自定义在各种错误条件下管道的行为。返回一个 promise,在管道过程成功完成时履约,否则在出错时拒绝。

执行管道操作会在整个管道期间锁定该流,防止其他消费者获取读取器。

源流与目标流的错误与关闭传播规则如下:

  • 可读流出错时,会中止destination,除非 preventAbort 为真。promise 会以源流的错误或中止目标流时产生的错误拒绝。

  • destination 出错时,会取消可读流,除非 preventCancel 为真。promise 会以目标流的错误或取消源流时的错误拒绝。

  • 当源可读流关闭时,destination 也会关闭,除非 preventClose 为真。promise 会在关闭目标流过程完成后履约,如果关闭目标流时报错则以该错误拒绝。

  • 如果 destination 最初已关闭或正在关闭,则该可读流会被取消,除非 preventCancel 为真。promise 会以管道到已关闭流失败的错误或取消源流时的错误拒绝。

signal 选项可以设置为 AbortSignal,以允许通过相应的 AbortController 中止正在进行的管道操作。在这种情况下,源可读流会被取消destination 会被中止,除非相应的 preventCancelpreventAbort 被设置。

[branch1, branch2] = stream.tee()

分叉该可读流,返回一个包含两个分支的新 ReadableStream 实例的数组。

分叉操作会锁定该流,防止其他消费者获取读取器。要取消流,需要取消两个分支,最终组合的取消原因会传递给流的底层源

如果该流是可读字节流,每个分支会收到各自的数据块副本;否则,两个分支看到的数据块是同一个对象。如果数据块不是不可变的,这可能导致两个分支间互相影响。

new ReadableStream(underlyingSource, strategy) 构造函数的步骤如下:
  1. 如果 underlyingSource 缺失,则将其设为 null。

  2. underlyingSourceDictunderlyingSource转换为类型为 UnderlyingSource 的 IDL 值。

    我们不能直接将 underlyingSource 参数声明为 UnderlyingSource 类型,否则会丢失原始对象的引用。我们需要保留该对象,以便调用其上的各种方法。

  3. 执行 ! InitializeReadableStream(this)。

  4. 如果 underlyingSourceDict["type"] 为 "bytes":

    1. 如果 strategy["size"] 存在,则抛出 RangeError 异常。

    2. highWaterMark 为 ? ExtractHighWaterMark(strategy, 0)。

    3. 执行 ? SetUpReadableByteStreamControllerFromUnderlyingSource(this, underlyingSource, underlyingSourceDict, highWaterMark)。

  5. 否则,

    1. 断言:underlyingSourceDict["type"] 不存在

    2. sizeAlgorithm 为 ! ExtractSizeAlgorithm(strategy)。

    3. highWaterMark 为 ? ExtractHighWaterMark(strategy, 1)。

    4. 执行 ? SetUpReadableStreamDefaultControllerFromUnderlyingSource(this, underlyingSource, underlyingSourceDict, highWaterMark, sizeAlgorithm)。

静态 from(asyncIterable) 方法的步骤如下:
  1. 返回 ? ReadableStreamFromIterable(asyncIterable)。

locked getter 步骤如下:
  1. 返回 ! IsReadableStreamLocked(this)。

cancel(reason) 方法的步骤如下:
  1. 如果 ! IsReadableStreamLocked(this) 为 true,则返回一个被拒绝的 promise,其原因是 TypeError 异常。

  2. 返回 ! ReadableStreamCancel(this, reason)。

getReader(options) 方法的步骤如下:
  1. 如果 options["mode"] 不存在,则返回 ? AcquireReadableStreamDefaultReader(this)。

  2. 断言:options["mode"] 的值为 "byob"。

  3. 返回 ? AcquireReadableStreamBYOBReader(this)。

一个使用读取器的抽象示例如下,该函数将整个可读流读取到内存数组,数组元素为每个数据块
function readAllChunks(readableStream) {
  const reader = readableStream.getReader();
  const chunks = [];

  return pump();

  function pump() {
    return reader.read().then(({ value, done }) => {
      if (done) {
        return chunks;
      }

      chunks.push(value);
      return pump();
    });
  }
}

注意它第一步就获取读取器,之后只使用该读取器。这确保了没有其他消费者可以干扰流(如并发读取或取消流)。

pipeThrough(transform, options) 方法步骤如下:
  1. 如果 ! IsReadableStreamLocked(this) 为 true,则抛出 TypeError 异常。

  2. 如果 ! IsWritableStreamLocked(transform["writable"]) 为 true,则抛出 TypeError 异常。

  3. signaloptions["signal"],如果存在则取该值,否则为 undefined。

  4. promise 为 ! ReadableStreamPipeTo(this, transform["writable"], options["preventClose"], options["preventAbort"], options["preventCancel"], signal)。

  5. promise.[[PromiseIsHandled]] 设为 true。

  6. 返回 transform["readable"]。

使用 pipeThrough(transform, options) 构建管道链的典型示例如下:
httpResponseBody
  .pipeThrough(decompressorTransform)
  .pipeThrough(ignoreNonImageFilesTransform)
  .pipeTo(mediaGallery);
pipeTo(destination, options) 方法步骤如下:
  1. 如果 ! IsReadableStreamLocked(this) 为 true,返回一个被拒绝的 promise,其原因为 TypeError 异常。

  2. 如果 ! IsWritableStreamLocked(destination) 为 true,返回一个被拒绝的 promise,其原因为 TypeError 异常。

  3. signaloptions["signal"],如果存在则取该值,否则为 undefined。

  4. 返回 ! ReadableStreamPipeTo(this, destination, options["preventClose"], options["preventAbort"], options["preventCancel"], signal)。

可以使用 AbortSignal 停止正在进行的 管道操作,示例如下:
const controller = new AbortController();
readable.pipeTo(writable, { signal: controller.signal });

// ... 之后的某个时刻 ...
controller.abort();

(上述代码省略了对 pipeTo() 返回的 promise 的错误处理。此外,preventAbortpreventCancel 选项对于管道被终止时的影响也值得考虑。)

上述技术也可用于切换被管道的 ReadableStream,但写入同一个 WritableStream
const controller = new AbortController();
const pipePromise = readable1.pipeTo(writable, { preventAbort: true, signal: controller.signal });

// ... 之后的某个时刻 ...
controller.abort();

// 等待管道完成后再启动新管道:
try {
 await pipePromise;
} catch (e) {
 // 预期 "AbortError" DOMException 可吞掉,其他错误需重新抛出。
 if (e.name !== "AbortError") {
  throw e;
 }
}

// 启动新的管道!
readable2.pipeTo(writable);
tee() 方法步骤如下:
  1. 返回 ? ReadableStreamTee(this, false)。

对流进行分叉(tee)在希望让两个独立的消费者并行读取流(甚至读取速度不同)时很有用。例如,给定一个代表本地文件的可写流 cacheEntry,和一个代表远程服务器上传的可写流 httpRequestBody,你可以同时将同一个可读流管道到两个目标:
const [forLocal, forRemote] = readableStream.tee();

Promise.all([
  forLocal.pipeTo(cacheEntry),
  forRemote.pipeTo(httpRequestBody)
])
.then(() => console.log("已保存到缓存并上传!"))
.catch(e => console.error("缓存或上传失败:" , e));

4.2.5. 异步迭代

for await (const chunk of stream) { ... }
for await (const chunk of stream.values({ preventCancel: true })) { ... }

异步遍历流内部队列中的数据块(chunk)

异步迭代流时会锁定该流,阻止其他消费者获取读取器。如果通过 return() 方法(如通过 break 跳出循环)结束异步迭代,会释放该锁。

默认情况下,调用异步迭代器的 return() 方法也会取消流。如要阻止此行为,可调用流的 values() 方法并为 preventCancel 选项传入 true。

可读流的异步迭代器初始化步骤,给定 streamiteratorargs,如下:
  1. reader 为 ? AcquireReadableStreamDefaultReader(stream)。

  2. iteratorreader 设为 reader

  3. preventCancelargs[0]["preventCancel"]。

  4. iteratorprevent cancel 设为 preventCancel

获取下一个迭代结果的步骤(针对 ReadableStream,给定 streamiterator)如下:
  1. readeriteratorreader

  2. 断言:reader.[[stream]] 不为 undefined。

  3. promise新建的 promise

  4. readRequest 为新的读取请求(read request),包含以下条目

    chunk steps,给定 chunk
    1. 解析 promise,值为 chunk

    close steps
    1. 执行 ! ReadableStreamDefaultReaderRelease(reader)。

    2. 解析 promise,值为 迭代结束

    error steps,给定 e
    1. 执行 ! ReadableStreamDefaultReaderRelease(reader)。

    2. 拒绝 promise,理由为 e

  5. 执行 ! ReadableStreamDefaultReaderRead(this, readRequest)。

  6. 返回 promise

异步迭代器 return 步骤(针对 ReadableStream,给定 streamiteratorarg)如下:
  1. readeriteratorreader

  2. 断言:reader.[[stream]] 不为 undefined。

  3. 断言:reader.[[readRequests]] 为空(async iterator 机制保证所有之前对 next() 的调用都已 settle,才会调用此方法)。

  4. 如果 iteratorprevent cancel 为 false:

    1. result 为 ! ReadableStreamReaderGenericCancel(reader, arg)。

    2. 执行 ! ReadableStreamDefaultReaderRelease(reader)。

    3. 返回 result

  5. 执行 ! ReadableStreamDefaultReaderRelease(reader)。

  6. 返回一个以 undefined 解决的 promise

4.2.6. 通过 postMessage() 传递

destination.postMessage(rs, { transfer: [rs] });

将一个 ReadableStream 发送到另一个 frame、window 或 worker。

被传递的流可与原始流一样使用。原始流会被锁定,无法再直接使用。

ReadableStream 对象是可转移对象(transferable objects)。它们的转移步骤(transfer steps),给定 valuedataHolder,如下:
  1. 如果 ! IsReadableStreamLocked(value) 为 true,则抛出 "DataCloneError" DOMException

  2. port1 为在当前 Realm中新建的 MessagePort

  3. port2 为在当前 Realm中新建的 MessagePort

  4. 关联 port1port2

  5. writable 为在当前 Realm中新建的 WritableStream

  6. 执行 ! SetUpCrossRealmTransformWritable(writable, port1)。

  7. promise 为 ! ReadableStreamPipeTo(value, writable, false, false, false)。

  8. promise.[[PromiseIsHandled]] 设为 true。

  9. dataHolder.[[port]] 设为 ! StructuredSerializeWithTransfer(port2, « port2 »)。

转移接收步骤(transfer-receiving steps),给定 dataHoldervalue,如下:
  1. deserializedRecord 为 ! StructuredDeserializeWithTransfer(dataHolder.[[port]], 当前 Realm)。

  2. portdeserializedRecord.[[Deserialized]]。

  3. 执行 ! SetUpCrossRealmTransformReadable(value, port)。

4.3. ReadableStreamGenericReader mixin

ReadableStreamGenericReader mixin 定义了 ReadableStreamDefaultReaderReadableStreamBYOBReader 对象共享的内部槽、getter 和方法。

4.3.1. 混入定义

ReadableStreamGenericReader 混入的 Web IDL 定义如下:

interface mixin ReadableStreamGenericReader {
  readonly attribute Promise<undefined> closed;

  Promise<undefined> cancel(optional any reason);
};

4.3.2. 内部插槽

包含 ReadableStreamGenericReader 混入的类的实例会带有下表描述的内部插槽:

内部插槽 描述(非规范性
[[closedPromise]] 由 reader 的 closed getter 返回的 promise
[[stream]] 拥有此 reader 的 ReadableStream 实例

4.3.3. 方法与属性

closed 属性步骤如下:
  1. 返回 this.[[closedPromise]]

cancel(reason) 方法步骤如下:
  1. 如果 this.[[stream]] 为 undefined,则返回以 TypeError 异常拒绝的 promise。

  2. 返回 ! ReadableStreamReaderGenericCancel(this, reason)。

4.4. ReadableStreamDefaultReader

ReadableStreamDefaultReader 类表示被 ReadableStream 实例提供的默认 reader

4.4.1. 接口定义

ReadableStreamDefaultReader 类的 Web IDL 定义如下:

[Exposed=*]
interface ReadableStreamDefaultReader {
  constructor(ReadableStream stream);

  Promise<ReadableStreamReadResult> read();
  undefined releaseLock();
};
ReadableStreamDefaultReader includes ReadableStreamGenericReader;

dictionary ReadableStreamReadResult {
  any value;
  boolean done;
};

4.4.2. 内部插槽

ReadableStreamDefaultReader 的实例拥有 ReadableStreamGenericReader 定义的内部插槽,以及下表所述的插槽:

内部插槽 描述(非规范性
[[readRequests]] 一个列表,包含读取请求,用于在消费者请求时,这些块尚未可用

读取请求是一个结构体 ,包含三个算法,用于响应填充可读流内部队列或其状态变化。它包含如下

chunk steps

接收一个的算法,当有可读取的块时调用

close steps

无参数的算法,当没有且流已关闭时调用

error steps

接收一个 JavaScript 值的算法,当没有且流出错时调用

4.4.3. 构造函数、方法和属性

reader = new ReadableStreamDefaultReader(stream)

这等价于调用 stream.getReader()

await reader.closed

返回一个 promise,当流关闭时被满足;如果流发生错误或 reader 的锁在流关闭前被释放,则 promise 被拒绝。

await reader.cancel([ reason ])

如果 reader 处于活动状态,则行为与 stream.cancel(reason) 相同。

{ value, done } = await reader.read()

返回一个 promise,允许访问流的内部队列中的下一个(如果有)。

  • 如果块可用,promise 被满足,返回的对象格式为 { value: theChunk, done: false }
  • 如果流被关闭,promise 被满足,返回对象格式为 { value: undefined, done: true }
  • 如果流出错,promise 被拒绝并带有相关错误。

如果读取一个块导致队列变为空,将会从底层源拉取更多数据。

reader.releaseLock()

释放 reader 对应流的锁。锁被释放后,reader 不再处于活动状态。如果关联的流在释放锁时出错,reader 从此也会表现为出错;否则 reader 表现为已关闭。

如果 reader 在释放锁时还有未完成的读取请求,则 read() 方法返回的 promise 会立即被 TypeError 拒绝。所有未读取的块将保留在流的内部队列,可通过获取新 reader 再次读取。

new ReadableStreamDefaultReader(stream) 构造函数步骤如下:
  1. 执行 ? SetUpReadableStreamDefaultReader(this, stream)。

read() 方法步骤如下:
  1. 如果 this.[[stream]] 为 undefined,返回以 TypeError 异常拒绝的 promise。

  2. promise一个新的 promise

  3. readRequest 为一个新的读取请求,包含以下

    分块步骤,给定 chunk
    1. 解析 promise,值为 «[ "value" → chunk, "done" → false ]»。

    关闭步骤
    1. 解析 promise,值为 «[ "value" → undefined, "done" → true ]»。

    错误步骤,给定 e
    1. 拒绝 promise,理由为 e

  4. 执行 ! ReadableStreamDefaultReaderRead(this, readRequest)。

  5. 返回 promise

releaseLock() 方法步骤如下:
  1. 如果 this.[[stream]] 为 undefined,则返回。

  2. 执行 ! ReadableStreamDefaultReaderRelease(this)。

4.5. ReadableStreamBYOBReader

ReadableStreamBYOBReader 类表示一种BYOB reader ,用于由 ReadableStream 实例提供。

4.5.1. 接口定义

ReadableStreamBYOBReader 类的 Web IDL 定义如下:

[Exposed=*]
interface ReadableStreamBYOBReader {
  constructor(ReadableStream stream);

  Promise<ReadableStreamReadResult> read(ArrayBufferView view, optional ReadableStreamBYOBReaderReadOptions options = {});
  undefined releaseLock();
};
ReadableStreamBYOBReader includes ReadableStreamGenericReader;

dictionary ReadableStreamBYOBReaderReadOptions {
  [EnforceRange] unsigned long long min = 1;
};

4.5.2. 内部插槽

ReadableStreamBYOBReader 的实例拥有 ReadableStreamGenericReader 定义的内部插槽,以及下表所述的插槽:

内部插槽 描述(非规范性
[[readIntoRequests]] 一个列表,包含read-into 请求,用于在消费者请求时,这些块尚未可用

read-into 请求是一个结构体 ,包含三个算法,用于响应填充可读字节流内部队列或其状态变化。它包含如下

chunk steps

接收一个的算法,当有可读取的块时调用

close steps

接收一个或 undefined 的算法,当没有块(chunk)且流已关闭时调用

error steps

接收一个 JavaScript 值的算法,当没有且流出错时调用

close steps 接收一个,这样在可能的情况下可以将底层内存返回给调用者。例如,byobReader.read(chunk) 会在流关闭时以 { value: newViewOnSameMemory, done: true } 形式 fulfill。如果流被取消,则底层内存会被丢弃,byobReader.read(chunk) 会以更传统的 { value: undefined, done: true } fulfill。

4.5.3. 构造函数、方法和属性

reader = new ReadableStreamBYOBReader(stream)

这等价于调用 stream.getReader({ mode: "byob" })

await reader.closed

返回一个 promise,当流关闭时被满足;如果流发生错误或 reader 的锁在流关闭前被释放,则 promise 被拒绝。

await reader.cancel([ reason ])

如果 reader 处于活动状态,则行为与 stream.cancel(reason) 相同。

{ value, done } = await reader.read(view[, { min }])

尝试将字节读取到 view 中,并返回一个 promise,promise 解析后的结果如下:

  • 如果块可用,promise 被满足,返回的对象格式为 { value: newView, done: false }。 此时 view 会被分离,不可再用,newView 是同一底层内存的新视图,数据已写入其中。
  • 如果流被关闭,promise 被满足,返回对象格式为 { value: newView, done: true }。 此时 view 会被分离,不可再用,newView 是同一底层内存的新视图,未修改数据,目的是保证内存返回给调用者。
  • 如果 reader 被取消,promise 被满足,返回对象格式为 { value: undefined, done: true }。 此时 view 的底层内存会被丢弃,不再返回给调用者。
  • 如果流出错,promise 被拒绝并带有相关错误。

如果读取一个块导致队列变为空,将会从底层源拉取更多数据。

如果指定了 min ,promise 只会在满足最小元素数量时才会被 fulfill。此处“元素数量”对于 typed array 是 newViewlength,对于 DataViewnewView.byteLength。如果流关闭,则 promise 返回流中剩余元素,可能少于最初请求数量。如果未指定,则至少有一个元素时就会 fulfill。

reader.releaseLock()

释放 reader 对应流的锁。锁被释放后,reader 不再处于活动状态。如果关联的流在释放锁时出错,reader 从此也会表现为出错;否则 reader 表现为已关闭。

如果 reader 在释放锁时还有未完成的读取请求,则 read() 方法返回的 promise 会立即被 TypeError 拒绝。所有未读取的块将保留在流的内部队列,可通过获取新 reader 再次读取。

new ReadableStreamBYOBReader(stream) 构造函数步骤如下:
  1. 执行 ? SetUpReadableStreamBYOBReader(this, stream)。

read(view, options) 方法步骤如下:
  1. 如果 view.[[ByteLength]] 为 0,返回以 TypeError 异常拒绝的 promise。

  2. 如果 view.[[ViewedArrayBuffer]].[[ByteLength]] 为 0,返回以 TypeError 异常拒绝的 promise。

  3. 如果 ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) 为 true,返回以 TypeError 异常拒绝的 promise。

  4. 如果 options["min"] 为 0,返回以 TypeError 异常拒绝的 promise。

  5. 如果 view 拥有 [[TypedArrayName]] 内部插槽,

    1. 如果 options["min"] > view.[[ArrayLength]], 返回以 RangeError 异常拒绝的 promise。

  6. 否则(即为 DataView),

    1. 如果 options["min"] > view.[[ByteLength]], 返回以 RangeError 异常拒绝的 promise。

  7. 如果 this.[[stream]] 为 undefined,返回以 TypeError 异常拒绝的 promise。

  8. promise一个新的 promise

  9. readIntoRequest 为一个新的read-into 请求,包含以下

    分块步骤,给定 chunk
    1. 解析 promise,值为 «[ "value" → chunk, "done" → false ]»。

    关闭步骤,给定 chunk
    1. 解析 promise,值为 «[ "value" → chunk, "done" → true ]»。

    错误步骤,给定 e
    1. 拒绝 promise,理由为 e

  10. 执行 ! ReadableStreamBYOBReaderRead(this, view, options["min"], readIntoRequest).

  11. 返回 promise

releaseLock() 方法步骤如下:
  1. 如果 this.[[stream]] 为 undefined,则返回。

  2. 执行 ! ReadableStreamBYOBReaderRelease(this)。

4.6. ReadableStreamDefaultController

ReadableStreamDefaultController 类拥有允许控制 ReadableStream 状态和内部队列的方法。 当构造一个不是可读字节流ReadableStream 时,底层源会被赋予一个对应的 ReadableStreamDefaultController 实例用于操作流。

4.6.1. 接口定义

ReadableStreamDefaultController 类的 Web IDL 定义如下:

[Exposed=*]
interface ReadableStreamDefaultController {
  readonly attribute unrestricted double? desiredSize;

  undefined close();
  undefined enqueue(optional any chunk);
  undefined error(optional any e);
};

4.6.2. 内部插槽

ReadableStreamDefaultController 的实例拥有下表所描述的内部插槽:

内部插槽 描述(非规范性
[[cancelAlgorithm]] 一个返回 promise 的算法,接收一个参数(取消原因),用于将取消请求传递给底层源
[[closeRequested]] 一个布尔标志,指示流是否已被底层源关闭,但其内部队列中仍有未读取的
[[pullAgain]] 一个布尔标志,如果流的机制请求调用底层源的 pull 算法拉取更多数据,但由于上一次调用尚未完成,不能立即执行,则设置为 true
[[pullAlgorithm]] 一个返回 promise 的算法,从底层源拉取数据
[[pulling]] 一个布尔标志,在底层源的 pull 算法执行且返回的 promise 尚未完成时为 true,用于防止重入调用
[[queue]] 一个列表,表示流的内部队列
[[queueTotalSize]] 存储于[[queue]]中的所有块的总大小(见 § 8.1 队列与大小
[[started]] 一个布尔标志,表示底层源是否已完成启动
[[strategyHWM]] 构造时作为流排队策略一部分提供的数值,表示应用背压底层源的阈值
[[strategySizeAlgorithm]] 一个算法,用于计算加入队列的的大小,作为流排队策略的一部分
[[stream]] 被控制的 ReadableStream 实例

4.6.3. 方法与属性

desiredSize = controller.desiredSize

返回填满所控制流的内部队列所需的期望大小。如果队列过满,该值可能为负。底层源应利用此信息判断何时以及如何施加背压

controller.close()

关闭所控制的可读流。消费者仍可读取之前已进入队列的,但全部读取后流将关闭。

controller.enqueue(chunk)

将给定的chunk加入所控制的可读流队列。

controller.error(e)

使所控制的可读流出错,后续所有操作均会因给定错误e失败。

desiredSize getter 步骤如下:
  1. 返回 ! ReadableStreamDefaultControllerGetDesiredSize(this)。

close() 方法步骤如下:
  1. 如果 ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) 为 false,则抛出 TypeError 异常。

  2. 执行 ! ReadableStreamDefaultControllerClose(this)。

enqueue(chunk) 方法步骤如下:
  1. 如果 ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) 为 false,则抛出 TypeError 异常。

  2. 执行 ? ReadableStreamDefaultControllerEnqueue(this, chunk)。

error(e) 方法步骤如下:
  1. 执行 ! ReadableStreamDefaultControllerError(this, e)。

4.6.4. 内部方法

每个 ReadableStreamDefaultController 实例实现以下内部方法。 可读流实现会多态地调用这些方法,或调用 BYOB 控制器的对应方法,详见§ 4.9.2 控制器接口抽象操作

[[CancelSteps]](reason) 实现 [[CancelSteps]] 合约。步骤如下:
  1. 执行 ! ResetQueue(this)。

  2. result 为执行 this.[[cancelAlgorithm]],传入 reason 的结果。

  3. 执行 ! ReadableStreamDefaultControllerClearAlgorithms(this)。

  4. 返回 result

[[PullSteps]](readRequest) 实现 [[PullSteps]] 合约。步骤如下:
  1. streamthis.[[stream]]

  2. 如果 this.[[queue]]

    1. chunk 为 ! DequeueValue(this)。

    2. 如果 this.[[closeRequested]] 为 true 且 this.[[queue]] 为空

      1. 执行 ! ReadableStreamDefaultControllerClearAlgorithms(this)。

      2. 执行 ! ReadableStreamClose(stream).

    3. 否则,执行 ! ReadableStreamDefaultControllerCallPullIfNeeded(this)。

    4. 执行 readRequestchunk steps,传入 chunk

  3. 否则,

    1. 执行 ! ReadableStreamAddReadRequest(stream, readRequest)。

    2. 执行 ! ReadableStreamDefaultControllerCallPullIfNeeded(this)。

[[ReleaseSteps]]() 实现 [[ReleaseSteps]] 合约。步骤如下:
  1. 返回。

4.7. ReadableByteStreamController

ReadableByteStreamController 类拥有允许控制 ReadableStream 状态和内部队列的方法。 当构造一个可读字节流ReadableStream 时,底层源会被赋予一个对应的 ReadableByteStreamController 实例用于操作流。

4.7.1. 接口定义

ReadableByteStreamController 类的 Web IDL 定义如下:

[Exposed=*]
interface ReadableByteStreamController {
  readonly attribute ReadableStreamBYOBRequest? byobRequest;
  readonly attribute unrestricted double? desiredSize;

  undefined close();
  undefined enqueue(ArrayBufferView chunk);
  undefined error(optional any e);
};

4.7.2. 内部插槽

ReadableByteStreamController 的实例拥有下表所描述的内部插槽:

内部插槽 描述(非规范性
[[autoAllocateChunkSize]] 一个正整数,当启用自动 buffer 分配特性时存在。在这种情况下,该值指定要分配的 buffer 大小。否则为 undefined。
[[byobRequest]] 一个 ReadableStreamBYOBRequest 实例,表示当前 BYOB 读取请求,如果没有待处理请求则为 null
[[cancelAlgorithm]] 一个返回 promise 的算法,接收一个参数(取消原因),用于将取消请求传递给底层字节源
[[closeRequested]] 一个布尔标志,指示流是否已被底层字节源关闭,但其内部队列中仍有未读取的
[[pullAgain]] 一个布尔标志,如果流的机制请求调用底层字节源的 pull 算法拉取更多数据,但由于上一次调用尚未完成,不能立即执行,则设置为 true
[[pullAlgorithm]] 一个返回 promise 的算法,从底层字节源拉取数据
[[pulling]] 一个布尔标志,在底层字节源的 pull 算法执行且返回的 promise 尚未完成时为 true,用于防止重入调用
[[pendingPullIntos]] 一个列表,包含pull-into 描述符
[[queue]] 一个列表,包含可读字节流队列项,表示流的内部队列
[[queueTotalSize]] 以字节为单位,存储于[[queue]]中的所有块的总大小(见 § 8.1 队列与大小
[[started]] 一个布尔标志,表示底层字节源是否已完成启动
[[strategyHWM]] 构造时作为流排队策略一部分提供的数值,表示应用背压底层字节源的阈值
[[stream]] 被控制的 ReadableStream 实例

虽然 ReadableByteStreamController 实例有 [[queue]][[queueTotalSize]] 插槽,但我们并不会对它们使用 § 8.1 队列与大小 中的大多数抽象操作,因为规范中对该队列的操作与其他类型有较大不同。我们会手动同步更新这两个插槽。

这部分可能会在后续规范重构中得到优化。

可读字节流队列项是一个结构体,用于封装可读字节流中特有的重要信息。它包含如下

buffer

一个 ArrayBuffer,为转移自底层字节源最初提供的 buffer。

byte offset

一个非负整数,表示来自底层字节源视图的字节偏移量。

byte length

一个非负整数,表示来自底层字节源视图的字节长度。

pull-into 描述符是一个结构体,用于表示待处理的 BYOB 拉取请求。其包含如下

buffer

一个 ArrayBuffer

buffer byte length

一个正整数,表示 buffer 的初始字节长度

byte offset

一个非负整数,表示 buffer 中,底层字节源将开始写入的字节偏移量

byte length

一个正整数,表示可以写入 buffer 的字节数

bytes filled

一个非负整数,表示当前已写入 buffer 的字节数

minimum fill

一个正整数,表示在满足相关 read() 请求前,必须写入 buffer 的最小字节数。默认等于 元素大小

element size

一个正整数,表示每次可写入 buffer 的字节数,根据 视图构造器 类型确定

view constructor

一个typed array 构造器%DataView%,用于构造写入 buffer 的视图

reader type

default” 或 “byob”,表示发起本次请求的可读流 reader类型,如果发起方reader释放,则为 “none”。

4.7.3. 方法与属性

byobRequest = controller.byobRequest

返回当前的 BYOB 拉取请求,如果没有则为 null。

desiredSize = controller.desiredSize

返回填满所控制流的内部队列所需的期望大小。如果队列过满,该值可能为负。底层字节源应利用此信息判断何时以及如何施加背压

controller.close()

关闭所控制的可读流。消费者仍可读取之前已进入队列的,但全部读取后流将关闭。

controller.enqueue(chunk)

将给定的chunk加入所控制的可读流队列。 chunk 必须是 ArrayBufferView 实例,否则会抛出 TypeError

controller.error(e)

使所控制的可读流出错,后续所有操作均会因给定错误e失败。

byobRequest getter 步骤如下:
  1. 返回 ! ReadableByteStreamControllerGetBYOBRequest(this)。

desiredSize getter 步骤如下:
  1. 返回 ! ReadableByteStreamControllerGetDesiredSize(this)。

close() 方法步骤如下:
  1. 如果 this.[[closeRequested]] 为 true, 抛出 TypeError 异常。

  2. 如果 this.[[stream]].[[state]] 不为 "readable",抛出 TypeError 异常。

  3. 执行 ? ReadableByteStreamControllerClose(this)。

enqueue(chunk) 方法步骤如下:
  1. 如果 chunk.[[ByteLength]] 为 0,抛出 TypeError 异常。

  2. 如果 chunk.[[ViewedArrayBuffer]].[[ByteLength]] 为 0,抛出 TypeError 异常。

  3. 如果 this.[[closeRequested]] 为 true, 抛出 TypeError 异常。

  4. 如果 this.[[stream]].[[state]] 不为 "readable",抛出 TypeError 异常。

  5. 返回 ? ReadableByteStreamControllerEnqueue(this, chunk)。

error(e) 方法步骤如下:
  1. 执行 ! ReadableByteStreamControllerError(this, e)。

4.7.4. 内部方法

每个 ReadableByteStreamController 实例实现以下内部方法。 可读流实现会多态地调用这些方法,或调用默认控制器的对应方法,详见§ 4.9.2 控制器接口抽象操作

[[CancelSteps]](reason) 实现 [[CancelSteps]] 合约。步骤如下:
  1. 执行 ! ReadableByteStreamControllerClearPendingPullIntos(this)。

  2. 执行 ! ResetQueue(this)。

  3. result 为执行 this.[[cancelAlgorithm]],传入 reason 的结果。

  4. 执行 ! ReadableByteStreamControllerClearAlgorithms(this)。

  5. 返回 result

[[PullSteps]](readRequest) 实现 [[PullSteps]] 合约。步骤如下:
  1. streamthis.[[stream]]

  2. 断言:! ReadableStreamHasDefaultReader(stream) 为 true。

  3. 如果 this.[[queueTotalSize]] > 0,

    1. 断言:! ReadableStreamGetNumReadRequests(stream) 为 0。

    2. 执行 ! ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest)。

    3. 返回。

  4. autoAllocateChunkSizethis.[[autoAllocateChunkSize]]

  5. 如果 autoAllocateChunkSize 不为 undefined,

    1. bufferConstruct(%ArrayBuffer%, « autoAllocateChunkSize »)。

    2. 如果 buffer 是异常完成,

      1. 执行 readRequesterror steps,参数为 buffer.[[Value]]。

      2. 返回。

    3. pullIntoDescriptor 为一个新的pull-into 描述符,内容为

      buffer
      buffer.[[Value]]
      buffer byte length
      autoAllocateChunkSize
      byte offset
      0
      byte length
      autoAllocateChunkSize
      bytes filled
      0
      minimum fill
      1
      element size
      1
      view constructor
      %Uint8Array%
      reader type
      "default"
    4. pullIntoDescriptor 添加到 this.[[pendingPullIntos]]

  6. 执行 ! ReadableStreamAddReadRequest(stream, readRequest)。

  7. 执行 ! ReadableByteStreamControllerCallPullIfNeeded(this)。

[[ReleaseSteps]]() 实现 [[ReleaseSteps]] 合约。步骤如下:
  1. 如果 this.[[pendingPullIntos]]

    1. firstPendingPullIntothis.[[pendingPullIntos]][0]。

    2. firstPendingPullIntoreader type 设为 "none"。

    3. this.[[pendingPullIntos]] 设为仅包含 firstPendingPullInto列表 « firstPendingPullInto »。

4.8. ReadableStreamBYOBRequest

ReadableStreamBYOBRequest 类表示 ReadableByteStreamController 中的拉取(pull-into)请求。

4.8.1. 接口定义

ReadableStreamBYOBRequest 类的 Web IDL 定义如下:

[Exposed=*]
interface ReadableStreamBYOBRequest {
  readonly attribute ArrayBufferView? view;

  undefined respond([EnforceRange] unsigned long long bytesWritten);
  undefined respondWithNewView(ArrayBufferView view);
};

4.8.2. 内部插槽

ReadableStreamBYOBRequest 的实例拥有下表所描述的内部插槽:

内部插槽 描述(非规范性
[[controller]] ReadableByteStreamController 实例
[[view]] 一个typed array,表示控制器可写入生成数据的目标区域;BYOB 请求失效后为 null。

4.8.3. 方法与属性

view = byobRequest.view

返回用于写入的数据视图,如果 BYOB 请求已经响应则为 null。

byobRequest.respond(bytesWritten)

通知关联的可读字节流,表示有 bytesWritten 字节已写入 view, 结果会传递给消费者

调用此方法后,view 会被转移,无法再被修改。

byobRequest.respondWithNewView(view)

通知关联的可读字节流,底层字节源不再写入 view, 而是提供一个新的 ArrayBufferView, 会返回给消费者

view 必须是和 view 相同底层内存的新视图,即其 buffer 必须等于(或为转移自)view 的 buffer。其 byteOffset 必须等于 viewbyteOffset,其 byteLength(表示写入的字节数)必须小于等于 viewbyteLength

调用此方法后,view 会被转移,无法再被修改。

view getter 步骤如下:
  1. 返回 this.[[view]]

respond(bytesWritten) 方法步骤如下:
  1. 如果 this.[[controller]] 为 undefined,则抛出 TypeError 异常。

  2. 如果 ! IsDetachedBuffer(this.[[view]].[[ArrayBuffer]]) 为 true,则抛出 TypeError 异常。

  3. 断言:this.[[view]].[[ByteLength]] > 0。

  4. 断言:this.[[view]].[[ViewedArrayBuffer]].[[ByteLength]] > 0。

  5. 执行 ? ReadableByteStreamControllerRespond(this.[[controller]], bytesWritten)。

respondWithNewView(view) 方法步骤如下:
  1. 如果 this.[[controller]] 为 undefined,则抛出 TypeError 异常。

  2. 如果 ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) 为 true, 则抛出 TypeError 异常。

  3. 返回 ? ReadableByteStreamControllerRespondWithNewView(this.[[controller]], view)。

4.9. 抽象操作

4.9.1. 与可读流协作

下列抽象操作以较高层次操作 ReadableStream 实例。

AcquireReadableStreamBYOBReader(stream) 执行以下步骤:
  1. reader新建ReadableStreamBYOBReader

  2. 执行 ? SetUpReadableStreamBYOBReader(reader, stream)。

  3. 返回 reader

AcquireReadableStreamDefaultReader(stream) 执行以下步骤:
  1. reader新建ReadableStreamDefaultReader

  2. 执行 ? SetUpReadableStreamDefaultReader(reader, stream)。

  3. 返回 reader

CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm[, highWaterMark, [, sizeAlgorithm]]) 执行以下步骤:
  1. 如果未传入 highWaterMark,则设为 1。

  2. 如果未传入 sizeAlgorithm,则设为一个返回 1 的算法。

  3. 断言:! IsNonNegativeNumber(highWaterMark) 为 true。

  4. stream新建ReadableStream

  5. 执行 ! InitializeReadableStream(stream)。

  6. controller新建ReadableStreamDefaultController

  7. 执行 ? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm)。

  8. 返回 stream

本抽象操作仅当传入的 startAlgorithm 抛出异常时才会抛出异常。

CreateReadableByteStream(startAlgorithm, pullAlgorithm, cancelAlgorithm) 执行以下步骤:
  1. stream新建ReadableStream

  2. 执行 ! InitializeReadableStream(stream)。

  3. controller新建ReadableByteStreamController

  4. 执行 ? SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, undefined)。

  5. 返回 stream

本抽象操作仅当传入的 startAlgorithm 抛出异常时才会抛出异常。

InitializeReadableStream(stream) 执行以下步骤:
  1. stream.[[state]] 设为 "readable"。

  2. stream.[[reader]]stream.[[storedError]] 设为 undefined。

  3. stream.[[disturbed]] 设为 false。

IsReadableStreamLocked(stream) 执行以下步骤:
  1. 如果 stream.[[reader]] 为 undefined,返回 false。

  2. 返回 true。

ReadableStreamFromIterable(asyncIterable) 执行以下步骤:
  1. stream 为 undefined。

  2. iteratorRecord 为 ? GetIterator(asyncIterable, async)。

  3. startAlgorithm 为一个返回 undefined 的算法。

  4. pullAlgorithm 为以下步骤:

    1. nextResultIteratorNext(iteratorRecord)。

    2. 如果 nextResult 是异常完成,返回 nextResult.[[Value]] 拒绝的 promise

    3. nextPromisenextResult.[[Value]] 解决的 promise

    4. 返回对 nextPromise 反应的结果,满足时执行如下步骤,参数 iterResult

      1. 如果 iterResult 不是对象,抛出 TypeError

      2. done 为 ? IteratorComplete(iterResult)。

      3. 如果 done 为 true:

        1. 执行 ! ReadableStreamDefaultControllerClose(stream.[[controller]])。

      4. 否则:

        1. value 为 ? IteratorValue(iterResult)。

        2. 执行 ! ReadableStreamDefaultControllerEnqueue(stream.[[controller]], value)。

  5. cancelAlgorithm 为以下步骤,参数 reason

    1. iteratoriteratorRecord.[[Iterator]]。

    2. returnMethodGetMethod(iterator, "return")。

    3. 如果 returnMethod 是异常完成,返回 returnMethod.[[Value]] 拒绝的 promise

    4. 如果 returnMethod.[[Value]] 为 undefined,返回 以 undefined 解决的 promise

    5. returnResultCall(returnMethod.[[Value]], iterator, « reason »)。

    6. 如果 returnResult 是异常完成,返回 returnResult.[[Value]] 拒绝的 promise

    7. returnPromisereturnResult.[[Value]] 解决的 promise

    8. 返回对 returnPromise 反应的结果,满足时执行如下步骤,参数 iterResult

      1. 如果 iterResult 不是对象,抛出 TypeError

      2. 返回 undefined。

  6. stream 设为 ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, 0)。

  7. 返回 stream

ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventCancel[, signal]) 执行以下步骤:
  1. 断言:source 实现了 ReadableStream

  2. 断言:dest 实现了 WritableStream

  3. 断言:preventClosepreventAbortpreventCancel 都是布尔值。

  4. 如果未传入 signal,则令 signal 为 undefined。

  5. 断言:signal 要么为 undefined,要么 signal 实现了 AbortSignal

  6. 断言:! IsReadableStreamLocked(source) 为 false。

  7. 断言:! IsWritableStreamLocked(dest) 为 false。

  8. 如果 source.[[controller]] 实现了 ReadableByteStreamController, 则令 reader 为 ! AcquireReadableStreamBYOBReader(source) 或 ! AcquireReadableStreamDefaultReader(source), 由 user agent 自行决定。

  9. 否则,令 reader 为 ! AcquireReadableStreamDefaultReader(source)。

  10. writer 为 ! AcquireWritableStreamDefaultWriter(dest)。

  11. source.[[disturbed]] 设为 true。

  12. shuttingDown 为 false。

  13. promise一个新的 promise

  14. 如果 signal 不为 undefined,

    1. abortAlgorithm 为以下步骤:

      1. errorsignal中止原因

      2. actions 为一个空的有序集合

      3. 如果 preventAbort 为 false,添加以下 action 到 actions 中:

        1. 如果 dest.[[state]] 为 "writable",返回 ! WritableStreamAbort(dest, error)。

        2. 否则,返回 以 undefined 解决的 promise

      4. 如果 preventCancel 为 false,添加以下 action 到 actions 中:

        1. 如果 source.[[state]] 为 "readable",返回 ! ReadableStreamCancel(source, error)。

        2. 否则,返回 以 undefined 解决的 promise

      5. 带有 action 的关闭,action 为等待所有 actions 的 promise,error 为 error

    2. 如果 signal中止,执行 abortAlgorithm 并返回 promise

    3. 添加 abortAlgorithmsignal

  15. 并行 但实际上不是;参见 #905,使用 readerwriter,从 source 读取所有并写入 dest。由于 reader 和 writer 提供的锁定机制,具体实现方式对作者代码不可见,因此有灵活性。无论实现方式如何,都必须遵循以下约束:

  16. 返回 promise

此处执行的多种抽象操作包括对象创建(通常为 promise),按理需指定对象创建的 realm。但由于加锁,外部代码无法观察到这些对象,因此采用何种 realm 并不影响行为。

ReadableStreamTee(stream, cloneForBranch2)分叉给定的可读流。

第二个参数 cloneForBranch2 控制原始流中的数据是否会在出现在返回的第二个分支前被克隆(使用 HTML 的 可序列化对象机制)。这对于两分支都会以会互相干扰的方式消费,例如转移的场景很有用。但这样会导致两个分支不对称,并限制必须可序列化。[HTML]

如果 stream可读字节流,则 cloneForBranch2 被忽略,块始终无条件克隆。

在本标准中,ReadbleStreamTee 总是以 cloneForBranch2 为 false 调用;其他规范会通过 tee 包装算法传递 true。

执行以下步骤:

  1. 断言:stream 实现了 ReadableStream

  2. 断言:cloneForBranch2 为布尔值。

  3. 如果 stream.[[controller]] 实现了 ReadableByteStreamController, 返回 ? ReadableByteStreamTee(stream)。

  4. 否则,返回 ? ReadableStreamDefaultTee(stream, cloneForBranch2)。

ReadableStreamDefaultTee(stream, cloneForBranch2) 执行以下步骤:
  1. 断言:stream 实现了 ReadableStream

  2. 断言:cloneForBranch2 为布尔值。

  3. reader 为 ? AcquireReadableStreamDefaultReader(stream)。

  4. reading 为 false。

  5. readAgain 为 false。

  6. canceled1 为 false。

  7. canceled2 为 false。

  8. reason1 为 undefined。

  9. reason2 为 undefined。

  10. branch1 为 undefined。

  11. branch2 为 undefined。

  12. cancelPromise一个新的 promise

  13. pullAlgorithm 为以下步骤:

    1. 如果 reading 为 true,

      1. readAgain 为 true。

      2. 返回 以 undefined 解决的 promise

    2. reading 为 true。

    3. readRequest 为一个读取请求,包含如下

      chunk steps,参数 chunk
      1. 队列一个 microtask执行如下步骤:

        1. readAgain 为 false。

        2. chunk1chunk2 皆为 chunk

        3. 如果 canceled2 为 false 且 cloneForBranch2 为 true,

          1. cloneResultStructuredClone(chunk2)。

          2. 如果 cloneResult 是异常完成,

            1. 执行 ! ReadableStreamDefaultControllerError(branch1.[[controller]], cloneResult.[[Value]])。

            2. 执行 ! ReadableStreamDefaultControllerError(branch2.[[controller]], cloneResult.[[Value]])。

            3. 解决 cancelPromise,值为 ! ReadableStreamCancel(stream, cloneResult.[[Value]])。

            4. 返回。

          3. 否则,设 chunk2cloneResult.[[Value]]。

        4. 如果 canceled1 为 false,执行 ! ReadableStreamDefaultControllerEnqueue(branch1.[[controller]], chunk1)。

        5. 如果 canceled2 为 false,执行 ! ReadableStreamDefaultControllerEnqueue(branch2.[[controller]], chunk2)。

        6. reading 为 false。

        7. 如果 readAgain 为 true,执行 pullAlgorithm

      这里的 microtask 延迟是必要的,因为要检测错误至少需要一个 microtask(见下文 reader.[[closedPromise]])。 这样才能保证流的错误会立即传递到两个分支,而不会先发生成功同步读取。

      close steps
      1. reading 为 false。

      2. 如果 canceled1 为 false,执行 ! ReadableStreamDefaultControllerClose(branch1.[[controller]])。

      3. 如果 canceled2 为 false,执行 ! ReadableStreamDefaultControllerClose(branch2.[[controller]])。

      4. 如果 canceled1 为 false 或 canceled2 为 false,解决 cancelPromise,值为 undefined。

      error steps
      1. reading 为 false。

    4. 执行 ! ReadableStreamDefaultReaderRead(reader, readRequest)。

    5. 返回 以 undefined 解决的 promise

  14. cancel1Algorithm 为以下步骤,参数 reason

    1. canceled1 为 true。

    2. reason1reason

    3. 如果 canceled2 为 true,

      1. compositeReason 为 ! CreateArrayFromListreason1, reason2 »)。

      2. cancelResult 为 ! ReadableStreamCancel(stream, compositeReason)。

      3. 解决 cancelPromise,值为 cancelResult

    4. 返回 cancelPromise

  15. cancel2Algorithm 为以下步骤,参数 reason

    1. canceled2 为 true。

    2. reason2reason

    3. 如果 canceled1 为 true,

      1. compositeReason 为 ! CreateArrayFromListreason1, reason2 »)。

      2. cancelResult 为 ! ReadableStreamCancel(stream, compositeReason)。

      3. 解决 cancelPromise,值为 cancelResult

    4. 返回 cancelPromise

  16. startAlgorithm 为一个返回 undefined 的算法。

  17. branch1 为 ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm)。

  18. branch2 为 ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm)。

  19. 当拒绝 reader.[[closedPromise]],原因为 r

    1. 执行 ! ReadableStreamDefaultControllerError(branch1.[[controller]], r)。

    2. 执行 ! ReadableStreamDefaultControllerError(branch2.[[controller]], r)。

    3. 如果 canceled1 为 false 或 canceled2 为 false,解决 cancelPromise,值为 undefined。

  20. 返回 « branch1, branch2 »。

ReadableByteStreamTee(stream) 执行以下步骤:
  1. 断言:stream 实现了 ReadableStream

  2. 断言:stream.[[controller]] 实现了 ReadableByteStreamController

  3. reader 为 ? AcquireReadableStreamDefaultReader(stream)。

  4. reading 为 false。

  5. readAgainForBranch1 为 false。

  6. readAgainForBranch2 为 false。

  7. canceled1 为 false。

  8. canceled2 为 false。

  9. reason1 为 undefined。

  10. reason2 为 undefined。

  11. branch1 为 undefined。

  12. branch2 为 undefined。

  13. cancelPromise一个新的 promise

  14. forwardReaderError 为下述步骤,参数 thisReader

    1. 当拒绝 thisReader.[[closedPromise]],原因为 r

      1. 如果 thisReader 不等于 reader,返回。

      2. 执行 ! ReadableByteStreamControllerError(branch1.[[controller]], r)。

      3. 执行 ! ReadableByteStreamControllerError(branch2.[[controller]], r)。

      4. canceled1 为 false 或 canceled2 为 false,解决 cancelPromise,值为 undefined。

  15. pullWithDefaultReader 为下述步骤:

    1. 如果 reader 实现了 ReadableStreamBYOBReader

      1. 断言:reader.[[readIntoRequests]] 为空。

      2. 执行 ! ReadableStreamBYOBReaderRelease(reader)。

      3. reader 为 ! AcquireReadableStreamDefaultReader(stream)。

      4. 执行 forwardReaderError,参数 reader

    2. readRequest 为一个读取请求,包含如下

      chunk steps,参数 chunk
      1. 队列一个 microtask,执行以下步骤:

        1. readAgainForBranch1readAgainForBranch2 均为 false。

        2. chunk1chunk2 都为 chunk

        3. canceled1canceled2 均为 false,

          1. cloneResultCloneAsUint8Array(chunk)。

          2. cloneResult 为异常完成,

            1. 执行 ! ReadableByteStreamControllerError(branch1.[[controller]], cloneResult.[[Value]])。

            2. 执行 ! ReadableByteStreamControllerError(branch2.[[controller]], cloneResult.[[Value]])。

            3. 解决 cancelPromise,值为 ! ReadableStreamCancel(stream, cloneResult.[[Value]])。

            4. 返回。

          3. 否则,设 chunk2cloneResult.[[Value]]。

        4. canceled1 为 false,执行 ! ReadableByteStreamControllerEnqueue(branch1.[[controller]], chunk1)。

        5. canceled2 为 false,执行 ! ReadableByteStreamControllerEnqueue(branch2.[[controller]], chunk2)。

        6. reading 为 false。

        7. readAgainForBranch1 为 true,执行 pull1Algorithm

        8. 否则如 readAgainForBranch2 为 true,执行 pull2Algorithm

      此处 microtask 延迟是必要的,因为要检测错误至少需要一个 microtask,见下文 reader.[[closedPromise]]。 这样可保证流错误立即传递到两个分支,而不会让同步读取领先于异步错误。

      close steps
      1. reading 为 false。

      2. canceled1 为 false,执行 ! ReadableByteStreamControllerClose(branch1.[[controller]])。

      3. canceled2 为 false,执行 ! ReadableByteStreamControllerClose(branch2.[[controller]])。

      4. branch1.[[controller]].[[pendingPullIntos]] 非空,执行 ! ReadableByteStreamControllerRespond(branch1.[[controller]], 0)。

      5. branch2.[[controller]].[[pendingPullIntos]] 非空,执行 ! ReadableByteStreamControllerRespond(branch2.[[controller]], 0)。

      6. canceled1 为 false 或 canceled2 为 false,解决 cancelPromise,值为 undefined。

      error steps
      1. reading 为 false。

    3. 执行 ! ReadableStreamDefaultReaderRead(reader, readRequest)。

  16. pullWithBYOBReader 为下述步骤,参数 viewforBranch2

    1. 如果 reader 实现了 ReadableStreamDefaultReader

      1. 断言:reader.[[readRequests]] 为空。

      2. 执行 ! ReadableStreamDefaultReaderRelease(reader)。

      3. reader 为 ! AcquireReadableStreamBYOBReader(stream)。

      4. 执行 forwardReaderError,参数 reader

    2. byobBranchbranch2forBranch2 为 true,否则为 branch1

    3. otherBranchbranch2forBranch2 为 false,否则为 branch1

    4. readIntoRequest 为一个read-into 请求,包含如下

      chunk steps,参数 chunk
      1. 队列一个 microtask,执行以下步骤:

        1. readAgainForBranch1readAgainForBranch2 均为 false。

        2. byobCanceledcanceled2forBranch2 为 true,否则为 canceled1

        3. otherCanceledcanceled2forBranch2 为 false,否则为 canceled1

        4. otherCanceled 为 false,

          1. cloneResultCloneAsUint8Array(chunk)。

          2. cloneResult 为异常完成,

            1. 执行 ! ReadableByteStreamControllerError(byobBranch.[[controller]], cloneResult.[[Value]])。

            2. 执行 ! ReadableByteStreamControllerError(otherBranch.[[controller]], cloneResult.[[Value]])。

            3. 解决 cancelPromise,值为 ! ReadableStreamCancel(stream, cloneResult.[[Value]])。

            4. 返回。

          3. 否则,令 clonedChunkcloneResult.[[Value]]。

          4. byobCanceled 为 false,执行 ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk)。

          5. 执行 ! ReadableByteStreamControllerEnqueue(otherBranch.[[controller]], clonedChunk)。

        5. 否则如 byobCanceled 为 false,执行 ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk)。

        6. reading 为 false。

        7. readAgainForBranch1 为 true,执行 pull1Algorithm

        8. 否则如 readAgainForBranch2 为 true,执行 pull2Algorithm

      此处 microtask 延迟是必要的,因为要检测错误至少需要一个 microtask,见下文 reader.[[closedPromise]]。 这样可保证流错误立即传递到两个分支,而不会让同步读取领先于异步错误。

      close steps,参数 chunk
      1. reading 为 false。

      2. byobCanceledcanceled2forBranch2 为 true,否则为 canceled1

      3. otherCanceledcanceled2forBranch2 为 false,否则为 canceled1

      4. byobCanceled 为 false,执行 ! ReadableByteStreamControllerClose(byobBranch.[[controller]])。

      5. otherCanceled 为 false,执行 ! ReadableByteStreamControllerClose(otherBranch.[[controller]])。

      6. 如果 chunk 不为 undefined,

        1. 断言:chunk.[[ByteLength]] 为 0。

        2. byobCanceled 为 false,执行 ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk)。

        3. otherCanceled 为 false 且 otherBranch.[[controller]].[[pendingPullIntos]] 非空,执行 ! ReadableByteStreamControllerRespond(otherBranch.[[controller]], 0)。

      7. byobCanceled 为 false 或 otherCanceled 为 false, 解决 cancelPromise,值为 undefined。

      error steps
      1. reading 为 false。

    5. 执行 ! ReadableStreamBYOBReaderRead(reader, view, 1, readIntoRequest)。

  17. pull1Algorithm 为下述步骤:

    1. reading 为 true,

      1. readAgainForBranch1 为 true。

      2. 返回 以 undefined 解决的 promise

    2. reading 为 true。

    3. byobRequest 为 ! ReadableByteStreamControllerGetBYOBRequest(branch1.[[controller]])。

    4. byobRequest 为 null,执行 pullWithDefaultReader

    5. 否则,执行 pullWithBYOBReader,参数为 byobRequest.[[view]] 和 false。

    6. 返回 以 undefined 解决的 promise

  18. pull2Algorithm 为下述步骤:

    1. reading 为 true,

      1. readAgainForBranch2 为 true。

      2. 返回 以 undefined 解决的 promise

    2. reading 为 true。

    3. byobRequest 为 ! ReadableByteStreamControllerGetBYOBRequest(branch2.[[controller]])。

    4. byobRequest 为 null,执行 pullWithDefaultReader

    5. 否则,执行 pullWithBYOBReader,参数为 byobRequest.[[view]] 和 true。

    6. 返回 以 undefined 解决的 promise

  19. cancel1Algorithm 为下述步骤,参数 reason

    1. canceled1 为 true。

    2. reason1reason

    3. canceled2 为 true,

      1. compositeReason 为 ! CreateArrayFromListreason1, reason2 »)。

      2. cancelResult 为 ! ReadableStreamCancel(stream, compositeReason)。

      3. 解决 cancelPromise,值为 cancelResult

    4. 返回 cancelPromise

  20. cancel2Algorithm 为下述步骤,参数 reason

    1. canceled2 为 true。

    2. reason2reason

    3. canceled1 为 true,

      1. compositeReason 为 ! CreateArrayFromListreason1, reason2 »)。

      2. cancelResult 为 ! ReadableStreamCancel(stream, compositeReason)。

      3. 解决 cancelPromise,值为 cancelResult

    4. 返回 cancelPromise

  21. startAlgorithm 为一个返回 undefined 的算法。

  22. branch1 为 ! CreateReadableByteStream(startAlgorithm, pull1Algorithm, cancel1Algorithm)。

  23. branch2 为 ! CreateReadableByteStream(startAlgorithm, pull2Algorithm, cancel2Algorithm)。

  24. 执行 forwardReaderError,参数 reader

  25. 返回 « branch1, branch2 »。

4.9.2. 与控制器的接口

在规范结构上,ReadableStream 类将简单可读流和可读字节流的行为封装到一个类中,主要通过将大部分可能变化的逻辑集中到两个控制器类:ReadableStreamDefaultControllerReadableByteStreamController。 这两个类定义了大多数有状态的内部插槽和抽象操作,来管理流的内部队列及与底层源底层字节源的接口。

每个控制器类都定义了三个内部方法,由ReadableStream 的算法调用:

[[CancelSteps]](reason)
控制器在流被取消时运行的步骤,用于清理控制器存储的状态,并通知底层源
[[PullSteps]](readRequest)
控制器在默认 reader被读取时运行的步骤,用于从控制器拉取排队的,或从底层源拉取更多块。
[[ReleaseSteps]]()
控制器在reader释放时运行的步骤,用于清理控制器中 reader 相关的资源。

(这些定义为内部方法而非抽象操作,因此 ReadableStream 算法可以多态调用,无需判断控制器类型。)

本节剩余部分是控制器反向调用流对象的抽象操作:用于控制器实现影响其关联的 ReadableStream ,将控制器的内部状态变化反映到开发者可见的 public API。

ReadableStreamAddReadIntoRequest(stream, readRequest) 执行以下步骤:
  1. 断言:stream.[[reader]] 实现了 ReadableStreamBYOBReader

  2. 断言:stream.[[state]] 为 "readable" 或 "closed"。

  3. 追加 readRequeststream.[[reader]].[[readIntoRequests]]

ReadableStreamAddReadRequest(stream, readRequest) 执行以下步骤:
  1. 断言:stream.[[reader]] 实现了 ReadableStreamDefaultReader

  2. 断言:stream.[[state]] 为 "readable"。

  3. 追加 readRequeststream.[[reader]].[[readRequests]]

ReadableStreamCancel(stream, reason) 执行以下步骤:
  1. stream.[[disturbed]] 设为 true。

  2. 如果 stream.[[state]] 为 "closed",返回 以 undefined 解决的 promise

  3. 如果 stream.[[state]] 为 "errored",返回 stream.[[storedError]] 拒绝的 promise。

  4. 执行 ! ReadableStreamClose(stream)。

  5. readerstream.[[reader]]

  6. 如果 reader 不为 undefined 且 reader 实现了 ReadableStreamBYOBReader

    1. readIntoRequestsreader.[[readIntoRequests]]

    2. reader.[[readIntoRequests]] 设为空 列表

    3. 对于每个 readIntoRequest 属于 readIntoRequests

      1. 以 undefined 作为参数,执行 readIntoRequestclose steps

  7. sourceCancelPromise 为 ! stream.[[controller]].[[CancelSteps]](reason)。

  8. 返回 sourceCancelPromise 的反应,fulfilled 时返回 undefined。

ReadableStreamClose(stream) 执行以下步骤:
  1. 断言:stream.[[state]] 为 "readable"。

  2. stream.[[state]] 设为 "closed"。

  3. readerstream.[[reader]]

  4. 如果 reader 未定义,返回。

  5. 解决 reader.[[closedPromise]],值为 undefined。

  6. 如果 reader 实现了 ReadableStreamDefaultReader

    1. readRequestsreader.[[readRequests]]

    2. reader.[[readRequests]] 设为空 列表

    3. 对于每个 readRequest 属于 readRequests

      1. 执行 readRequestclose steps

ReadableStreamError(stream, e) 执行以下步骤:
  1. 断言:stream.[[state]] 为 "readable"。

  2. stream.[[state]] 设为 "errored"。

  3. stream.[[storedError]] 设为 e

  4. readerstream.[[reader]]

  5. 如果 reader 未定义,返回。

  6. 拒绝 reader.[[closedPromise]],原因为 e

  7. reader.[[closedPromise]].[[PromiseIsHandled]] 设为 true。

  8. 如果 reader 实现了 ReadableStreamDefaultReader

    1. 执行 !ReadableStreamDefaultReaderErrorReadRequests(reader, e)。

  9. 否则,

    1. 断言:reader 实现了 ReadableStreamBYOBReader

    2. 执行 !ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e)。

ReadableStreamFulfillReadIntoRequest(stream, chunk, done) 执行以下步骤:
  1. 断言:!ReadableStreamHasBYOBReader(stream) 为 true。

  2. readerstream.[[reader]]

  3. 断言:reader.[[readIntoRequests]] 不为

  4. readIntoRequestreader.[[readIntoRequests]][0]。

  5. 移除 readIntoRequestreader.[[readIntoRequests]]

  6. 如果 done 为 true,执行 readIntoRequestclose steps,参数为 chunk

  7. 否则,执行 readIntoRequestchunk steps,参数为 chunk

ReadableStreamFulfillReadRequest(stream, chunk, done) 执行以下步骤:
  1. 断言:!ReadableStreamHasDefaultReader(stream) 为 true。

  2. readerstream.[[reader]]

  3. 断言:reader.[[readRequests]] 不为

  4. readRequestreader.[[readRequests]][0]。

  5. 移除 readRequestreader.[[readRequests]]

  6. 如果 done 为 true,执行 readRequestclose steps

  7. 否则,执行 readRequestchunk steps,参数为 chunk

ReadableStreamGetNumReadIntoRequests(stream) 执行以下步骤:
  1. 断言:!ReadableStreamHasBYOBReader(stream) 为 true。

  2. 返回 stream.[[reader]].[[readIntoRequests]]大小

ReadableStreamGetNumReadRequests(stream) 执行以下步骤:
  1. 断言:!ReadableStreamHasDefaultReader(stream) 为 true。

  2. 返回 stream.[[reader]].[[readRequests]]大小

ReadableStreamHasBYOBReader(stream) 执行以下步骤:
  1. readerstream.[[reader]]

  2. 如果 reader 未定义,返回 false。

  3. 如果 reader 实现了 ReadableStreamBYOBReader,返回 true。

  4. 返回 false。

ReadableStreamHasDefaultReader(stream) 执行以下步骤:
  1. readerstream.[[reader]]

  2. 如果 reader 未定义,返回 false。

  3. 如果 reader 实现了 ReadableStreamDefaultReader,返回 true。

  4. 返回 false。

4.9.3. 读者(Readers)

以下抽象操作用于支持 ReadableStreamDefaultReaderReadableStreamBYOBReader 实例的实现和操作。

ReadableStreamReaderGenericCancel(reader, reason) 执行以下步骤:
  1. streamreader.[[stream]]

  2. 断言:stream 不为 undefined。

  3. 返回 !ReadableStreamCancel(stream, reason)。

ReadableStreamReaderGenericInitialize(reader, stream) 执行以下步骤:
  1. reader.[[stream]] 设为 stream

  2. stream.[[reader]] 设为 reader

  3. 如果 stream.[[state]] 为 "readable",

    1. reader.[[closedPromise]] 设为新的 promise

  4. 否则,如果 stream.[[state]] 为 "closed",

    1. reader.[[closedPromise]] 设为以 undefined 解决的 promise

  5. 否则,

    1. 断言:stream.[[state]] 为 "errored"。

    2. reader.[[closedPromise]] 设置为 一个被拒绝的 promise, 原因为 stream.[[storedError]]

    3. reader.[[closedPromise]].[[PromiseIsHandled]] 设为 true。

ReadableStreamReaderGenericRelease(reader) 执行以下步骤:
  1. streamreader.[[stream]]

  2. 断言:stream 不为 undefined。

  3. 断言:stream.[[reader]]reader

  4. 如果 stream.[[state]] 为 "readable",拒绝 reader.[[closedPromise]],原因为 TypeError 异常。

  5. 否则,将 reader.[[closedPromise]] 设置为 一个被拒绝的 promise, 原因为 TypeError 异常。

  6. reader.[[closedPromise]].[[PromiseIsHandled]] 设为 true。

  7. 执行 !stream.[[controller]].[[ReleaseSteps]]()。

  8. stream.[[reader]] 设为 undefined。

  9. reader.[[stream]] 设为 undefined。

ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e) 执行以下步骤:
  1. readIntoRequestsreader.[[readIntoRequests]]

  2. reader.[[readIntoRequests]] 设为一个新的空 列表

  3. 对于每个 readIntoRequest 属于 readIntoRequests

    1. e 作为参数,执行 readIntoRequesterror steps

ReadableStreamBYOBReaderRead(reader, view, min, readIntoRequest) 执行以下步骤:
  1. streamreader.[[stream]]

  2. 断言:stream 不为 undefined。

  3. stream.[[disturbed]] 设为 true。

  4. 如果 stream.[[state]] 为 "errored",以 stream.[[storedError]] 作为参数,执行 readIntoRequesterror steps

  5. 否则,执行 !ReadableByteStreamControllerPullInto(stream.[[controller]], view, min, readIntoRequest)。

ReadableStreamBYOBReaderRelease(reader) 执行以下步骤:
  1. 执行 !ReadableStreamReaderGenericRelease(reader)。

  2. e 为一个新的 TypeError 异常。

  3. 执行 !ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e)。

ReadableStreamDefaultReaderErrorReadRequests(reader, e) 执行以下步骤:
  1. readRequestsreader.[[readRequests]]

  2. reader.[[readRequests]] 设为新的空 列表

  3. 对于每个 readRequest 属于 readRequests

    1. e 作为参数,执行 readRequesterror steps

ReadableStreamDefaultReaderRead(reader, readRequest) 执行以下步骤:
  1. streamreader.[[stream]]

  2. 断言:stream 不为 undefined。

  3. stream.[[disturbed]] 设为 true。

  4. 如果 stream.[[state]] 为 "closed",执行 readRequestclose steps

  5. 否则,如果 stream.[[state]] 为 "errored",以 stream.[[storedError]] 作为参数,执行 readRequesterror steps

  6. 否则,

    1. 断言:stream.[[state]] 为 "readable"。

    2. 执行 !stream.[[controller]].[[PullSteps]](readRequest)。

ReadableStreamDefaultReaderRelease(reader) 执行以下步骤:
  1. 执行 !ReadableStreamReaderGenericRelease(reader)。

  2. e 为一个新的 TypeError 异常。

  3. 执行 !ReadableStreamDefaultReaderErrorReadRequests(reader, e)。

SetUpReadableStreamBYOBReader(reader, stream) 执行以下步骤:
  1. 如果 !IsReadableStreamLocked(stream) 为 true,抛出 TypeError 异常。

  2. 如果 stream.[[controller]]实现 ReadableByteStreamController, 抛出 TypeError 异常。

  3. 执行 !ReadableStreamReaderGenericInitialize(reader, stream)。

  4. reader.[[readIntoRequests]] 设为新的空 列表

SetUpReadableStreamDefaultReader(reader, stream) 执行以下步骤:
  1. 如果 !IsReadableStreamLocked(stream) 为 true,抛出 TypeError 异常。

  2. 执行 !ReadableStreamReaderGenericInitialize(reader, stream)。

  3. reader.[[readRequests]] 设为新的空 列表

4.9.4. 默认控制器

以下抽象操作用于支持 ReadableStreamDefaultController 类的实现。

ReadableStreamDefaultControllerCallPullIfNeeded(controller) 执行以下步骤:
  1. shouldPull 为 !ReadableStreamDefaultControllerShouldCallPull(controller)。

  2. 如果 shouldPull 为 false,返回。

  3. 如果 controller.[[pulling]] 为 true,

    1. controller.[[pullAgain]] 设为 true。

    2. 返回。

  4. 断言:controller.[[pullAgain]] 为 false。

  5. controller.[[pulling]] 设为 true。

  6. pullPromise 为执行 controller.[[pullAlgorithm]] 的结果。

  7. pullPromise 成功时

    1. controller.[[pulling]] 设为 false。

    2. 如果 controller.[[pullAgain]] 为 true,

      1. controller.[[pullAgain]] 设为 false。

      2. 执行 !ReadableStreamDefaultControllerCallPullIfNeeded(controller)。

  8. pullPromise 被拒绝且原因为 e 时,

    1. 执行 !ReadableStreamDefaultControllerError(controller, e)。

ReadableStreamDefaultControllerShouldCallPull(controller) 执行以下步骤:
  1. streamcontroller.[[stream]]

  2. 如果 !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) 为 false,返回 false。

  3. 如果 controller.[[started]] 为 false,返回 false。

  4. 如果 !IsReadableStreamLocked(stream) 为 true 且 !ReadableStreamGetNumReadRequests(stream) > 0,返回 true。

  5. desiredSize 为 !ReadableStreamDefaultControllerGetDesiredSize(controller)。

  6. 断言:desiredSize 不为 null。

  7. 如果 desiredSize > 0,返回 true。

  8. 返回 false。

ReadableStreamDefaultControllerClearAlgorithms(controller) 在流关闭或出错且算法不再会被执行时调用。通过移除算法引用,即使 ReadableStream 仍被引用,也允许底层源对象被垃圾回收。

这可以通过弱引用观察到。详细内容见 tc39/proposal-weakrefs#31

执行以下步骤:

  1. controller.[[pullAlgorithm]] 设为 undefined。

  2. controller.[[cancelAlgorithm]] 设为 undefined。

  3. controller.[[strategySizeAlgorithm]] 设为 undefined。

ReadableStreamDefaultControllerClose(controller) 执行以下步骤:
  1. 如果 !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) 为 false,返回。

  2. streamcontroller.[[stream]]

  3. controller.[[closeRequested]] 设为 true。

  4. 如果 controller.[[queue]] 为空

    1. 执行 !ReadableStreamDefaultControllerClearAlgorithms(controller)。

    2. 执行 !ReadableStreamClose(stream)。

ReadableStreamDefaultControllerEnqueue(controller, chunk) 执行以下步骤:
  1. 如果 !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) 为 false,返回。

  2. streamcontroller.[[stream]]

  3. 如果 !IsReadableStreamLocked(stream) 为 true 且 !ReadableStreamGetNumReadRequests(stream) > 0,执行 !ReadableStreamFulfillReadRequest(stream, chunk, false)。

  4. 否则,

    1. result 为执行 controller.[[strategySizeAlgorithm]],参数为 chunk 并将结果视为 completion record

    2. 如果 result 为异常完成,

      1. 执行 !ReadableStreamDefaultControllerError(controller, result.[[Value]])。

      2. 返回 result

    3. chunkSizeresult.[[Value]]。

    4. enqueueResultEnqueueValueWithSize(controller, chunk, chunkSize)。

    5. 如果 enqueueResult 为异常完成,

      1. 执行 !ReadableStreamDefaultControllerError(controller, enqueueResult.[[Value]])。

      2. 返回 enqueueResult

  5. 执行 !ReadableStreamDefaultControllerCallPullIfNeeded(controller)。

ReadableStreamDefaultControllerError(controller, e) 执行以下步骤:
  1. streamcontroller.[[stream]]

  2. 如果 stream.[[state]] 不为 "readable",返回。

  3. 执行 !ResetQueue(controller)。

  4. 执行 !ReadableStreamDefaultControllerClearAlgorithms(controller)。

  5. 执行 !ReadableStreamError(stream, e)。

ReadableStreamDefaultControllerGetDesiredSize(controller) 执行以下步骤:
  1. statecontroller.[[stream]].[[state]]

  2. 如果 state 为 "errored",返回 null。

  3. 如果 state 为 "closed",返回 0。

  4. 返回 controller.[[strategyHWM]]controller.[[queueTotalSize]]

ReadableStreamDefaultControllerHasBackpressure(controller) 用于 TransformStream 的实现。 执行以下步骤:
  1. 如果 !ReadableStreamDefaultControllerShouldCallPull(controller) 为 true,返回 false。

  2. 否则,返回 true。

ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) 执行以下步骤:
  1. statecontroller.[[stream]].[[state]]

  2. 如果 controller.[[closeRequested]] 为 false 且 state 为 "readable",返回 true。

  3. 否则,返回 false。

controller.[[closeRequested]] 为 false,但 state 不为 "readable" 时,可能是因为流被 controller.error() 置为错误,或者流被关闭但未调用其控制器的 controller.close() 方法,例如通过调用 stream.cancel() 关闭流。

SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm) 执行以下步骤:
  1. 断言:stream.[[controller]] 未定义。

  2. controller.[[stream]] 设为 stream

  3. 执行 !ResetQueue(controller)。

  4. controller.[[started]]controller.[[closeRequested]]controller.[[pullAgain]]controller.[[pulling]] 设为 false。

  5. controller.[[strategySizeAlgorithm]] 设为 sizeAlgorithmcontroller.[[strategyHWM]] 设为 highWaterMark

  6. controller.[[pullAlgorithm]] 设为 pullAlgorithm

  7. controller.[[cancelAlgorithm]] 设为 cancelAlgorithm

  8. stream.[[controller]] 设为 controller

  9. startResult 为执行 startAlgorithm 的结果。(此操作可能抛出异常。)

  10. startPromisestartResult 解决的 promise

  11. startPromise fulfilled 时

    1. controller.[[started]] 设为 true。

    2. 断言:controller.[[pulling]] 为 false。

    3. 断言:controller.[[pullAgain]] 为 false。

    4. 执行 !ReadableStreamDefaultControllerCallPullIfNeeded(controller)。

  12. startPromise 被拒绝,原因为 r 时,

    1. 执行 !ReadableStreamDefaultControllerError(controller, r)。

SetUpReadableStreamDefaultControllerFromUnderlyingSource(stream, underlyingSource, underlyingSourceDict, highWaterMark, sizeAlgorithm) 执行以下步骤:
  1. controller新的 ReadableStreamDefaultController

  2. startAlgorithm 为一个返回 undefined 的算法。

  3. pullAlgorithm 为一个返回 以 undefined 解决的 promise 的算法。

  4. cancelAlgorithm 为一个返回 以 undefined 解决的 promise 的算法。

  5. 如果 underlyingSourceDict["start"] 存在,则将 startAlgorithm 设为一个算法,其返回 调用 underlyingSourceDict["start"], 参数为 « controller »,回调 this 值underlyingSource 的结果。

  6. 如果 underlyingSourceDict["pull"] 存在,则将 pullAlgorithm 设为一个算法,其返回 调用 underlyingSourceDict["pull"], 参数为 « controller »,回调 this 值underlyingSource 的结果。

  7. 如果 underlyingSourceDict["cancel"] 存在,则将 cancelAlgorithm 设为一个接收参数 reason 并返回 调用 underlyingSourceDict["cancel"], 参数为 « reason »,回调 this 值underlyingSource 的结果的算法。

  8. 执行 ?SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm)。

4.9.5. 字节流控制器

ReadableByteStreamControllerCallPullIfNeeded(controller) 执行以下步骤:
  1. shouldPull 为 !ReadableByteStreamControllerShouldCallPull(controller)。

  2. 如果 shouldPull 为 false,返回。

  3. 如果 controller.[[pulling]] 为 true,

    1. controller.[[pullAgain]] 设为 true。

    2. 返回。

  4. 断言:controller.[[pullAgain]] 为 false。

  5. controller.[[pulling]] 设为 true。

  6. pullPromise 为执行 controller.[[pullAlgorithm]] 的结果。

  7. pullPromise fulfilled 时

    1. controller.[[pulling]] 设为 false。

    2. 如果 controller.[[pullAgain]] 为 true,

      1. controller.[[pullAgain]] 设为 false。

      2. 执行 !ReadableByteStreamControllerCallPullIfNeeded(controller)。

  8. pullPromise 被拒绝,原因为 e 时,

    1. 执行 !ReadableByteStreamControllerError(controller, e)。

ReadableByteStreamControllerClearAlgorithms(controller) 在流关闭或出错且算法不再会被执行时调用。通过移除算法引用,即使 ReadableStream 仍被引用,也允许底层字节源对象被垃圾回收。

这可以通过弱引用观察到。详细内容见 tc39/proposal-weakrefs#31

执行以下步骤:

  1. controller.[[pullAlgorithm]] 设为 undefined。

  2. controller.[[cancelAlgorithm]] 设为 undefined。

ReadableByteStreamControllerClearPendingPullIntos(controller) 执行以下步骤:
  1. 执行 !ReadableByteStreamControllerInvalidateBYOBRequest(controller)。

  2. controller.[[pendingPullIntos]] 设为新的空 列表

ReadableByteStreamControllerClose(controller) 执行以下步骤:
  1. streamcontroller.[[stream]]

  2. 如果 controller.[[closeRequested]] 为 true 或 stream.[[state]] 不为 "readable",返回。

  3. 如果 controller.[[queueTotalSize]] > 0,

    1. controller.[[closeRequested]] 设为 true。

    2. 返回。

  4. 如果 controller.[[pendingPullIntos]] 不为空,

    1. firstPendingPullIntocontroller.[[pendingPullIntos]][0]。

    2. 如果 firstPendingPullIntobytes filled 除以 firstPendingPullIntoelement size 的余数不为 0,

      1. e 为一个新的 TypeError 异常。

      2. 执行 !ReadableByteStreamControllerError(controller, e)。

      3. 抛出 e

  5. 执行 !ReadableByteStreamControllerClearAlgorithms(controller)。

  6. 执行 !ReadableStreamClose(stream)。

ReadableByteStreamControllerCommitPullIntoDescriptor(stream, pullIntoDescriptor) 执行以下步骤:
  1. 断言:stream.[[state]] 不为 "errored"。

  2. 断言:pullIntoDescriptor.reader type 不为 "none"。

  3. done 为 false。

  4. 如果 stream.[[state]] 为 "closed",

    1. 断言:pullIntoDescriptorbytes filled 除以 pullIntoDescriptorelement size 的余数为 0。

    2. done 设为 true。

  5. filledView 为 !ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor)。

  6. 如果 pullIntoDescriptorreader type 为 "default",

    1. 执行 !ReadableStreamFulfillReadRequest(stream, filledView, done)。

  7. 否则,

    1. 断言:pullIntoDescriptorreader type 为 "byob"。

    2. 执行 !ReadableStreamFulfillReadIntoRequest(stream, filledView, done)。

ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor) 执行以下步骤:
  1. bytesFilledpullIntoDescriptorbytes filled

  2. elementSizepullIntoDescriptorelement size

  3. 断言:bytesFilledpullIntoDescriptorbyte length

  4. 断言:bytesFilled 除以 elementSize 的余数为 0。

  5. buffer 为 !TransferArrayBuffer(pullIntoDescriptorbuffer)。

  6. 返回 !Construct(pullIntoDescriptorview constructor,« buffer, pullIntoDescriptorbyte offset, bytesFilled ÷ elementSize »)。

ReadableByteStreamControllerEnqueue(controller, chunk) 执行以下步骤:
  1. streamcontroller.[[stream]]

  2. 如果 controller.[[closeRequested]] 为 true 或 stream.[[state]] 不为 "readable",返回。

  3. bufferchunk.[[ViewedArrayBuffer]]。

  4. byteOffsetchunk.[[ByteOffset]]。

  5. byteLengthchunk.[[ByteLength]]。

  6. 如果 !IsDetachedBuffer(buffer) 为 true,抛出 TypeError 异常。

  7. transferredBuffer 为 ?TransferArrayBuffer(buffer)。

  8. 如果 controller.[[pendingPullIntos]] 不为

    1. firstPendingPullIntocontroller.[[pendingPullIntos]][0]。

    2. 如果 !IsDetachedBuffer(firstPendingPullIntobuffer) 为 true,抛出 TypeError 异常。

    3. 执行 !ReadableByteStreamControllerInvalidateBYOBRequest(controller)。

    4. firstPendingPullIntobuffer 设为 !TransferArrayBuffer(firstPendingPullIntobuffer)。

    5. 如果 firstPendingPullIntoreader type 为 "none",执行 ?ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, firstPendingPullInto)。

  9. 如果 !ReadableStreamHasDefaultReader(stream) 为 true,

    1. 执行 !ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller)。

    2. 如果 !ReadableStreamGetNumReadRequests(stream) 为 0,

      1. 断言:controller.[[pendingPullIntos]]

      2. 执行 !ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength)。

    3. 否则,

      1. 断言:controller.[[queue]] 为空

      2. 如果 controller.[[pendingPullIntos]] 不为

        1. 断言:controller.[[pendingPullIntos]][0] 的 reader type 为 "default"。

        2. 执行 !ReadableByteStreamControllerShiftPendingPullInto(controller)。

      3. transferredView 为 !Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »)。

      4. 执行 !ReadableStreamFulfillReadRequest(stream, transferredView, false)。

  10. 否则,如果 !ReadableStreamHasBYOBReader(stream) 为 true,

    1. 执行 !ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength)。

    2. filledPullIntos 为执行 !ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller) 的结果。

    3. 对于每个 filledPullInto 属于 filledPullIntos

      1. 执行 !ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto)。

  11. 否则,

    1. 断言:!IsReadableStreamLocked(stream) 为 false。

    2. 执行 !ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength)。

  12. 执行 !ReadableByteStreamControllerCallPullIfNeeded(controller)。

ReadableByteStreamControllerEnqueueChunkToQueue(controller, buffer, byteOffset, byteLength) 执行以下步骤:
  1. 追加 一个带有 buffer bufferbyte offset byteOffsetbyte length byteLength 的新的 readable byte stream queue entrycontroller.[[queue]]

  2. controller.[[queueTotalSize]] 设为 controller.[[queueTotalSize]] + byteLength

ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller, buffer, byteOffset, byteLength) 执行以下步骤:
  1. cloneResultCloneArrayBuffer(buffer, byteOffset, byteLength, %ArrayBuffer%)。

  2. 如果 cloneResult 为异常完成,

    1. 执行 !ReadableByteStreamControllerError(controller, cloneResult.[[Value]])。

    2. 返回 cloneResult

  3. 执行 !ReadableByteStreamControllerEnqueueChunkToQueue(controller, cloneResult.[[Value]], 0, byteLength)。

ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor) 执行以下步骤:
  1. 断言:pullIntoDescriptorreader type 为 "none"。

  2. 如果 pullIntoDescriptorbytes filled > 0,执行 ?ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller, pullIntoDescriptorbuffer, pullIntoDescriptorbyte offset, pullIntoDescriptorbytes filled)。

  3. 执行 !ReadableByteStreamControllerShiftPendingPullInto(controller)。

ReadableByteStreamControllerError(controller, e) 执行以下步骤:
  1. streamcontroller.[[stream]]

  2. 如果 stream.[[state]] 不为 "readable",返回。

  3. 执行 !ReadableByteStreamControllerClearPendingPullIntos(controller)。

  4. 执行 !ResetQueue(controller)。

  5. 执行 !ReadableByteStreamControllerClearAlgorithms(controller)。

  6. 执行 !ReadableStreamError(stream, e)。

ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, size, pullIntoDescriptor) 执行以下步骤:
  1. 断言:controller.[[pendingPullIntos]] 为空,或 controller.[[pendingPullIntos]][0] 为 pullIntoDescriptor

  2. 断言:controller.[[byobRequest]] 为 null。

  3. pullIntoDescriptorbytes filled 设为 bytes filled + size

ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) 执行以下步骤:
  1. maxBytesToCopy 为 min(controller.[[queueTotalSize]]pullIntoDescriptorbyte lengthpullIntoDescriptorbytes filled)。

  2. maxBytesFilledpullIntoDescriptorbytes filled + maxBytesToCopy

  3. totalBytesToCopyRemainingmaxBytesToCopy

  4. ready 为 false。

  5. 断言:!IsDetachedBuffer(pullIntoDescriptorbuffer) 为 false。

  6. 断言:pullIntoDescriptorbytes filled < pullIntoDescriptorminimum fill

  7. remainderBytesmaxBytesFilled 除以 pullIntoDescriptorelement size 的余数。

  8. maxAlignedBytesmaxBytesFilledremainderBytes

  9. 如果 maxAlignedBytespullIntoDescriptorminimum fill

    1. totalBytesToCopyRemaining 设为 maxAlignedBytespullIntoDescriptorbytes filled

    2. ready 设为 true。

      尚未填充到最小长度的 read() 请求描述符会保持在队列头部,底层源 underlying source 可以继续填充。

  10. queuecontroller.[[queue]]

  11. totalBytesToCopyRemaining > 0,

    1. headOfQueuequeue[0]。

    2. bytesToCopy 为 min(totalBytesToCopyRemaining, headOfQueuebyte length)。

    3. destStartpullIntoDescriptorbyte offset + pullIntoDescriptorbytes filled

    4. descriptorBufferpullIntoDescriptorbuffer

    5. queueBufferheadOfQueuebuffer

    6. queueByteOffsetheadOfQueuebyte offset

    7. 断言:!CanCopyDataBlockBytes(descriptorBuffer, destStart, queueBuffer, queueByteOffset, bytesToCopy) 为 true。

      如断言失败(由于规范或实现错误),下一步可能会读写无效内存。实现应始终检查此断言,如失败应以实现定义方式终止(如崩溃或 erroring the stream)。

    8. 执行 !CopyDataBlockBytes(descriptorBuffer.[[ArrayBufferData]], destStart, queueBuffer.[[ArrayBufferData]], queueByteOffset, bytesToCopy)。

    9. 如果 headOfQueuebyte lengthbytesToCopy

      1. 移除 queue[0]。

    10. 否则,

      1. headOfQueuebyte offset 设为 headOfQueuebyte offset + bytesToCopy

      2. headOfQueuebyte length 设为 headOfQueuebyte lengthbytesToCopy

    11. controller.[[queueTotalSize]] 设为 controller.[[queueTotalSize]]bytesToCopy

    12. 执行 !ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesToCopy, pullIntoDescriptor)。

    13. totalBytesToCopyRemaining 设为 totalBytesToCopyRemainingbytesToCopy

  12. 如果 ready 为 false,

    1. 断言:controller.[[queueTotalSize]] 为 0。

    2. 断言:pullIntoDescriptorbytes filled > 0。

    3. 断言:pullIntoDescriptorbytes filled < pullIntoDescriptorminimum fill

  13. 返回 ready

ReadableByteStreamControllerFillReadRequestFromQueue(controller, readRequest) 执行以下步骤:
  1. 断言:controller.[[queueTotalSize]] > 0。

  2. entrycontroller.[[queue]][0]。

  3. 移除 entrycontroller.[[queue]]

  4. controller.[[queueTotalSize]] 设为 controller.[[queueTotalSize]]entrybyte length

  5. 执行 !ReadableByteStreamControllerHandleQueueDrain(controller)。

  6. view 为 !Construct(%Uint8Array%, « entrybufferentrybyte offsetentrybyte length »)。

  7. view 作为参数,执行 readRequestchunk steps

ReadableByteStreamControllerGetBYOBRequest(controller) 执行以下步骤:
  1. 如果 controller.[[byobRequest]] 为 null 且 controller.[[pendingPullIntos]] 不为

    1. firstDescriptorcontroller.[[pendingPullIntos]][0]。

    2. view 为 !Construct(%Uint8Array%, « firstDescriptorbufferfirstDescriptorbyte offset + firstDescriptorbytes filledfirstDescriptorbyte lengthfirstDescriptorbytes filled »)。

    3. byobRequest新的 ReadableStreamBYOBRequest

    4. byobRequest.[[controller]] 设为 controller

    5. byobRequest.[[view]] 设为 view

    6. controller.[[byobRequest]] 设为 byobRequest

  2. 返回 controller.[[byobRequest]]

ReadableByteStreamControllerGetDesiredSize(controller) 执行以下步骤:
  1. statecontroller.[[stream]].[[state]]

  2. 如果 state 为 "errored",返回 null。

  3. 如果 state 为 "closed",返回 0。

  4. 返回 controller.[[strategyHWM]]controller.[[queueTotalSize]]

ReadableByteStreamControllerHandleQueueDrain(controller) 执行以下步骤:
  1. 断言:controller.[[stream]].[[state]] 为 "readable"。

  2. 如果 controller.[[queueTotalSize]] 为 0 且 controller.[[closeRequested]] 为 true,

    1. 执行 !ReadableByteStreamControllerClearAlgorithms(controller)。

    2. 执行 !ReadableStreamClose(controller.[[stream]])。

  3. 否则,

    1. 执行 !ReadableByteStreamControllerCallPullIfNeeded(controller)。

ReadableByteStreamControllerInvalidateBYOBRequest(controller) 执行以下步骤:
  1. 如果 controller.[[byobRequest]] 为 null,返回。

  2. controller.[[byobRequest]].[[controller]] 设为 undefined。

  3. controller.[[byobRequest]].[[view]] 设为 null。

  4. controller.[[byobRequest]] 设为 null。

ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller) 执行以下步骤:
  1. 断言:controller.[[closeRequested]] 为 false。

  2. filledPullIntos 为新的空 列表

  3. controller.[[pendingPullIntos]] 不为

    1. 如果 controller.[[queueTotalSize]] 为 0,则 跳出循环

    2. pullIntoDescriptorcontroller.[[pendingPullIntos]][0]。

    3. 如果 !ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) 为 true,

      1. 执行 !ReadableByteStreamControllerShiftPendingPullInto(controller)。

      2. 追加 pullIntoDescriptorfilledPullIntos

  4. 返回 filledPullIntos

ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller) 执行以下步骤:
  1. readercontroller.[[stream]].[[reader]]

  2. 断言:reader 实现了 ReadableStreamDefaultReader

  3. reader.[[readRequests]] 不为 时,

    1. 如果 controller.[[queueTotalSize]] 为 0,返回。

    2. readRequestreader.[[readRequests]][0]。

    3. 移除 readRequestreader.[[readRequests]]

    4. 执行 !ReadableByteStreamControllerFillReadRequestFromQueue(controller, readRequest)。

ReadableByteStreamControllerPullInto(controller, view, min, readIntoRequest) 执行以下步骤:
  1. streamcontroller.[[stream]]

  2. elementSize 为 1。

  3. ctor%DataView%

  4. 如果 view 有 [[TypedArrayName]] 内部槽(即非 DataView),

    1. elementSize 设为 typed array constructors tableview.[[TypedArrayName]] 对应的元素大小。

    2. ctor 设为 typed array constructors tableview.[[TypedArrayName]] 对应的构造函数。

  5. minimumFillmin × elementSize

  6. 断言:minimumFill ≥ 0 且 minimumFillview.[[ByteLength]]。

  7. 断言:minimumFill 除以 elementSize 的余数为 0。

  8. byteOffsetview.[[ByteOffset]]。

  9. byteLengthview.[[ByteLength]]。

  10. bufferResultTransferArrayBuffer(view.[[ViewedArrayBuffer]])。

  11. 如果 bufferResult 为异常完成,

    1. bufferResult.[[Value]] 为参数,执行 readIntoRequesterror steps

    2. 返回。

  12. bufferbufferResult.[[Value]]。

  13. pullIntoDescriptor 为新的 pull-into descriptor,其中

    buffer
    buffer
    buffer byte length
    buffer.[[ArrayBufferByteLength]]
    byte offset
    byteOffset
    byte length
    byteLength
    bytes filled
    0
    minimum fill
    minimumFill
    element size
    elementSize
    view constructor
    ctor
    reader type
    "byob"
  14. 如果 controller.[[pendingPullIntos]] 不为空,

    1. 追加 pullIntoDescriptorcontroller.[[pendingPullIntos]]

    2. 执行 !ReadableStreamAddReadIntoRequest(stream, readIntoRequest)。

    3. 返回。

  15. 如果 stream.[[state]] 为 "closed",

    1. emptyView 为 !Construct(ctor, « pullIntoDescriptorbufferpullIntoDescriptorbyte offset,0 »)。

    2. emptyView 为参数,执行 readIntoRequestclose steps

    3. 返回。

  16. 如果 controller.[[queueTotalSize]] > 0,

    1. 如果 !ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) 为 true,

      1. filledView 为 !ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor)。

      2. 执行 !ReadableByteStreamControllerHandleQueueDrain(controller)。

      3. filledView 为参数,执行 readIntoRequestchunk steps

      4. 返回。

    2. 如果 controller.[[closeRequested]] 为 true,

      1. eTypeError 异常。

      2. 执行 !ReadableByteStreamControllerError(controller, e)。

      3. e 为参数,执行 readIntoRequesterror steps

      4. 返回。

  17. 追加 pullIntoDescriptorcontroller.[[pendingPullIntos]]

  18. 执行 !ReadableStreamAddReadIntoRequest(stream, readIntoRequest)。

  19. 执行 !ReadableByteStreamControllerCallPullIfNeeded(controller)。

ReadableByteStreamControllerRespond(controller, bytesWritten) 执行以下步骤:
  1. 断言:controller.[[pendingPullIntos]] 不为空。

  2. firstDescriptorcontroller.[[pendingPullIntos]][0]。

  3. statecontroller.[[stream]].[[state]]

  4. 如果 state 为 "closed",

    1. 如果 bytesWritten 不为 0,抛出 TypeError 异常。

  5. 否则,

    1. 断言:state 为 "readable"。

    2. 如果 bytesWritten 为 0,抛出 TypeError 异常。

    3. 如果 firstDescriptorbytes filled + bytesWritten > firstDescriptorbyte length,抛出 RangeError 异常。

  6. firstDescriptorbuffer 设为 !TransferArrayBuffer(firstDescriptorbuffer)。

  7. 执行 ?ReadableByteStreamControllerRespondInternal(controller, bytesWritten)。

ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor) 执行以下步骤:
  1. 断言:firstDescriptorbytes filled 除以 firstDescriptorelement size 的余数为 0。

  2. 如果 firstDescriptorreader type 为 "none",执行 !ReadableByteStreamControllerShiftPendingPullInto(controller)。

  3. streamcontroller.[[stream]]

  4. 如果 !ReadableStreamHasBYOBReader(stream) 为 true,

    1. filledPullIntos 为新的空 列表

    2. filledPullIntos大小 小于 !ReadableStreamGetNumReadIntoRequests(stream),

      1. pullIntoDescriptor 为 !ReadableByteStreamControllerShiftPendingPullInto(controller)。

      2. 追加 pullIntoDescriptorfilledPullIntos

    3. 对于每个 filledPullInto 属于 filledPullIntos

      1. 执行 !ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto)。

ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, pullIntoDescriptor) 执行以下步骤:
  1. 断言:pullIntoDescriptorbytes filled + bytesWrittenpullIntoDescriptorbyte length

  2. 执行 !ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesWritten, pullIntoDescriptor)。

  3. 如果 pullIntoDescriptorreader type 为 "none",

    1. 执行 ?ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor)。

    2. filledPullIntos 为执行 !ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller) 的结果。

    3. 对于每个 filledPullInto 属于 filledPullIntos

      1. 执行 !ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto)。

    4. 返回。

  4. 如果 pullIntoDescriptorbytes filled < pullIntoDescriptorminimum fill,返回。

    尚未填充到最小长度的 read() 请求描述符会保持在队列头部,底层源 underlying source 可以继续填充。

  5. 执行 !ReadableByteStreamControllerShiftPendingPullInto(controller)。

  6. remainderSizepullIntoDescriptorbytes filled 除以 pullIntoDescriptorelement size 的余数。

  7. 如果 remainderSize > 0,

    1. endpullIntoDescriptorbyte offset + pullIntoDescriptorbytes filled

    2. 执行 ?ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller, pullIntoDescriptorbuffer, endremainderSize, remainderSize)。

  8. pullIntoDescriptorbytes filled 设为 pullIntoDescriptorbytes filledremainderSize

  9. filledPullIntos 为执行 !ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller) 的结果。

  10. 执行 !ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], pullIntoDescriptor)。

  11. 对于每个 filledPullInto 属于 filledPullIntos

    1. 执行 !ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto)。

ReadableByteStreamControllerRespondInternal(controller, bytesWritten) 执行以下步骤:
  1. firstDescriptorcontroller.[[pendingPullIntos]][0]。

  2. 断言:!CanTransferArrayBuffer(firstDescriptorbuffer) 为 true。

  3. 执行 !ReadableByteStreamControllerInvalidateBYOBRequest(controller)。

  4. statecontroller.[[stream]].[[state]]

  5. 如果 state 为 "closed",

    1. 断言:bytesWritten 为 0。

    2. 执行 !ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor)。

  6. 否则,

    1. 断言:state 为 "readable"。

    2. 断言:bytesWritten > 0。

    3. 执行 ?ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor)。

  7. 执行 !ReadableByteStreamControllerCallPullIfNeeded(controller)。

ReadableByteStreamControllerRespondWithNewView(controller, view) 执行以下步骤:
  1. 断言:controller.[[pendingPullIntos]] 不为

  2. 断言:!IsDetachedBuffer(view.[[ViewedArrayBuffer]]) 为 false。

  3. firstDescriptorcontroller.[[pendingPullIntos]][0]。

  4. statecontroller.[[stream]].[[state]]

  5. 如果 state 为 "closed",

    1. 如果 view.[[ByteLength]] 不为 0,抛出 TypeError 异常。

  6. 否则,

    1. 断言:state 为 "readable"。

    2. 如果 view.[[ByteLength]] 为 0,抛出 TypeError 异常。

  7. 如果 firstDescriptorbyte offset + firstDescriptorbytes filled 不等于 view.[[ByteOffset]],抛出 RangeError 异常。

  8. 如果 firstDescriptorbuffer byte length 不等于 view.[[ViewedArrayBuffer]].[[ByteLength]],抛出 RangeError 异常。

  9. 如果 firstDescriptorbytes filled + view.[[ByteLength]] > firstDescriptorbyte length,抛出 RangeError 异常。

  10. viewByteLengthview.[[ByteLength]]。

  11. firstDescriptorbuffer 设为 ?TransferArrayBuffer(view.[[ViewedArrayBuffer]])。

  12. 执行 ?ReadableByteStreamControllerRespondInternal(controller, viewByteLength)。

ReadableByteStreamControllerShiftPendingPullInto(controller) 执行以下步骤:
  1. 断言:controller.[[byobRequest]] 为 null。

  2. descriptorcontroller.[[pendingPullIntos]][0]。

  3. 移除 descriptorcontroller.[[pendingPullIntos]]

  4. 返回 descriptor

ReadableByteStreamControllerShouldCallPull(controller) 执行以下步骤:
  1. streamcontroller.[[stream]]

  2. 如果 stream.[[state]] 不为 "readable",返回 false。

  3. 如果 controller.[[closeRequested]] 为 true,返回 false。

  4. 如果 controller.[[started]] 为 false,返回 false。

  5. 如果 !ReadableStreamHasDefaultReader(stream) 为 true 且 !ReadableStreamGetNumReadRequests(stream) > 0,返回 true。

  6. 如果 !ReadableStreamHasBYOBReader(stream) 为 true 且 !ReadableStreamGetNumReadIntoRequests(stream) > 0,返回 true。

  7. desiredSize 为 !ReadableByteStreamControllerGetDesiredSize(controller)。

  8. 断言:desiredSize 不为 null。

  9. 如果 desiredSize > 0,返回 true。

  10. 返回 false。

SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize) 执行以下步骤:
  1. 断言:stream.[[controller]] 为 undefined。

  2. 如果 autoAllocateChunkSize 不为 undefined,

    1. 断言:!IsInteger(autoAllocateChunkSize) 为 true。

    2. 断言:autoAllocateChunkSize 为正。

  3. controller.[[stream]] 设为 stream

  4. controller.[[pullAgain]]controller.[[pulling]] 设为 false。

  5. controller.[[byobRequest]] 设为 null。

  6. 执行 !ResetQueue(controller)。

  7. controller.[[closeRequested]]controller.[[started]] 设为 false。

  8. controller.[[strategyHWM]] 设为 highWaterMark

  9. controller.[[pullAlgorithm]] 设为 pullAlgorithm

  10. controller.[[cancelAlgorithm]] 设为 cancelAlgorithm

  11. controller.[[autoAllocateChunkSize]] 设为 autoAllocateChunkSize

  12. controller.[[pendingPullIntos]] 设为新的空 列表

  13. stream.[[controller]] 设为 controller

  14. startResult 为执行 startAlgorithm 的结果。

  15. startPromisestartResult 解决的 promise

  16. startPromise fulfilled 时

    1. controller.[[started]] 设为 true。

    2. 断言:controller.[[pulling]] 为 false。

    3. 断言:controller.[[pullAgain]] 为 false。

    4. 执行 !ReadableByteStreamControllerCallPullIfNeeded(controller)。

  17. startPromise 被拒绝,原因为 r 时,

    1. 执行 !ReadableByteStreamControllerError(controller, r)。

SetUpReadableByteStreamControllerFromUnderlyingSource(stream, underlyingSource, underlyingSourceDict, highWaterMark) 执行以下步骤:
  1. controller新的 ReadableByteStreamController

  2. startAlgorithm 为一个返回 undefined 的算法。

  3. pullAlgorithm 为一个返回 以 undefined 解决的 promise 的算法。

  4. cancelAlgorithm 为一个返回 以 undefined 解决的 promise 的算法。

  5. 如果 underlyingSourceDict["start"] 存在,则将 startAlgorithm 设为一个算法,其返回 调用 underlyingSourceDict["start"], 参数为 « controller »,回调 this 值underlyingSource 的结果。

  6. 如果 underlyingSourceDict["pull"] 存在,则将 pullAlgorithm 设为一个算法,其返回 调用 underlyingSourceDict["pull"], 参数为 « controller »,回调 this 值underlyingSource 的结果。

  7. 如果 underlyingSourceDict["cancel"] 存在,则将 cancelAlgorithm 设为一个接收参数 reason 并返回 调用 underlyingSourceDict["cancel"], 参数为 « reason »,回调 this 值underlyingSource 的结果的算法。

  8. autoAllocateChunkSizeunderlyingSourceDict["autoAllocateChunkSize"], 如果 存在,否则为 undefined。

  9. 如果 autoAllocateChunkSize 为 0,则抛出 TypeError 异常。

  10. 执行 ?SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize)。

5. 可写流

5.1. 使用可写流

写入可写流的常用方式是直接将一个可读流 管道到它。这可以确保遵循背压,也就是说,如果可写流的底层接收器处理数据的速度慢于可读流生成的速度,可读流会收到通知,有机会减慢数据生成的速度。
readableStream.pipeTo(writableStream)
  .then(() => console.log("所有数据已成功写入!"))
  .catch(e => console.error("出现错误!", e));
你也可以通过获取一个writer并使用它的 write()close() 方法,直接向可写流写入数据。由于可写流会自动排队所有写入操作,并依次将它们转发到底层接收器,所以你可以直接写入,不需要复杂操作:
function writeArrayToStream(array, writableStream) {
  const writer = writableStream.getWriter();
  array.forEach(chunk => writer.write(chunk).catch(() => {}));

  return writer.close();
}

writeArrayToStream([1, 2, 3, 4, 5], writableStream)
  .then(() => console.log("全部完成!"))
  .catch(e => console.error("流发生错误: " + e));

注意我们使用 .catch(() => {}) 来抑制 write() 方法的拒绝;任何致命错误会通过 close() 方法的 promise 拒绝来通知,如果未捕获会导致 unhandledrejection 事件和控制台警告。

在上一个例子中,我们只关注整个流的成功或失败,通过检查 writer 的 close() 方法返回的 promise。 这个 promise 会在流出错(初始化、写入或者关闭时)时拒绝,在流成功关闭时兑现。通常这就是你需要关心的全部内容。

然而,如果你想知道写入某个特定是否成功,可以使用 writer 的 write() 方法返回的 promise:

writer.write("我是一个数据块")
  .then(() => console.log("块成功写入!"))
  .catch(e => console.error(e));

这里“成功”的含义由具体流实例(更准确地说,是它的底层接收器)决定。例如,对于文件流,可能只是操作系统接受了写入,而不一定表示数据已经刷新到磁盘。有些流可能无法提供这种信号,promise 会立即兑现。

desiredSizeready 属性允许可写流 writer生产者更精确地响应流的流控信号,保证内存使用不超过流设定的高水位线。下面的例子会向流写入无限序列的随机字节,通过 desiredSize 决定每次生成多少字节,并通过 ready 等待背压消退。
async function writeRandomBytesForever(writableStream) {
  const writer = writableStream.getWriter();

  while (true) {
    await writer.ready;

    const bytes = new Uint8Array(writer.desiredSize);
    crypto.getRandomValues(bytes);

    // 有意不await,此时等待writer.ready即可。
    writer.write(bytes).catch(() => {});
  }
}

writeRandomBytesForever(myWritableStream).catch(e => console.error("出现异常", e));

注意我们没有 await write() 返回的 promise;这和等待 ready promise 是重复的。此外,和前面例子类似,我们对 write() 返回的 promise 使用 .catch(() => {}),在这种情况下,任何失败会在等待 ready promise 时通知我们。

为了进一步强调 await write() 返回的 promise 并不是好主意,来看上面例子的一个改进版,我们继续直接使用 WritableStreamDefaultWriter 接口,但不控制每次要写多少字节。在这种情况下,遵循背压的代码如下:
async function writeSuppliedBytesForever(writableStream, getBytes) {
  const writer = writableStream.getWriter();

  while (true) {
    await writer.ready;

    const bytes = getBytes();
    writer.write(bytes).catch(() => {});
  }
}

和前一个例子不同,前面我们每次都写入 writer.desiredSize 字节,write()ready 的 promise 是同步的。而在这里,ready 的 promise 很可能比 write() 更早兑现。 请记住,ready 的 promise 在内部队列期望大小转为正数时兑现,这可能比写入成功早(尤其是在高水位线较大时)。

换句话说,await write() 的返回值意味着你永远不会在流的内部队列中排队写入,而是只有前一次写入成功后才执行下一次写入,这会导致吞吐量较低。

5.2. WritableStream

WritableStream 表示一个可写流

5.2.1. 接口定义

WritableStream 类的 Web IDL 定义如下:

[Exposed=*, Transferable]
interface WritableStream {
  constructor(optional object underlyingSink, optional QueuingStrategy strategy = {});

  readonly attribute boolean locked;

  Promise<undefined> abort(optional any reason);
  Promise<undefined> close();
  WritableStreamDefaultWriter getWriter();
};

5.2.2. 内部槽

WritableStream 的实例创建时拥有下表描述的内部槽:

内部槽 描述(非规范性
[[backpressure]] 一个布尔值,表示由控制器设置的背压信号
[[closeRequest]] writer 的 close() 方法返回的 promise
[[controller]] 一个 WritableStreamDefaultController ,用于控制此流的状态和队列
[[Detached]] 当流被转移时为 true 的布尔标志
[[inFlightWriteRequest]] 在底层接收器的写入算法正在执行且尚未完成时,指向当前进行中写入操作的 promise,用于防止重入调用
[[inFlightCloseRequest]] 在底层接收器的关闭算法正在执行且尚未完成时,指向当前进行中关闭操作的 promise,用于防止 abort() 方法中断关闭过程
[[pendingAbortRequest]] 一个待中止请求
[[state]] 包含流当前状态的字符串,仅供内部使用;可为 "writable"、"closed"、"erroring" 或 "errored"
[[storedError]] 指示流失败原因的值,在流处于 "errored" 状态下操作时作为失败原因或异常抛出
[[writer]] 若流被锁定到 writer,则为一个 WritableStreamDefaultWriter 实例,否则为 undefined
[[writeRequests]] 一个列表,表示尚未被底层接收器处理的流内部写入请求队列中的 promise

[[inFlightCloseRequest]] 槽和 [[closeRequest]] 槽互斥。同样,在 [[inFlightWriteRequest]] 不为 undefined 时,[[writeRequests]] 中不会有元素被移除。实现可以基于这些不变式优化这些槽的存储。

待中止请求 是一个用于追踪中止流请求,直到该请求最终被处理的数据结构。它包含以下项目

promise

一个从 WritableStreamAbort 返回的 promise

reason

一个 JavaScript 值,作为中止原因传递给 WritableStreamAbort

was already erroring

一个布尔值,指示在调用 WritableStreamAbort 时流是否处于 "erroring" 状态,这会影响中止请求的结果

5.2.3. 底层接收器 API

WritableStream() 构造函数的第一个参数为一个 JavaScript 对象,表示底层接收器。该对象可包含如下属性:

dictionary UnderlyingSink {
  UnderlyingSinkStartCallback start;
  UnderlyingSinkWriteCallback write;
  UnderlyingSinkCloseCallback close;
  UnderlyingSinkAbortCallback abort;
  any type;
};

callback UnderlyingSinkStartCallback = any (WritableStreamDefaultController controller);
callback UnderlyingSinkWriteCallback = Promise<undefined> (any chunk, WritableStreamDefaultController controller);
callback UnderlyingSinkCloseCallback = Promise<undefined> ();
callback UnderlyingSinkAbortCallback = Promise<undefined> (optional any reason);
start(controller), 类型为 UnderlyingSinkStartCallback

在创建 WritableStream 时立即调用的函数。

通常用于获取所代表的 底层接收器资源的访问权。

如果此初始化过程为异步,可以返回一个 promise 以表示成功或失败;拒绝的 promise 会使流进入错误状态。抛出的异常会被 WritableStream() 构造函数重新抛出。

write(chunk, controller), 类型为 UnderlyingSinkWriteCallback

当有新的数据块可写入底层接收器时调用。流实现保证该函数仅在前一次写入已成功后调用,且不会在 start() 成功之前或 close()abort() 被调用之后调用。

此函数通常用于实际发送数据到 底层接收器所表示的资源,比如调用底层 API。

如果写入过程为异步,并且需要向调用者通知成功或失败,则该函数可以返回一个 promise。此 promise 的返回值会被传递给 writer.write() 的调用者,以便他们能监控该次写入。抛出异常等同于返回拒绝的 promise。

注意,并非所有情况下都能获得此类信号;参见 § 10.6 无背压或成功信号的可写流§ 10.7 带有背压和成功信号的可写流。在这些情况下,最好什么都不返回。

该函数可能返回的 promise 也决定了此数据块是否计入内部队列期望填充值的计算。在 promise 达到最终状态之前,writer.desiredSize 会保持不变,仅当写入成功后才增加以表明需要更多块。

最后,此 promise 还用于保证良好行为生产者不会在数据块尚未完全处理时修改其内容。(这并非由规范机制保证,而是生产者底层接收器之间的非正式约定。)

close(), 类型为 UnderlyingSinkCloseCallback

生产者通过 writer.close() 表示写入完成,并且所有排队的写入已成功完成后调用。

此函数可以执行任何必要的收尾或刷新操作,并释放所持有的资源。

如果关闭过程为异步,函数可返回 promise 以表示成功或失败;结果会通过 writer.close() 方法的返回值传递。返回拒绝的 promise 会使流进入错误状态,而不是成功关闭。抛出异常等同于返回拒绝的 promise。

abort(reason), 类型为 UnderlyingSinkAbortCallback

生产者通过 stream.abort()writer.abort() 表示希望中止流时调用。它的参数与这些方法传入的值一致。

可写流在管道过程中也可能因某些情况被中止;详见 pipeTo() 方法的定义。

此函数可用于清理资源,类似 close() ,但也可以自定义处理。

如果中止过程为异步,函数可返回 promise 以表示成功或失败;结果会通过 writer.abort() 方法的返回值传递。抛出异常等同于返回拒绝的 promise。不管怎样,流都会因新的 TypeError 而进入错误状态,表示其已被中止。

type, 类型为 any

该属性为保留属性,当前任何赋值尝试都会抛出异常。

传递给 start()write()controller 参数是 WritableStreamDefaultController 的实例,可用于使流出错。主要用于与非 promise 风格 API 的衔接,例如见 § 10.6 无背压或成功信号的可写流

5.2.4. 构造函数、方法和属性

stream = new WritableStream(underlyingSink[, strategy)

创建一个新的 WritableStream ,包装提供的 底层接收器。关于 underlyingSink 参数的详细信息,参见 § 5.2.3 底层接收器 API

strategy 参数表示流的 排队策略,详见 § 7.1 排队策略 API。如果未提供,则默认行为与 CountQueuingStrategy高水位线为 1 的效果一致。

isLocked = stream.locked

返回该可写流是否已锁定到某个 writer

await stream.abort([ reason ])

中止该流,表示生产者无法再成功写入流,流会立即进入错误状态,所有排队的写入会被丢弃。这还会执行 底层接收器 的中止机制。

返回的 promise 在流正常关闭时兑现,如果底层接收器在此过程中发生错误则拒绝。此外,如果流当前已锁定,则会以 TypeError 拒绝(不会尝试取消流)。

await stream.close()

关闭流。底层接收器将在处理完所有已写入的数据块后,执行关闭行为。此期间任何进一步写入都会失败(但不会使流进入错误状态)。

该方法返回一个 promise,如果所有剩余数据块都成功写入且流成功关闭,则兑现;否则在过程中遇到错误则拒绝。此外,如果流当前已锁定,则会以 TypeError 拒绝(不会尝试取消流)。

writer = stream.getWriter()

创建一个writer(即 WritableStreamDefaultWriter 实例),并将流锁定到该 writer。流被锁定期间,不能获取其他 writer,直到当前 writer 被释放

该功能对需要独占写入流、避免中断或交错的抽象非常有用。通过获取流的 writer,可以确保不会有其他人同时写入,避免写入数据不可预测甚至无效。

new WritableStream(underlyingSink, strategy) 构造函数步骤如下:
  1. 如果 underlyingSink 未提供,则将其设为 null。

  2. underlyingSinkDictunderlyingSink转换为类型为 IDL 的值 UnderlyingSink

    不能直接将 underlyingSink 参数声明为 UnderlyingSink 类型,否则会丢失原对象的引用。需要保留对象以便后续调用其方法。

  3. 如果 underlyingSinkDict["type"]存在,则抛出 RangeError 异常。

    此设计方便未来添加新类型,而不会影响向后兼容。

  4. 执行 ! InitializeWritableStream(this)。

  5. sizeAlgorithm 为 ! ExtractSizeAlgorithm(strategy)。

  6. highWaterMark 为 ? ExtractHighWaterMark(strategy, 1)。

  7. 执行 ? SetUpWritableStreamDefaultControllerFromUnderlyingSink(this, underlyingSink, underlyingSinkDict, highWaterMark, sizeAlgorithm)。

locked 属性的步骤如下:
  1. 返回 ! IsWritableStreamLocked(this)。

abort(reason) 方法的步骤如下:
  1. 如果 ! IsWritableStreamLocked(this) 为 true,则返回 一个拒绝的 promise,异常为 TypeError

  2. 返回 ! WritableStreamAbort(this, reason)。

close() 方法的步骤如下:
  1. 如果 ! IsWritableStreamLocked(this) 为 true,则返回 一个拒绝的 promise,异常为 TypeError

  2. 如果 ! WritableStreamCloseQueuedOrInFlight(this) 为 true,则返回 一个拒绝的 promise,异常为 TypeError

  3. 返回 ! WritableStreamClose(this)。

getWriter() 方法的步骤如下:
  1. 返回 ? AcquireWritableStreamDefaultWriter(this)。

5.2.5. 通过 postMessage() 进行传递

destination.postMessage(ws, { transfer: [ws] });

将一个 WritableStream 发送到另一个 frame、窗口或 worker。

被传递的流可以像原始流一样使用,原流会变为锁定,无法直接使用。

WritableStream 对象属于可传递对象。它们的传递步骤,以 valuedataHolder 为参数如下:
  1. 如果 ! IsWritableStreamLocked(value) 为 true,则抛出 "DataCloneError" DOMException

  2. port1新建MessagePort,创建于当前 Realm

  3. port2新建MessagePort,创建于当前 Realm

  4. 绑定 port1port2

  5. readable新建ReadableStream,创建于当前 Realm

  6. 执行 ! SetUpCrossRealmTransformReadable(readable, port1)。

  7. promise 为 ! ReadableStreamPipeTo(readable, value, false, false, false)。

  8. promise.[[PromiseIsHandled]] 设为 true。

  9. dataHolder.[[port]] 设为 ! StructuredSerializeWithTransfer(port2, « port2 »)。

它们的传递接收步骤,以 dataHoldervalue 为参数如下:
  1. deserializedRecord 为 ! StructuredDeserializeWithTransfer(dataHolder.[[port]], 当前 Realm)。

  2. portdeserializedRecord.[[Deserialized]]。

  3. 执行 ! SetUpCrossRealmTransformWritable(value, port)。

5.3. WritableStreamDefaultWriter

WritableStreamDefaultWriter 类表示一个可写流 writer,用于由 WritableStream 实例提供。

5.3.1. 接口定义

WritableStreamDefaultWriter 类的 Web IDL 定义如下:

[Exposed=*]
interface WritableStreamDefaultWriter {
  constructor(WritableStream stream);

  readonly attribute Promise<undefined> closed;
  readonly attribute unrestricted double? desiredSize;
  readonly attribute Promise<undefined> ready;

  Promise<undefined> abort(optional any reason);
  Promise<undefined> close();
  undefined releaseLock();
  Promise<undefined> write(optional any chunk);
};

5.3.2. 内部槽

WritableStreamDefaultWriter 的实例在创建时拥有下表描述的内部槽:

内部槽 描述(非规范性
[[closedPromise]] writer 的 closed getter 返回的 promise
[[readyPromise]] writer 的 ready getter 返回的 promise
[[stream]] WritableStream 实例,拥有当前 reader

5.3.3. 构造函数、方法和属性

writer = new WritableStreamDefaultWriter(stream)

这等价于调用 stream.getWriter()

await writer.closed

返回一个 promise,当流关闭时兑现,如果流发生错误或 writer 的锁已释放且流未完成关闭,则拒绝。

desiredSize = writer.desiredSize

返回内部队列期望填充值。如果队列超满则可能为负值。生产者可据此判断写入的数据量。

若流无法正常写入(如已错误或有中止请求),则为 null。若流已关闭,则返回零。若在 writer 的锁释放后调用,则会抛出异常。

await writer.ready

返回一个 promise,当内部队列期望填充值从非正转为正时兑现,表示不再施加背压。当期望值回落到零或以下时,会返回新的挂起 promise,直到下次变化。

若流发生错误或中止,或 writer 的锁释放,返回的 promise 会被拒绝。

await writer.abort([ reason ])

若 reader 激活,行为与 stream.abort(reason) 相同。

await writer.close()

如果读取器为 活动,行为与 stream.close() 相同。

writer.releaseLock()

释放 writer 对流的锁。锁释放后,writer 不再激活。若锁释放时流已错误,writer 也会表现为错误状态;否则表现为已关闭。

注意即使有未完成的写入也可释放锁(即使 write() 返回的 promise 未决)。不需要在整个写入期间持有锁,锁只是为了防止其他生产者交错写入。

await writer.write(chunk)

将指定数据块写入可写流,会等待前一次写入成功后再发送该数据块底层接收器write() 方法。返回的 promise 在成功写入后兑现,若写入失败或流在写入前已错误则拒绝。

注意“成功”由底层接收器决定,可能仅表示数据块已被接受,而不一定已安全保存到最终目的地。

chunk 可变,生产者应避免在传递给 write() 后修改其内容,直到 write() 返回的 promise 达到终态。以确保底层接收器接收到的是原始值。

new WritableStreamDefaultWriter(stream) 构造函数步骤如下:
  1. 执行 ? SetUpWritableStreamDefaultWriter(this, stream)。

closed 属性步骤如下:
  1. 返回 this.[[closedPromise]]

desiredSize 属性步骤如下:
  1. 如果 this.[[stream]] 为 undefined,抛出 TypeError 异常。

  2. 返回 ! WritableStreamDefaultWriterGetDesiredSize(this)。

ready 属性步骤如下:
  1. 返回 this.[[readyPromise]]

abort(reason) 方法步骤如下:
  1. 如果 this.[[stream]] 为 undefined,返回 一个拒绝的 promise,异常为 TypeError

  2. 返回 ! WritableStreamDefaultWriterAbort(this, reason)。

close() 方法步骤如下:
  1. streamthis.[[stream]]

  2. 如果 stream 为 undefined,返回 一个拒绝的 promise,异常为 TypeError

  3. 如果 ! WritableStreamCloseQueuedOrInFlight(stream) 为 true,返回 一个拒绝的 promise,异常为 TypeError

  4. 返回 ! WritableStreamDefaultWriterClose(this)。

releaseLock() 方法步骤如下:
  1. streamthis.[[stream]]

  2. 如果 stream 为 undefined,则返回。

  3. 断言:stream.[[writer]] 不为 undefined。

  4. 执行 ! WritableStreamDefaultWriterRelease(this)。

write(chunk) 方法步骤如下:
  1. 如果 this.[[stream]] 为 undefined,返回 一个拒绝的 promise,异常为 TypeError

  2. 返回 ! WritableStreamDefaultWriterWrite(this, chunk)。

5.4. WritableStreamDefaultController

WritableStreamDefaultController 类拥有可用于控制 WritableStream 状态的方法。在构造 WritableStream 时, 底层接收器会获得一个对应的 WritableStreamDefaultController 实例用于操作。

5.4.1. 接口定义

WritableStreamDefaultController 类的 Web IDL 定义如下:

[Exposed=*]
interface WritableStreamDefaultController {
  readonly attribute AbortSignal signal;
  undefined error(optional any e);
};

5.4.2. 内部槽

WritableStreamDefaultController 的实例在创建时拥有下表描述的内部槽:

内部槽 描述(非规范性
[[abortAlgorithm]] 一个返回 promise 的算法,接收一个参数(中止原因),用于将中止请求传递给 底层接收器
[[abortController]] 一个 AbortController ,可用于在流被中止时终止正在进行的写入或关闭操作
[[closeAlgorithm]] 一个返回 promise 的算法,用于将关闭请求传递给 底层接收器
[[queue]] 一个列表,表示流内部排队的数据块
[[queueTotalSize]] 存储于 [[queue]] 中所有块的总大小(见 § 8.1 带大小的队列
[[started]] 一个布尔标志,指示 底层接收器是否已完成初始化
[[strategyHWM]] 由流创建者作为流排队策略一部分提供的数字,表示流何时会对底层接收器施加背压
[[strategySizeAlgorithm]] 用于计算入队数据块大小的算法,属于流的排队策略
[[stream]] 被控制的 WritableStream 实例
[[writeAlgorithm]] 一个返回 promise 的算法,接收一个参数(要写入的数据块),用于将数据写入 底层接收器

关闭哨兵是一个特殊值,会被入队到 [[queue]],代替数据块,用于标记流已关闭。仅内部使用,不会暴露给 Web 开发者。

5.4.3. 方法和属性

controller.signal

一个 AbortSignal,可用于在流被中止时终止正在进行的写入或关闭操作。

controller.error(e)

关闭被控制的可写流,使之后所有操作都因错误 e 失败。

该方法很少被使用,通常只需在 底层接收器的方法返回拒绝的 promise 即可。但在流生命周期外因事件需突然关闭流时,此方法会很有用。

signal 属性步骤如下:
  1. 返回 this.[[abortController]]signal

error(e) 方法步骤如下:
  1. statethis.[[stream]].[[state]]

  2. 如果 state 不是 "writable",则返回。

  3. 执行 ! WritableStreamDefaultControllerError(this, e)。

5.4.4. 内部方法

以下为每个 WritableStreamDefaultController 实例实现的内部方法, 可写流实现会调用这些方法。

这些以方法形式而不是抽象操作出现,是为了明确可写流实现与控制器实现解耦,未来可扩展为其他控制器,只要这些控制器实现相应内部方法。类似情况见可读流(参见 § 4.9.2 与控制器接口),实际有多种控制器类型,因此内部方法多态使用。

[[AbortSteps]](reason) 实现 [[AbortSteps]] 合约。其步骤如下:
  1. result 为执行 this.[[abortAlgorithm]],传入 reason 的结果。

  2. 执行 ! WritableStreamDefaultControllerClearAlgorithms(this)。

  3. 返回 result

[[ErrorSteps]]() 实现 [[ErrorSteps]] 合约。其步骤如下:
  1. 执行 ! ResetQueue(this)。

5.5. 抽象操作

5.5.1. 可写流相关操作

以下抽象操作用于在更高层次上操作 WritableStream 实例。

AcquireWritableStreamDefaultWriter(stream) 步骤如下:
  1. writer 为一个 新建WritableStreamDefaultWriter

  2. 执行 ? SetUpWritableStreamDefaultWriter(writer, stream)。

  3. 返回 writer

CreateWritableStream(startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm) 步骤如下:
  1. 断言:! IsNonNegativeNumber(highWaterMark) 为 true。

  2. stream 为一个 新建WritableStream

  3. 执行 ! InitializeWritableStream(stream)。

  4. controller 为一个 新建WritableStreamDefaultController

  5. 执行 ? SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm)。

  6. 返回 stream

该抽象操作仅在 startAlgorithm 抛出异常时才会抛出异常。

InitializeWritableStream(stream) 步骤如下:
  1. stream.[[state]] 设为 "writable"。

  2. stream.[[storedError]]stream.[[writer]]stream.[[controller]]stream.[[inFlightWriteRequest]]stream.[[closeRequest]]stream.[[inFlightCloseRequest]]stream.[[pendingAbortRequest]] 设为 undefined。

  3. stream.[[writeRequests]] 设为一个新的空列表

  4. stream.[[backpressure]] 设为 false。

IsWritableStreamLocked(stream) 步骤如下:
  1. 如果 stream.[[writer]] 为 undefined,则返回 false。

  2. 返回 true。

SetUpWritableStreamDefaultWriter(writer, stream) 步骤如下:
  1. 如果 ! IsWritableStreamLocked(stream) 为 true,则抛出 TypeError 异常。

  2. writer.[[stream]] 设为 stream

  3. stream.[[writer]] 设为 writer

  4. statestream.[[state]]

  5. 如果 state 为 "writable",

    1. 如果 ! WritableStreamCloseQueuedOrInFlight(stream) 为 false 且 stream.[[backpressure]] 为 true,则将 writer.[[readyPromise]] 设为一个新的 promise

    2. 否则,将 writer.[[readyPromise]] 设为已解决的 promise,值为 undefined。

    3. writer.[[closedPromise]] 设为新的 promise

  6. 否则如果 state 为 "erroring",

    1. writer.[[readyPromise]] 设为拒绝的 promise,原因为 stream.[[storedError]]

    2. writer.[[readyPromise]].[[PromiseIsHandled]] 设为 true。

    3. writer.[[closedPromise]] 设为新的 promise

  7. 否则如果 state 为 "closed",

    1. writer.[[readyPromise]] 设为已解决的 promise,值为 undefined。

    2. writer.[[closedPromise]] 设为已解决的 promise,值为 undefined。

  8. 否则,

    1. 断言:state 为 "errored"。

    2. storedErrorstream.[[storedError]]

    3. writer.[[readyPromise]] 设为拒绝的 promise,原因为 storedError

    4. writer.[[readyPromise]].[[PromiseIsHandled]] 设为 true。

    5. writer.[[closedPromise]] 设为拒绝的 promise,原因为 storedError

    6. writer.[[closedPromise]].[[PromiseIsHandled]] 设为 true。

WritableStreamAbort(stream, reason) 步骤如下:
  1. 如果 stream.[[state]] 为 "closed" 或 "errored",返回已解决的 promise,值为 undefined。

  2. stream.[[controller]].[[abortController]] 上用 reason 发出中止信号

  3. statestream.[[state]]

  4. 如果 state 为 "closed" 或 "errored",返回已解决的 promise,值为 undefined。

    我们需要再次检查状态,因为 发出中止信号 会运行作者代码,状态可能会被改变。

  5. 如果 stream.[[pendingAbortRequest]] 不为 undefined,则返回 stream.[[pendingAbortRequest]]promise

  6. 断言:state 为 "writable" 或 "erroring"。

  7. wasAlreadyErroring 为 false。

  8. 如果 state 为 "erroring",

    1. wasAlreadyErroring 设为 true。

    2. reason 设为 undefined。

  9. promise新的 promise

  10. stream.[[pendingAbortRequest]] 设为一个新的 待中止请求,其 promisepromisereasonreasonwas already erroringwasAlreadyErroring

  11. 如果 wasAlreadyErroring 为 false,执行 ! WritableStreamStartErroring(stream, reason)。

  12. 返回 promise

WritableStreamClose(stream) 步骤如下:
  1. statestream.[[state]]

  2. 如果 state 为 "closed" 或 "errored",返回拒绝的 promise,异常为 TypeError

  3. 断言:state 为 "writable" 或 "erroring"。

  4. 断言:! WritableStreamCloseQueuedOrInFlight(stream) 为 false。

  5. promise新的 promise

  6. stream.[[closeRequest]] 设为 promise

  7. writerstream.[[writer]]

  8. 如果 writer 不为 undefined,且 stream.[[backpressure]] 为 true,并且 state 为 "writable",则用 undefined 解决 writer.[[readyPromise]]

  9. 执行 ! WritableStreamDefaultControllerClose(stream.[[controller]])。

  10. 返回 promise

5.5.2. 与控制器的接口

为了将来能够灵活地添加不同的可写流行为(类似于默认可读流与可读字节流的区别),可写流的大部分内部状态由 WritableStreamDefaultController 类进行封装。

每个控制器类都定义了两个内部方法,由 WritableStream 算法调用:

[[AbortSteps]](reason)
控制器在流中止时运行的步骤,用于清理控制器中存储的状态并通知底层接收器
[[ErrorSteps]]()
控制器在流出错时运行的步骤,用于清理控制器中存储的状态。

(这些被定义为内部方法,而不是抽象操作,是为了允许 WritableStream 算法以多态方式调用,无需区分当前控制器的类型。目前只有 WritableStreamDefaultController,但这种设计为未来扩展留下了空间。)

本节剩余部分关注控制器反向调用可写流对象的抽象操作。控制器通过这些操作影响其关联的 WritableStream ,将内部状态变化转化为开发者可见的 WritableStream 公共 API 行为。

WritableStreamAddWriteRequest(stream) 步骤如下:
  1. 断言:! IsWritableStreamLocked(stream) 为 true。

  2. 断言:stream.[[state]] 为 "writable"。

  3. promise新的 promise

  4. 追加 promisestream.[[writeRequests]]

  5. 返回 promise

WritableStreamCloseQueuedOrInFlight(stream) 步骤如下:
  1. 如果 stream.[[closeRequest]] 为 undefined 且 stream.[[inFlightCloseRequest]] 为 undefined,则返回 false。

  2. 返回 true。

WritableStreamDealWithRejection(stream, error) 步骤如下:
  1. statestream.[[state]]

  2. 如果 state 为 "writable",

    1. 执行 ! WritableStreamStartErroring(stream, error)。

    2. 返回。

  3. 断言:state 为 "erroring"。

  4. 执行 ! WritableStreamFinishErroring(stream)。

WritableStreamFinishErroring(stream) 步骤如下:
  1. 断言:stream.[[state]] 为 "erroring"。

  2. 断言:! WritableStreamHasOperationMarkedInFlight(stream) 为 false。

  3. stream.[[state]] 设为 "errored"。

  4. 执行 ! stream.[[controller]].[[ErrorSteps]]()。

  5. storedErrorstream.[[storedError]]

  6. 遍历 stream.[[writeRequests]] 中的每个 writeRequest

    1. 拒绝 writeRequest,原因为 storedError

  7. stream.[[writeRequests]] 设为空列表

  8. 如果 stream.[[pendingAbortRequest]] 为 undefined,

    1. 执行 ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream)。

    2. 返回。

  9. abortRequeststream.[[pendingAbortRequest]]

  10. stream.[[pendingAbortRequest]] 设为 undefined。

  11. 如果 abortRequestwas already erroring 为 true,

    1. 拒绝 abortRequestpromise,原因为 storedError

    2. 执行 ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream)。

    3. 返回。

  12. promise 为 ! stream.[[controller]].[[AbortSteps]](abortRequestreason)。

  13. promise 兑现时,

    1. 解决 abortRequestpromise,值为 undefined。

    2. 执行 ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream)。

  14. promise 被拒绝,原因为 reason 时,

    1. 拒绝 abortRequestpromise,原因为 reason

    2. 执行 ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream)。

WritableStreamFinishInFlightClose(stream) 步骤如下:
  1. 断言:stream.[[inFlightCloseRequest]] 不为 undefined。

  2. 解决 stream.[[inFlightCloseRequest]],值为 undefined。

  3. stream.[[inFlightCloseRequest]] 设为 undefined。

  4. statestream.[[state]]

  5. 断言:stream.[[state]] 为 "writable" 或 "erroring"。

  6. 如果 state 为 "erroring",

    1. stream.[[storedError]] 设为 undefined。

    2. 如果 stream.[[pendingAbortRequest]] 不为 undefined,

      1. 解决 stream.[[pendingAbortRequest]]promise,值为 undefined。

      2. stream.[[pendingAbortRequest]] 设为 undefined。

  7. stream.[[state]] 设为 "closed"。

  8. writerstream.[[writer]]

  9. 如果 writer 不为 undefined,解决 writer.[[closedPromise]],值为 undefined。

  10. 断言:stream.[[pendingAbortRequest]] 为 undefined。

  11. 断言:stream.[[storedError]] 为 undefined。

WritableStreamFinishInFlightCloseWithError(stream, error) 步骤如下:
  1. 断言:stream.[[inFlightCloseRequest]] 不为 undefined。

  2. 拒绝 stream.[[inFlightCloseRequest]],原因为 error

  3. stream.[[inFlightCloseRequest]] 设为 undefined。

  4. 断言:stream.[[state]] 为 "writable" 或 "erroring"。

  5. 如果 stream.[[pendingAbortRequest]] 不为 undefined,

    1. 拒绝 stream.[[pendingAbortRequest]]promise,原因为 error

    2. stream.[[pendingAbortRequest]] 设为 undefined。

  6. 执行 ! WritableStreamDealWithRejection(stream, error)。

WritableStreamFinishInFlightWrite(stream) 步骤如下:
  1. 断言:stream.[[inFlightWriteRequest]] 不为 undefined。

  2. 解决 stream.[[inFlightWriteRequest]],值为 undefined。

  3. stream.[[inFlightWriteRequest]] 设为 undefined。

WritableStreamFinishInFlightWriteWithError(stream, error) 步骤如下:
  1. 断言:stream.[[inFlightWriteRequest]] 不为 undefined。

  2. 拒绝 stream.[[inFlightWriteRequest]],原因为 error

  3. stream.[[inFlightWriteRequest]] 设为 undefined。

  4. 断言:stream.[[state]] 为 "writable" 或 "erroring"。

  5. 执行 ! WritableStreamDealWithRejection(stream, error)。

WritableStreamHasOperationMarkedInFlight(stream) 步骤如下:
  1. 如果 stream.[[inFlightWriteRequest]] 为 undefined 且 stream.[[inFlightCloseRequest]] 为 undefined,则返回 false。

  2. 返回 true。

WritableStreamMarkCloseRequestInFlight(stream) 步骤如下:
  1. 断言:stream.[[inFlightCloseRequest]] 为 undefined。

  2. 断言:stream.[[closeRequest]] 不为 undefined。

  3. stream.[[inFlightCloseRequest]] 设为 stream.[[closeRequest]]

  4. stream.[[closeRequest]] 设为 undefined。

WritableStreamMarkFirstWriteRequestInFlight(stream) 步骤如下:
  1. 断言:stream.[[inFlightWriteRequest]] 为 undefined。

  2. 断言:stream.[[writeRequests]] 非空。

  3. writeRequeststream.[[writeRequests]][0]。

  4. 移除 writeRequeststream.[[writeRequests]]

  5. stream.[[inFlightWriteRequest]] 设为 writeRequest

WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream) 步骤如下:
  1. 断言:stream.[[state]] 为 "errored"。

  2. 如果 stream.[[closeRequest]] 不为 undefined,

    1. 断言:stream.[[inFlightCloseRequest]] 为 undefined。

    2. 拒绝 stream.[[closeRequest]],原因为 stream.[[storedError]]

    3. stream.[[closeRequest]] 设为 undefined。

  3. writerstream.[[writer]]

  4. 如果 writer 不为 undefined,

    1. 拒绝 writer.[[closedPromise]],原因为 stream.[[storedError]]

    2. writer.[[closedPromise]].[[PromiseIsHandled]] 设为 true。

WritableStreamStartErroring(stream, reason) 步骤如下:
  1. 断言:stream.[[storedError]] 为 undefined。

  2. 断言:stream.[[state]] 为 "writable"。

  3. controllerstream.[[controller]]

  4. 断言:controller 不为 undefined。

  5. stream.[[state]] 设为 "erroring"。

  6. stream.[[storedError]] 设为 reason

  7. writerstream.[[writer]]

  8. 如果 writer 不为 undefined,执行 ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason)。

  9. 如果 ! WritableStreamHasOperationMarkedInFlight(stream) 为 false 且 controller.[[started]] 为 true,执行 ! WritableStreamFinishErroring(stream)。

WritableStreamUpdateBackpressure(stream, backpressure) 步骤如下:
  1. 断言:stream.[[state]] 为 "writable"。

  2. 断言:! WritableStreamCloseQueuedOrInFlight(stream) 为 false。

  3. writerstream.[[writer]]

  4. 如果 writer 不为 undefined 且 backpressure 不等于 stream.[[backpressure]]

    1. 如果 backpressure 为 true,则将 writer.[[readyPromise]] 设为新的 promise

    2. 否则,

      1. 断言:backpressure 为 false。

      2. 解决 writer.[[readyPromise]],值为 undefined。

  5. stream.[[backpressure]] 设为 backpressure

5.5.3. Writer 抽象操作

以下抽象操作用于实现和操作 WritableStreamDefaultWriter 实例。

WritableStreamDefaultWriterAbort(writer, reason) 步骤如下:
  1. streamwriter.[[stream]]

  2. 断言:stream 不为 undefined。

  3. 返回 ! WritableStreamAbort(stream, reason)。

WritableStreamDefaultWriterClose(writer) 步骤如下:
  1. streamwriter.[[stream]]

  2. 断言:stream 不为 undefined。

  3. 返回 ! WritableStreamClose(stream)。

WritableStreamDefaultWriterCloseWithErrorPropagation(writer) 步骤如下:
  1. streamwriter.[[stream]]

  2. 断言:stream 不为 undefined。

  3. statestream.[[state]]

  4. 如果 ! WritableStreamCloseQueuedOrInFlight(stream) 为 true 或 state 为 "closed",返回已解决的 promise,值为 undefined。

  5. 如果 state 为 "errored",返回拒绝的 promise,原因为 stream.[[storedError]]

  6. 断言:state 为 "writable" 或 "erroring"。

  7. 返回 ! WritableStreamDefaultWriterClose(writer)。

该抽象操作用于实现 ReadableStreampipeTo() 的错误传播语义。

WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer, error) 步骤如下:
  1. 如果 writer.[[closedPromise]].[[PromiseState]] 为 "pending",拒绝 writer.[[closedPromise]],原因为 error

  2. 否则,将 writer.[[closedPromise]] 设为拒绝的 promise,原因为 error

  3. writer.[[closedPromise]].[[PromiseIsHandled]] 设为 true。

WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error) 步骤如下:
  1. 如果 writer.[[readyPromise]].[[PromiseState]] 为 "pending",拒绝 writer.[[readyPromise]],原因为 error

  2. 否则,将 writer.[[readyPromise]] 设为拒绝的 promise,原因为 error

  3. writer.[[readyPromise]].[[PromiseIsHandled]] 设为 true。

WritableStreamDefaultWriterGetDesiredSize(writer) 步骤如下:
  1. streamwriter.[[stream]]

  2. statestream.[[state]]

  3. 如果 state 为 "errored" 或 "erroring",返回 null。

  4. 如果 state 为 "closed",返回 0。

  5. 返回 ! WritableStreamDefaultControllerGetDesiredSize(stream.[[controller]])。

WritableStreamDefaultWriterRelease(writer) 步骤如下:
  1. streamwriter.[[stream]]

  2. 断言:stream 不为 undefined。

  3. 断言:stream.[[writer]]writer

  4. releasedError 为新建 TypeError

  5. 执行 ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError)。

  6. 执行 ! WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError)。

  7. stream.[[writer]] 设为 undefined。

  8. writer.[[stream]] 设为 undefined。

WritableStreamDefaultWriterWrite(writer, chunk) 步骤如下:
  1. streamwriter.[[stream]]

  2. 断言:stream 不为 undefined。

  3. controllerstream.[[controller]]

  4. chunkSize 为 ! WritableStreamDefaultControllerGetChunkSize(controller, chunk)。

  5. 如果 stream 不等于 writer.[[stream]],返回拒绝的 promise,异常为 TypeError

  6. statestream.[[state]]

  7. 如果 state 为 "errored",返回拒绝的 promise,原因为 stream.[[storedError]]

  8. 如果 ! WritableStreamCloseQueuedOrInFlight(stream) 为 true 或 state 为 "closed",返回拒绝的 promise,异常为 TypeError,表示流正在关闭或已关闭。

  9. 如果 state 为 "erroring",返回拒绝的 promise,原因为 stream.[[storedError]]

  10. 断言:state 为 "writable"。

  11. promise 为 ! WritableStreamAddWriteRequest(stream)。

  12. 执行 ! WritableStreamDefaultControllerWrite(controller, chunk, chunkSize)。

  13. 返回 promise

5.5.4. 默认控制器

以下抽象操作用于实现 WritableStreamDefaultController 类。

SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm) 执行以下步骤:
  1. 断言:stream 实现 WritableStream

  2. 断言:stream.[[controller]] 为 undefined。

  3. controller.[[stream]] 设为 stream

  4. stream.[[controller]] 设为 controller

  5. 执行 ! ResetQueue(controller)。

  6. controller.[[abortController]] 设为新建 AbortController

  7. controller.[[started]] 设为 false。

  8. controller.[[strategySizeAlgorithm]] 设为 sizeAlgorithm

  9. controller.[[strategyHWM]] 设为 highWaterMark

  10. controller.[[writeAlgorithm]] 设为 writeAlgorithm

  11. controller.[[closeAlgorithm]] 设为 closeAlgorithm

  12. controller.[[abortAlgorithm]] 设为 abortAlgorithm

  13. backpressure 为 ! WritableStreamDefaultControllerGetBackpressure(controller)。

  14. 执行 ! WritableStreamUpdateBackpressure(stream, backpressure)。

  15. startResult 为执行 startAlgorithm 的结果。(可能会抛出异常。)

  16. startPromise已解决的 promise,值为 startResult

  17. startPromise 兑现时,

    1. 断言:stream.[[state]] 为 "writable" 或 "erroring"。

    2. controller.[[started]] 设为 true。

    3. 执行 ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller)。

  18. startPromise 被拒绝,原因为 r 时,

    1. 断言:stream.[[state]] 为 "writable" 或 "erroring"。

    2. controller.[[started]] 设为 true。

    3. 执行 ! WritableStreamDealWithRejection(stream, r)。

SetUpWritableStreamDefaultControllerFromUnderlyingSink(stream, underlyingSink, underlyingSinkDict, highWaterMark, sizeAlgorithm) 执行以下步骤:
  1. controller 为新建 WritableStreamDefaultController

  2. startAlgorithm 为返回 undefined 的算法。

  3. writeAlgorithm 为返回已解决的 promise,值为 undefined 的算法。

  4. closeAlgorithm 为返回已解决的 promise,值为 undefined 的算法。

  5. abortAlgorithm 为返回已解决的 promise,值为 undefined 的算法。

  6. 如果 underlyingSinkDict["start"]存在,则将 startAlgorithm 设为:返回 调用 underlyingSinkDict["start"],参数列表为 « controller »,异常行为为 "rethrow",callback this valueunderlyingSink 的算法。

  7. 如果 underlyingSinkDict["write"]存在,则将 writeAlgorithm 设为:接受 chunk 参数,并返回 调用 underlyingSinkDict["write"],参数列表为 « chunk, controller »,callback this valueunderlyingSink 的算法。

  8. 如果 underlyingSinkDict["close"]存在,则将 closeAlgorithm 设为:返回 调用 underlyingSinkDict["close"],参数列表为 «»,callback this valueunderlyingSink 的算法。

  9. 如果 underlyingSinkDict["abort"]存在,则将 abortAlgorithm 设为:接受 reason 参数,并返回 调用 underlyingSinkDict["abort"],参数列表为 « reason »,callback this valueunderlyingSink 的算法。

  10. 执行 ? SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm)。

WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller) 执行以下步骤:
  1. streamcontroller.[[stream]]

  2. 如果 controller.[[started]] 为 false,则返回。

  3. 如果 stream.[[inFlightWriteRequest]] 不为 undefined,则返回。

  4. statestream.[[state]]

  5. 断言:state 不为 "closed" 或 "errored"。

  6. 如果 state 为 "erroring",

    1. 执行 ! WritableStreamFinishErroring(stream)。

    2. 返回。

  7. 如果 controller.[[queue]] 为空,则返回。

  8. value 为 ! PeekQueueValue(controller)。

  9. 如果 value关闭哨兵,执行 ! WritableStreamDefaultControllerProcessClose(controller)。

  10. 否则,执行 ! WritableStreamDefaultControllerProcessWrite(controller, value)。

WritableStreamDefaultControllerClearAlgorithms(controller) 在流关闭或出错且这些算法不再会被执行时调用。移除算法引用可以让底层接收器对象被垃圾回收,即使 WritableStream 本身仍被引用。

这可以通过 弱引用观察。详见 tc39/proposal-weakrefs#31

执行以下步骤:

  1. controller.[[writeAlgorithm]] 设为 undefined。

  2. controller.[[closeAlgorithm]] 设为 undefined。

  3. controller.[[abortAlgorithm]] 设为 undefined。

  4. controller.[[strategySizeAlgorithm]] 设为 undefined。

该算法在某些边界情况会多次执行,第一次以后不会有任何效果。

WritableStreamDefaultControllerClose(controller) 执行以下步骤:
  1. 执行 ! EnqueueValueWithSize(controller, 关闭哨兵, 0)。

  2. 执行 ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller)。

WritableStreamDefaultControllerError(controller, error) 执行以下步骤:
  1. streamcontroller.[[stream]]

  2. 断言:stream.[[state]] 为 "writable"。

  3. 执行 ! WritableStreamDefaultControllerClearAlgorithms(controller)。

  4. 执行 ! WritableStreamStartErroring(stream, error)。

WritableStreamDefaultControllerErrorIfNeeded(controller, error) 执行以下步骤:
  1. 如果 controller.[[stream]].[[state]] 为 "writable",执行 ! WritableStreamDefaultControllerError(controller, error)。

WritableStreamDefaultControllerGetBackpressure(controller) 执行以下步骤:
  1. desiredSize 为 ! WritableStreamDefaultControllerGetDesiredSize(controller)。

  2. 如果 desiredSize ≤ 0,返回 true,否则返回 false。

WritableStreamDefaultControllerGetChunkSize(controller, chunk) 执行以下步骤:
  1. 如果 controller.[[strategySizeAlgorithm]] 为 undefined,则:

    1. 断言:controller.[[stream]].[[state]] 不为 "writable"。

    2. 返回 1。

  2. returnValue 为执行 controller.[[strategySizeAlgorithm]],传入 chunk,并将结果解释为 completion record

  3. 如果 returnValue 是 abrupt completion,

    1. 执行 ! WritableStreamDefaultControllerErrorIfNeeded(controller, returnValue.[[Value]])。

    2. 返回 1。

  4. 返回 returnValue.[[Value]]。

WritableStreamDefaultControllerGetDesiredSize(controller) 执行以下步骤:
  1. 返回 controller.[[strategyHWM]]controller.[[queueTotalSize]]

WritableStreamDefaultControllerProcessClose(controller) 执行以下步骤:
  1. streamcontroller.[[stream]]

  2. 执行 ! WritableStreamMarkCloseRequestInFlight(stream)。

  3. 执行 ! DequeueValue(controller)。

  4. 断言:controller.[[queue]] 为空。

  5. sinkClosePromise 为执行 controller.[[closeAlgorithm]] 的结果。

  6. 执行 ! WritableStreamDefaultControllerClearAlgorithms(controller)。

  7. sinkClosePromise 兑现时,

    1. 执行 ! WritableStreamFinishInFlightClose(stream)。

  8. sinkClosePromise 被拒绝,原因为 reason 时,

    1. 执行 ! WritableStreamFinishInFlightCloseWithError(stream, reason)。

WritableStreamDefaultControllerProcessWrite(controller, chunk) 执行以下步骤:
  1. streamcontroller.[[stream]]

  2. 执行 ! WritableStreamMarkFirstWriteRequestInFlight(stream)。

  3. sinkWritePromise 为执行 controller.[[writeAlgorithm]],传入 chunk 的结果。

  4. sinkWritePromise 兑现时,

    1. 执行 ! WritableStreamFinishInFlightWrite(stream)。

    2. statestream.[[state]]

    3. 断言:state 为 "writable" 或 "erroring"。

    4. 执行 ! DequeueValue(controller)。

    5. 如果 ! WritableStreamCloseQueuedOrInFlight(stream) 为 false 且 state 为 "writable",

      1. backpressure 为 ! WritableStreamDefaultControllerGetBackpressure(controller)。

      2. 执行 ! WritableStreamUpdateBackpressure(stream, backpressure)。

    6. 执行 ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller)。

  5. sinkWritePromise 被拒绝,原因为 reason 时,

    1. 如果 stream.[[state]] 为 "writable",执行 ! WritableStreamDefaultControllerClearAlgorithms(controller)。

    2. 执行 ! WritableStreamFinishInFlightWriteWithError(stream, reason)。

WritableStreamDefaultControllerWrite(controller, chunk, chunkSize) 执行以下步骤:
  1. enqueueResultEnqueueValueWithSize(controller, chunk, chunkSize)。

  2. 如果 enqueueResult 是 abrupt completion,

    1. 执行 ! WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueResult.[[Value]])。

    2. 返回。

  3. streamcontroller.[[stream]]

  4. 如果 ! WritableStreamCloseQueuedOrInFlight(stream) 为 false 且 stream.[[state]] 为 "writable",

    1. backpressure 为 ! WritableStreamDefaultControllerGetBackpressure(controller)。

    2. 执行 ! WritableStreamUpdateBackpressure(stream, backpressure)。

  5. 执行 ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller)。

6. 转换流

6.1. 使用转换流

使用转换流的自然方式是在管道中将其置于可读流可写流之间。来自可读流可写流数据块会在通过转换流时被转换。背压会被遵循,因此数据不会被读取得比转换和消费更快。
readableStream
  .pipeThrough(transformStream)
  .pipeTo(writableStream)
  .then(() => console.log("所有数据已成功转换!"))
  .catch(e => console.error("发生错误!", e));
你也可以直接使用转换流的 readablewritable 属性来访问可读流和可写流的常规接口。此示例中我们使用 可写端writer 接口提供数据。然后,将 可读端通过管道连接到 anotherWritableStream
const writer = transformStream.writable.getWriter();
writer.write("输入块");
transformStream.readable.pipeTo(anotherWritableStream);
恒等变换流的一个用途是便捷地在可读流与可写流之间转换。例如,fetch() API 接收一个可读流请求体,但通过可写流接口上传数据会更方便。使用恒等变换流可以实现这一点:
const { writable, readable } = new TransformStream();
fetch("...", { body: readable }).then(response => /* ... */);

const writer = writable.getWriter();
writer.write(new Uint8Array([0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, 0x73, 0x21]));
writer.close();

恒等变换流的另一个用途是为管道增加额外缓冲区。此示例在 readableStreamwritableStream 之间增加了额外缓冲。

const writableStrategy = new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 });

readableStream
  .pipeThrough(new TransformStream(undefined, writableStrategy))
  .pipeTo(writableStream);

6.2. TransformStream

TransformStream 类是通用转换流概念的具体实现。

6.2.1. 接口定义

TransformStream 类的 Web IDL 定义如下:

[Exposed=*, Transferable]
interface TransformStream {
  constructor(optional object transformer,
              optional QueuingStrategy writableStrategy = {},
              optional QueuingStrategy readableStrategy = {});

  readonly attribute ReadableStream readable;
  readonly attribute WritableStream writable;
};

6.2.2. 内部槽

TransformStream 的实例创建时拥有如下表描述的内部槽:

内部槽 描述(非规范性
[[backpressure]] 上一次观察到 [[readable]] 时是否存在背压
[[backpressureChangePromise]] 每次 [[backpressure]] 变化时都会被 fulfill 并替换的 promise
[[controller]] 用以控制 [[readable]][[writable]]TransformStreamDefaultController 实例
[[Detached]] 流被 transfer 时设为 true 的布尔标志
[[readable]] 该对象控制的 ReadableStream 实例
[[writable]] 该对象控制的 WritableStream 实例

6.2.3. transformer API

TransformStream() 构造函数的第一个参数接受一个表示transformer的 JavaScript 对象。此对象可包含下列任意方法:

dictionary Transformer {
  TransformerStartCallback start;
  TransformerTransformCallback transform;
  TransformerFlushCallback flush;
  TransformerCancelCallback cancel;
  any readableType;
  any writableType;
};

callback TransformerStartCallback = any (TransformStreamDefaultController controller);
callback TransformerFlushCallback = Promise<undefined> (TransformStreamDefaultController controller);
callback TransformerTransformCallback = Promise<undefined> (any chunk, TransformStreamDefaultController controller);
callback TransformerCancelCallback = Promise<undefined> (any reason);
start(controller), 类型为 TransformerStartCallback

TransformStream 创建时立即调用的函数。

通常用于通过 controller.enqueue() 入队前缀数据块。这些数据块会从可读端读取,不依赖于任何对可写端的写入。

如果此初始化过程是异步的,例如需要获取前缀块,则该函数可以返回 promise 以表示成功或失败;被拒绝的 promise 会使流出错。抛出的异常会被 TransformStream() 构造函数重新抛出。

transform(chunk, controller), 类型为 TransformerTransformCallback

当新的数据块可写端写入并准备转换时调用的函数。流实现保证此函数仅在前一次转换成功后才会被调用,并且不会在 start() 完成之前或 flush() 被调用之后调用。

此函数执行转换流的实际转换工作。可以使用 controller.enqueue() 入队转换结果。这允许在可写端写入的单个数据块在可读端产生零个或多个数据块,具体取决于 controller.enqueue() 被调用的次数。§ 10.9 替换模板标签的转换流演示了有时会入队零块的情况。

如果转换过程是异步的,此函数可以返回 promise 以表示转换的成功或失败。被拒绝的 promise 会导致转换流的可读端和可写端都出错。

此函数可能返回的 promise 用于确保良好行为生产者不会在数据块完全转换前对其进行修改。(这不是规范机制保证的,而是生产者transformer之间的非正式约定。)

如果未提供 transform() 方法,则使用恒等变换,即数据块不变地从可写端传递到可读端。

flush(controller), 类型为 TransformerFlushCallback

所有写入可写端的数据块都已通过 transform() 成功转换后,且可写端即将关闭时调用的函数。

通常用于在可读端关闭前,将后缀数据块入队到可读端。可见示例 § 10.9 替换模板标签的转换流

如果刷新过程是异步的,可返回 promise 表示成功或失败;结果会传递给 stream.writable.write() 的调用者。被拒绝的 promise 会导致流的可读端和可写端都出错。抛出异常等同于返回被拒绝的 promise。

(注意无需在 flush() 内调用 controller.terminate();此时流已经在正常关闭流程中,终止反而会适得其反。)

cancel(reason), 类型为 TransformerCancelCallback

可读端被取消,或可写端被中止时调用的函数。

通常用于在流被中止或取消时清理底层 transformer 资源。

如果取消过程是异步的,可返回 promise 表示成功或失败;结果会传递给 stream.writable.abort()stream.readable.cancel() 的调用者。抛出异常等同于返回被拒绝的 promise。

(注意无需在 cancel() 内调用 controller.terminate();此时流已经在取消/中止流程中,终止反而会适得其反。)

readableType, 类型为 any

该属性保留用于未来使用,任何尝试赋值都会抛出异常。

writableType, 类型为 any

该属性保留用于未来使用,任何尝试赋值都会抛出异常。

传递给 controller 对象的 start()transform()flush() 均为 TransformStreamDefaultController 实例,可用于向可读端入队数据块,或终止、错误流。

6.2.4. 构造函数与属性

stream = new TransformStream([transformer[, writableStrategy[, readableStrategy]]])

创建一个新的 TransformStream,包装所提供的transformer。关于 transformer 参数的详细信息见 § 6.2.3 transformer API

如果未提供 transformer 参数,则结果将是一个恒等变换流。参见该示例了解其用途。

writableStrategyreadableStrategy 参数分别是队列策略对象,用于 可写端可读端。这些参数会用于构造 WritableStreamReadableStream 对象,可以为 TransformStream 增加缓冲区,从而平滑转换速度的变化或增加管道缓冲。若未提供,则默认为 CountQueuingStrategy,高水位线分别为 1 和 0。

readable = stream.readable

返回一个 ReadableStream,表示该转换流的可读端

writable = stream.writable

返回一个 WritableStream,表示该转换流的可写端

new TransformStream(transformer, writableStrategy, readableStrategy) 构造步骤如下:
  1. 如果 transformer 缺失,则设为 null。

  2. transformerDicttransformer转换为 IDL 类型 Transformer

    不能直接声明 transformer 参数为 Transformer 类型,否则会丢失原对象引用。需要保留对象以便后续调用其方法。

  3. 如果 transformerDict["readableType"]存在,抛出 RangeError 异常。

  4. 如果 transformerDict["writableType"]存在,抛出 RangeError 异常。

  5. readableHighWaterMark 为 ? ExtractHighWaterMark(readableStrategy, 0)。

  6. readableSizeAlgorithm 为 ! ExtractSizeAlgorithm(readableStrategy)。

  7. writableHighWaterMark 为 ? ExtractHighWaterMark(writableStrategy, 1)。

  8. writableSizeAlgorithm 为 ! ExtractSizeAlgorithm(writableStrategy)。

  9. startPromise新的 promise

  10. 执行 ! InitializeTransformStream(this, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm)。

  11. 执行 ? SetUpTransformStreamDefaultControllerFromTransformer(this, transformer, transformerDict)。

  12. 如果 transformerDict["start"]存在,则解决 startPromise,值为调用 transformerDict["start"], 参数列表为 « this.[[controller]] », callback this valuetransformer

  13. 否则,解决 startPromise,值为 undefined。

readable getter 步骤如下:
  1. 返回 this.[[readable]]

writable getter 步骤如下:
  1. 返回 this.[[writable]]

6.2.5. 通过 postMessage() 传递

destination.postMessage(ts, { transfer: [ts] });

TransformStream 发送到其他 frame、window 或 worker。

被传递的流可像原始流一样使用,但其可读端可写端将被锁定,无法直接使用。

TransformStream 对象是可转移对象。它们的转移步骤,给定 valuedataHolder,如下:
  1. readablevalue.[[readable]]

  2. writablevalue.[[writable]]

  3. 如果 ! IsReadableStreamLocked(readable) 为 true,则抛出 "DataCloneError" DOMException

  4. 如果 ! IsWritableStreamLocked(writable) 为 true,则抛出 "DataCloneError" DOMException

  5. dataHolder.[[readable]] 为 ! StructuredSerializeWithTransfer(readable, « readable »)。

  6. dataHolder.[[writable]] 为 ! StructuredSerializeWithTransfer(writable, « writable »)。

接收转移步骤,给定 dataHoldervalue,如下:
  1. readableRecord 为 ! StructuredDeserializeWithTransfer(dataHolder.[[readable]], 当前 Realm)。

  2. writableRecord 为 ! StructuredDeserializeWithTransfer(dataHolder.[[writable]], 当前 Realm)。

  3. value.[[readable]]readableRecord.[[Deserialized]]。

  4. value.[[writable]]writableRecord.[[Deserialized]]。

  5. value.[[backpressure]]value.[[backpressureChangePromise]]value.[[controller]] 均设为 undefined。

[[backpressure]][[backpressureChangePromise]][[controller]] 槽在被传递的 TransformStream 中不再被使用。

6.3. TransformStreamDefaultController

TransformStreamDefaultController 类有方法可以操作关联的 ReadableStreamWritableStream。 构造 TransformStream 时,transformer 对象会获得对应的 TransformStreamDefaultController 实例用于操作。

6.3.1. 接口定义

TransformStreamDefaultController 类的 Web IDL 定义如下:

[Exposed=*]
interface TransformStreamDefaultController {
  readonly attribute unrestricted double? desiredSize;

  undefined enqueue(optional any chunk);
  undefined error(optional any reason);
  undefined terminate();
};

6.3.2. 内部槽

TransformStreamDefaultController 的实例创建时拥有如下表描述的内部槽:

内部槽 描述(非规范性
[[cancelAlgorithm]] 一个返回 promise 的算法,接受一个参数(取消原因),用于将取消请求传递给transformer
[[finishPromise]] 一个 promise,在[[cancelAlgorithm]][[flushAlgorithm]]完成时 resolve。如果该字段未赋值(即为 undefined),则表示尚未调用这两个算法
[[flushAlgorithm]] 一个返回 promise 的算法,用于向transformer传递关闭请求
[[stream]] 被控制的 TransformStream 实例
[[transformAlgorithm]] 一个返回 promise 的算法,接受一个参数(待转换的数据块),请求transformer执行转换

6.3.3. 方法与属性

desiredSize = controller.desiredSize

返回填充可读端内部队列的期望大小。如果队列过满,该值可能为负。

controller.enqueue(chunk)

将给定的数据块 chunk入队到被控制转换流的可读端

controller.error(e)

使被控制转换流的可读端可写端都出错,后续操作均会因错误 e 失败。所有排队等待转换的数据块会被丢弃。

controller.terminate()

关闭被控制转换流的可读端,并使可写端出错。当transformer只需要消费一部分数据块时很有用。

desiredSize getter 步骤如下:
  1. readableControllerthis.[[stream]].[[readable]].[[controller]]

  2. 返回 ! ReadableStreamDefaultControllerGetDesiredSize(readableController)。

enqueue(chunk) 方法步骤如下:
  1. 执行 ? TransformStreamDefaultControllerEnqueue(this, chunk)。

error(e) 方法步骤如下:
  1. 执行 ? TransformStreamDefaultControllerError(this, e)。

terminate() 方法步骤如下:
  1. 执行 ? TransformStreamDefaultControllerTerminate(this)。

6.4. 抽象操作

6.4.1. 与转换流相关操作

以下抽象操作在更高层面操作 TransformStream 实例。

InitializeTransformStream(stream, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm) 执行以下步骤:
  1. startAlgorithm 为返回 startPromise 的算法。

  2. writeAlgorithm 为如下步骤,接收 chunk 参数:

    1. 返回 ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk)。

  3. abortAlgorithm 为如下步骤,接收 reason 参数:

    1. 返回 ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason)。

  4. closeAlgorithm 为如下步骤:

    1. 返回 ! TransformStreamDefaultSinkCloseAlgorithm(stream)。

  5. stream.[[writable]] 设为 ! CreateWritableStream(startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm)。

  6. pullAlgorithm 为如下步骤:

    1. 返回 ! TransformStreamDefaultSourcePullAlgorithm(stream)。

  7. cancelAlgorithm 为如下步骤,接收 reason 参数:

    1. 返回 ! TransformStreamDefaultSourceCancelAlgorithm(stream, reason)。

  8. stream.[[readable]] 设为 ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, readableHighWaterMark, readableSizeAlgorithm)。

  9. stream.[[backpressure]]stream.[[backpressureChangePromise]] 设为 undefined。

    [[backpressure]] 槽被设为 undefined,以便通过 TransformStreamSetBackpressure 初始化。或者实现可以直接用布尔值并改变初始化方式。只要初始化在 transformer 的 start() 方法调用前完成,用户代码不可见。

  10. 执行 ! TransformStreamSetBackpressure(stream, true)。

  11. stream.[[controller]] 设为 undefined。

TransformStreamError(stream, e) 执行以下步骤:
  1. 执行 ! ReadableStreamDefaultControllerError(stream.[[readable]].[[controller]], e)。

  2. 执行 ! TransformStreamErrorWritableAndUnblockWrite(stream, e)。

当一端或双方已出错时,该操作依然有效。因此,调用算法在处理错误时无需检查流状态。

TransformStreamErrorWritableAndUnblockWrite(stream, e) 执行以下步骤:
  1. 执行 ! TransformStreamDefaultControllerClearAlgorithms(stream.[[controller]])。

  2. 执行 ! WritableStreamDefaultControllerErrorIfNeeded(stream.[[writable]].[[controller]], e)。

  3. 执行 ! TransformStreamUnblockWrite(stream)。

TransformStreamSetBackpressure(stream, backpressure) 执行以下步骤:
  1. 断言:stream.[[backpressure]] 不等于 backpressure

  2. 如果 stream.[[backpressureChangePromise]] 不为 undefined,则解决 stream.[[backpressureChangePromise]],值为 undefined。

  3. stream.[[backpressureChangePromise]] 设为新的 promise

  4. stream.[[backpressure]] 设为 backpressure

TransformStreamUnblockWrite(stream) 执行以下步骤:
  1. 如果 stream.[[backpressure]] 为 true,则执行 ! TransformStreamSetBackpressure(stream, false)。

TransformStreamDefaultSinkWriteAlgorithm 抽象操作可能在等待 [[backpressureChangePromise]] 槽中的 promise 被解决。调用 TransformStreamSetBackpressure 可保证 promise 总能被解决。

6.4.2. 默认控制器

以下抽象操作用于实现 TransformStreamDefaultController 类。

SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm, cancelAlgorithm) 执行以下步骤:
  1. 断言:stream 实现 TransformStream

  2. 断言:stream.[[controller]] 为 undefined。

  3. controller.[[stream]] 设为 stream

  4. stream.[[controller]] 设为 controller

  5. controller.[[transformAlgorithm]] 设为 transformAlgorithm

  6. controller.[[flushAlgorithm]] 设为 flushAlgorithm

  7. controller.[[cancelAlgorithm]] 设为 cancelAlgorithm

SetUpTransformStreamDefaultControllerFromTransformer(stream, transformer, transformerDict) 执行以下步骤:
  1. controller 为新建 TransformStreamDefaultController

  2. transformAlgorithm 为如下步骤,接收 chunk 参数:

    1. resultTransformStreamDefaultControllerEnqueue(controller, chunk)。

    2. 如果 result 是 abrupt completion,返回拒绝的 promise,值为 result.[[Value]]。

    3. 否则返回已解决的 promise,值为 undefined。

  3. flushAlgorithm 为返回已解决的 promise,值为 undefined 的算法。

  4. cancelAlgorithm 为返回已解决的 promise,值为 undefined 的算法。

  5. 如果 transformerDict["transform"]存在,则将 transformAlgorithm 设为一个算法,接收 chunk 参数并返回 调用 transformerDict["transform"],参数列表为 « chunk, controller »,callback this valuetransformer

  6. 如果 transformerDict["flush"]存在,则将 flushAlgorithm 设为一个算法,返回 调用 transformerDict["flush"],参数列表为 « controller »,callback this valuetransformer

  7. 如果 transformerDict["cancel"]存在,则将 cancelAlgorithm 设为一个算法,接收 reason 参数并返回 调用 transformerDict["cancel"],参数列表为 « reason »,callback this valuetransformer

  8. 执行 ! SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm, cancelAlgorithm)。

TransformStreamDefaultControllerClearAlgorithms(controller) 在流关闭或出错且这些算法不再会被执行时调用。移除算法引用可以让transformer对象被垃圾回收,即使 TransformStream 本身仍被引用。

这可以通过 弱引用观察。详见 tc39/proposal-weakrefs#31

执行以下步骤:

  1. controller.[[transformAlgorithm]] 设为 undefined。

  2. controller.[[flushAlgorithm]] 设为 undefined。

  3. controller.[[cancelAlgorithm]] 设为 undefined。

TransformStreamDefaultControllerEnqueue(controller, chunk) 执行以下步骤:
  1. streamcontroller.[[stream]]

  2. readableControllerstream.[[readable]].[[controller]]

  3. 如果 ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController) 为 false,抛出 TypeError 异常。

  4. enqueueResultReadableStreamDefaultControllerEnqueue(readableController, chunk)。

  5. 如果 enqueueResult 是 abrupt completion,

    1. 执行 ! TransformStreamErrorWritableAndUnblockWrite(stream, enqueueResult.[[Value]])。

    2. 抛出 stream.[[readable]].[[storedError]]

  6. backpressure 为 ! ReadableStreamDefaultControllerHasBackpressure(readableController)。

  7. 如果 backpressure 不等于 stream.[[backpressure]]

    1. 断言:backpressure 为 true。

    2. 执行 ! TransformStreamSetBackpressure(stream, true)。

TransformStreamDefaultControllerError(controller, e) 执行以下步骤:
  1. 执行 ! TransformStreamError(controller.[[stream]], e)。

TransformStreamDefaultControllerPerformTransform(controller, chunk) 执行以下步骤:
  1. transformPromise 为执行 controller.[[transformAlgorithm]],传入 chunk 的结果。

  2. 返回 transformPromise 的反应,拒绝步骤如下,参数为 r

    1. 执行 ! TransformStreamError(controller.[[stream]], r)。

    2. 抛出 r

TransformStreamDefaultControllerTerminate(controller) 执行以下步骤:
  1. streamcontroller.[[stream]]

  2. readableControllerstream.[[readable]].[[controller]]

  3. 执行 ! ReadableStreamDefaultControllerClose(readableController)。

  4. errorTypeError 异常,表示流已被终止。

  5. 执行 ! TransformStreamErrorWritableAndUnblockWrite(stream, error)。

6.4.3. 默认 sink

以下抽象操作用于实现 转换流可写端底层 sink

TransformStreamDefaultSinkWriteAlgorithm(stream, chunk) 执行以下步骤:
  1. 断言:stream.[[writable]].[[state]] 为 "writable"。

  2. controllerstream.[[controller]]

  3. 如果 stream.[[backpressure]] 为 true,

    1. backpressureChangePromisestream.[[backpressureChangePromise]]

    2. 断言:backpressureChangePromise 不为 undefined。

    3. 返回 响应 backpressureChangePromise 的如下 fulfill 步骤:

      1. writablestream.[[writable]]

      2. statewritable.[[state]]

      3. 如果 state 为 "erroring",抛出 writable.[[storedError]]

      4. 断言:state 为 "writable"。

      5. 返回 ! TransformStreamDefaultControllerPerformTransform(controller, chunk)。

  4. 返回 ! TransformStreamDefaultControllerPerformTransform(controller, chunk)。

TransformStreamDefaultSinkAbortAlgorithm(stream, reason) 执行以下步骤:
  1. controllerstream.[[controller]]

  2. 如果 controller.[[finishPromise]] 不为 undefined,返回 controller.[[finishPromise]]

  3. readablestream.[[readable]]

  4. controller.[[finishPromise]] 为新的 promise。

  5. cancelPromise 为执行 controller.[[cancelAlgorithm]],传入 reason 的结果。

  6. 执行 ! TransformStreamDefaultControllerClearAlgorithms(controller)。

  7. 响应 cancelPromise

    1. 如果 cancelPromise 被 fulfill:

      1. 如果 readable.[[state]] 为 "errored",拒绝 controller.[[finishPromise]],原因为 readable.[[storedError]]

      2. 否则:

        1. 执行 ! ReadableStreamDefaultControllerError(readable.[[controller]], reason)。

        2. 解决 controller.[[finishPromise]],值为 undefined。

    2. 如果 cancelPromise 被拒绝,原因为 r

      1. 执行 ! ReadableStreamDefaultControllerError(readable.[[controller]], r)。

      2. 拒绝 controller.[[finishPromise]],原因为 r

  8. 返回 controller.[[finishPromise]]

TransformStreamDefaultSinkCloseAlgorithm(stream) 执行以下步骤:
  1. controllerstream.[[controller]]

  2. 如果 controller.[[finishPromise]] 不为 undefined,返回 controller.[[finishPromise]]

  3. readablestream.[[readable]]

  4. controller.[[finishPromise]] 为新的 promise。

  5. flushPromise 为执行 controller.[[flushAlgorithm]] 的结果。

  6. 执行 ! TransformStreamDefaultControllerClearAlgorithms(controller)。

  7. 响应 flushPromise

    1. 如果 flushPromise 被 fulfill:

      1. 如果 readable.[[state]] 为 "errored",拒绝 controller.[[finishPromise]],原因为 readable.[[storedError]]

      2. 否则:

        1. 执行 ! ReadableStreamDefaultControllerClose(readable.[[controller]])。

        2. 解决 controller.[[finishPromise]],值为 undefined。

    2. 如果 flushPromise 被拒绝,原因为 r

      1. 执行 ! ReadableStreamDefaultControllerError(readable.[[controller]], r)。

      2. 拒绝 controller.[[finishPromise]],原因为 r

  8. 返回 controller.[[finishPromise]]

6.4.4. 默认 source

以下抽象操作用于实现 转换流可读端底层 source

TransformStreamDefaultSourceCancelAlgorithm(stream, reason) 执行以下步骤:
  1. controllerstream.[[controller]]

  2. 如果 controller.[[finishPromise]] 不为 undefined,返回 controller.[[finishPromise]]

  3. writablestream.[[writable]]

  4. controller.[[finishPromise]] 为新的 promise。

  5. cancelPromise 为执行 controller.[[cancelAlgorithm]],传入 reason 的结果。

  6. 执行 ! TransformStreamDefaultControllerClearAlgorithms(controller)。

  7. 响应 cancelPromise

    1. 如果 cancelPromise 被 fulfill:

      1. 如果 writable.[[state]] 为 "errored",拒绝 controller.[[finishPromise]],原因为 writable.[[storedError]]

      2. 否则:

        1. 执行 ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], reason)。

        2. 执行 ! TransformStreamUnblockWrite(stream)。

        3. 解决 controller.[[finishPromise]],值为 undefined。

    2. 如果 cancelPromise 被拒绝,原因为 r

      1. 执行 ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], r)。

      2. 执行 ! TransformStreamUnblockWrite(stream)。

      3. 拒绝 controller.[[finishPromise]],原因为 r

  8. 返回 controller.[[finishPromise]]

TransformStreamDefaultSourcePullAlgorithm(stream) 执行以下步骤:
  1. 断言:stream.[[backpressure]] 为 true。

  2. 断言:stream.[[backpressureChangePromise]] 不为 undefined。

  3. 执行 ! TransformStreamSetBackpressure(stream, false)。

  4. 返回 stream.[[backpressureChangePromise]]

7. 队列策略

7.1. 队列策略 API

ReadableStream()WritableStream()TransformStream() 构造函数都至少接受一个参数,表示所创建流的合适队列策略。这些对象包含以下属性:

dictionary QueuingStrategy {
  unrestricted double highWaterMark;
  QueuingStrategySize size;
};

callback QueuingStrategySize = unrestricted double (any chunk);
highWaterMark, 类型为 unrestricted double

非负数,表示使用该队列策略的流的高水位线

size(chunk)(仅用于非字节流),类型为 QueuingStrategySize

一个函数,用于计算并返回给定数据块值的有限非负大小。

该结果用于确定背压,通过相应的 desiredSize 属性体现:可能是 defaultController.desiredSizebyteController.desiredSizewriter.desiredSize,依据队列策略使用位置而定。对于可读流,还决定何时调用底层 sourcepull() 方法。

此函数必须是幂等且无副作用,否则会造成很奇怪的结果。

对于可读字节流,该函数不会被使用,因为数据块始终以字节计量。

任何具有这些属性的对象都可用作队列策略对象。但我们提供了两种内建队列策略类,用于特定场景下的通用表达:ByteLengthQueuingStrategyCountQueuingStrategy。 它们的构造函数都使用以下 Web IDL 片段:

dictionary QueuingStrategyInit {
  required unrestricted double highWaterMark;
};

7.2. ByteLengthQueuingStrategy

处理字节时常见的队列策略是等待输入数据块byteLength 属性累计达到指定高水位线。因此,作为内建队列策略提供,可用于构造流。

创建可读流可写流时,可以直接提供字节长度队列策略:
const stream = new ReadableStream(
  { ... },
  new ByteLengthQueuingStrategy({ highWaterMark: 16 * 1024 })
);

在此情况下,可读流的底层 source可入队总计 16 KiB 的数据块,然后可读流实现才会向底层 source 发出背压信号。

const stream = new WritableStream(
  { ... },
  new ByteLengthQueuingStrategy({ highWaterMark: 32 * 1024 })
);

在此情况下,可写流的内部队列可累计 32 KiB 的数据块,等待先前写入底层 sink 完成,然后可写流才会向生产者发出背压信号。

无需在可读字节流中使用 ByteLengthQueuingStrategy,因为其始终以字节计量数据块。尝试用 ByteLengthQueuingStrategy 构造字节流会失败。

7.2.1. 接口定义

ByteLengthQueuingStrategy 类的 Web IDL 定义如下:

[Exposed=*]
interface ByteLengthQueuingStrategy {
  constructor(QueuingStrategyInit init);

  readonly attribute unrestricted double highWaterMark;
  readonly attribute Function size;
};

7.2.2. 内部槽

ByteLengthQueuingStrategy 实例有一个 [[highWaterMark]] 内部槽,存储构造函数中给定的值。

此外,每个 global object globalObject 都有一个 字节长度队列策略 size 函数,其值初始化如下:
  1. steps 如下,参数为 chunk

    1. 返回 ? GetV(chunk, "byteLength")。

  2. F 为 ! CreateBuiltinFunction(steps, 1, "size", « », globalObject相关 Realm)。

  3. globalObject字节长度队列策略 size 函数 设为一个 Function,代表对 F 的引用,callback contextglobalObject相关设置对象

此设计带有历史原因。目的是确保 size 是函数而不是方法,即不会检查 this。相关背景见 whatwg/streams#1005heycam/webidl#819

7.2.3. 构造函数与属性

strategy = new ByteLengthQueuingStrategy({ highWaterMark })

创建一个新的 ByteLengthQueuingStrategy,使用提供的 高水位线

注意,所提供的高水位线不会提前验证。如果为负数、NaN 或非数字,则生成的 ByteLengthQueuingStrategy 会导致对应流构造函数抛出异常。

highWaterMark = strategy.highWaterMark

返回构造函数提供的高水位线

strategy.size(chunk)

通过返回 chunkbyteLength 属性值来衡量其大小。

new ByteLengthQueuingStrategy(init) 构造步骤如下:
  1. this.[[highWaterMark]] 设为 init["highWaterMark"]。

highWaterMark getter 步骤如下:
  1. 返回 this.[[highWaterMark]]

size getter 步骤如下:
  1. 返回 this相关全局对象字节长度队列策略 size 函数

7.3. CountQueuingStrategy

处理通用对象流时常见的队列策略是简单计数已累计的数据块数量,直到该数值达到指定高水位线。因此该策略也作为内建提供。

创建可读流可写流时,可以直接提供计数队列策略:
const stream = new ReadableStream(
  { ... },
  new CountQueuingStrategy({ highWaterMark: 10 })
);

在此情况下,可读流的底层 source可入队 10 个(任意类型的)数据块,然后可读流实现才会向底层 source 发出背压信号。

const stream = new WritableStream(
  { ... },
  new CountQueuingStrategy({ highWaterMark: 5 })
);

在此情况下,可写流的内部队列可累计 5 个(任意类型的)数据块,等待先前写入底层 sink 完成,然后可写流才会向生产者发出背压信号。

7.3.1. 接口定义

CountQueuingStrategy 类的 Web IDL 定义如下:

[Exposed=*]
interface CountQueuingStrategy {
  constructor(QueuingStrategyInit init);

  readonly attribute unrestricted double highWaterMark;
  readonly attribute Function size;
};

7.3.2. 内部槽

CountQueuingStrategy 实例有一个 [[highWaterMark]] 内部槽,存储构造函数中给定的值。

此外,每个 global object globalObject 都有一个 计数队列策略 size 函数,其值初始化如下:
  1. steps 如下:

    1. 返回 1。

  2. F 为 ! CreateBuiltinFunction(steps, 0, "size", « », globalObject相关 Realm)。

  3. globalObject计数队列策略 size 函数 设为一个 Function,代表对 F 的引用,callback contextglobalObject相关设置对象

此设计带有历史原因。目的是确保 size 是函数而不是方法,即不会检查 this。相关背景见 whatwg/streams#1005heycam/webidl#819

7.3.3. 构造函数与属性

strategy = new CountQueuingStrategy({ highWaterMark })

创建一个新的 CountQueuingStrategy,使用提供的 高水位线

注意,所提供的高水位线不会提前验证。如果为负数、NaN 或非数字,则生成的 CountQueuingStrategy 会导致对应流构造函数抛出异常。

highWaterMark = strategy.highWaterMark

返回构造函数提供的高水位线

strategy.size(chunk)

通过始终返回 1 来衡量 chunk 的大小。这样保证队列总大小等于队列中的数据块数量。

new CountQueuingStrategy(init) 构造步骤如下:
  1. this.[[highWaterMark]] 设为 init["highWaterMark"]。

highWaterMark getter 步骤如下:
  1. 返回 this.[[highWaterMark]]

size getter 步骤如下:
  1. 返回 this相关全局对象计数队列策略 size 函数

7.4. 抽象操作

以下算法由流构造函数用于从 QueuingStrategy 字典中提取相关内容。

ExtractHighWaterMark(strategy, defaultHWM) 执行以下步骤:
  1. 如果 strategy["highWaterMark"] 不存在,返回 defaultHWM

  2. highWaterMarkstrategy["highWaterMark"]。

  3. 如果 highWaterMark 是 NaN 或 highWaterMark < 0,抛出 RangeError 异常。

  4. 返回 highWaterMark

+∞ 明确允许作为有效的高水位线。它会导致背压永不发生。

ExtractSizeAlgorithm(strategy) 执行以下步骤:
  1. 如果 strategy["size"] 不存在,返回一个总是返回 1 的算法。

  2. 返回一个算法,执行如下步骤,参数为 chunk

    1. 返回 调用 strategy["size"], 参数列表为 « chunk »。

8. 支持性抽象操作

下列抽象操作用于支持多种类型流的实现,因此未在上面主要章节中分组。

8.1. 带大小的队列

本规范中的流使用“带大小的队列”数据结构来存储排队的值及其确定的大小。多个规范对象都包含一个带大小的队列,通过对象拥有一对内部槽表示,总命名为 [[queue]] 和 [[queueTotalSize]]。[[queue]] 是 列表,其中每项为 带大小的值,而 [[queueTotalSize]] 是 JavaScript Number (即双精度浮点数)。

操作包含带大小队列的对象时,下列抽象操作用于确保两个内部槽保持同步。

由于浮点运算精度有限,这里规定的通过 [[queueTotalSize]] 槽保持累积总数的框架,与累加 [[queue]] 中所有数据块的大小并不完全等价。(不过,只有当数据块间大小差异极大(约 1015),或排队数据块数达数万亿级别时才有差异。)

以下定义中,带大小的值是一个 结构体,含两个 成员valuesize

DequeueValue(container) 步骤如下:
  1. 断言:container 有 [[queue]] 和 [[queueTotalSize]] 内部槽。

  2. 断言:container.[[queue]] 不是

  3. valueWithSizecontainer.[[queue]][0]。

  4. container.[[queue]] 移除 valueWithSize

  5. container.[[queueTotalSize]] 为 container.[[queueTotalSize]] − valueWithSizesize

  6. container.[[queueTotalSize]] < 0,则令其为 0。(可能因舍入误差出现。)

  7. 返回 valueWithSizevalue

EnqueueValueWithSize(container, value, size) 步骤如下:
  1. 断言:container 有 [[queue]] 和 [[queueTotalSize]] 内部槽。

  2. 如果 ! IsNonNegativeNumber(size) 为 false,则抛出 RangeError 异常。

  3. 如果 size 为 +∞,则抛出 RangeError 异常。

  4. 追加一个新的带大小的值,其 valuevaluesizesizecontainer.[[queue]]。

  5. container.[[queueTotalSize]] 为 container.[[queueTotalSize]] + size

PeekQueueValue(container) 步骤如下:
  1. 断言:container 有 [[queue]] 和 [[queueTotalSize]] 内部槽。

  2. 断言:container.[[queue]] 不是

  3. valueWithSizecontainer.[[queue]][0]。

  4. 返回 valueWithSizevalue

ResetQueue(container) 步骤如下:
  1. 断言:container 有 [[queue]] 和 [[queueTotalSize]] 内部槽。

  2. container.[[queue]] 为新的空 列表

  3. container.[[queueTotalSize]] 为 0。

8.2. 可转移流

可转移流通过一种特殊的恒等变换实现,其可写端位于一个realm,而可读端位于另一个 realm。以下抽象操作用于实现这些“跨 realm 变换”。

CrossRealmTransformSendError(port, error) 步骤如下:
  1. 执行 PackAndPostMessage(port, "error", error),并丢弃结果。

当执行此抽象操作时已处于错误状态,无法处理更多错误,故直接丢弃。

PackAndPostMessage(port, type, value) 步骤如下:
  1. messageOrdinaryObjectCreate(null)。

  2. 执行 ! CreateDataProperty(message, "type", type)。

  3. 执行 ! CreateDataProperty(message, "value", value)。

  4. targetPort 为与 port 关联的 port(如果有);否则设为 null。

  5. options 为 «[ "transfer" → « » ]»。

  6. 消息端口 post message 步骤,传入 targetPortmessageoptions

使用 JavaScript 对象进行传输,避免重复消息端口 post message 步骤。对象原型设为 null,避免受 %Object.prototype% 干扰。

PackAndPostMessageHandlingError(port, type, value) 步骤如下:
  1. resultPackAndPostMessage(port, type, value)。

  2. 如果 result 是 abrupt completion,

    1. 执行 ! CrossRealmTransformSendError(port, result.[[Value]])。

  3. 返回 result 作为完成记录。

SetUpCrossRealmTransformReadable(stream, port) 步骤如下:
  1. 执行 ! InitializeReadableStream(stream)。

  2. controller 为新建 ReadableStreamDefaultController

  3. portmessage 事件添加处理器,步骤如下:

    1. data 为消息数据。

    2. 断言:data 是对象

    3. type 为 ! Get(data, "type")。

    4. value 为 ! Get(data, "value")。

    5. 断言:type 是字符串

    6. 如果 type 是 "chunk",

      1. 执行 ! ReadableStreamDefaultControllerEnqueue(controller, value)。

    7. 否则如果 type 是 "close",

      1. 执行 ! ReadableStreamDefaultControllerClose(controller)。

      2. 断开 port 的关联。

    8. 否则如果 type 是 "error",

      1. 执行 ! ReadableStreamDefaultControllerError(controller, value)。

      2. 断开 port 的关联。

  4. portmessageerror 事件添加处理器,步骤如下:

    1. error 为新建 "DataCloneError" DOMException

    2. 执行 ! CrossRealmTransformSendError(port, error)。

    3. 执行 ! ReadableStreamDefaultControllerError(controller, error)。

    4. 断开 port 的关联。

  5. 启用 port端口消息队列

  6. startAlgorithm 为返回 undefined 的算法。

  7. pullAlgorithm 为如下步骤:

    1. 执行 ! PackAndPostMessage(port, "pull", undefined)。

    2. 返回已解决的 promise,值为 undefined。

  8. cancelAlgorithm 为如下步骤,参数为 reason

    1. resultPackAndPostMessageHandlingError(port, "error", reason)。

    2. 断开 port 的关联。

    3. 如果 result 是 abrupt completion,返回拒绝的 promise,值为 result.[[Value]]。

    4. 否则,返回已解决的 promise,值为 undefined。

  9. sizeAlgorithm 为总是返回 1 的算法。

  10. 执行 ! SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, sizeAlgorithm)。

建议实现时显式处理此算法中断言的失败情况,因为输入可能来自不可信环境,否则可能导致安全问题。

SetUpCrossRealmTransformWritable(stream, port) 步骤如下:
  1. 执行 ! InitializeWritableStream(stream)。

  2. controller 为新建 WritableStreamDefaultController

  3. backpressurePromise新的 promise

  4. portmessage 事件添加处理器,步骤如下:

    1. data 为消息数据。

    2. 断言:data 是对象

    3. type 为 ! Get(data, "type")。

    4. value 为 ! Get(data, "value")。

    5. 断言:type 是字符串

    6. 如果 type 是 "pull",

      1. 如果 backpressurePromise 不为 undefined,

        1. 解决 backpressurePromise,值为 undefined。

        2. backpressurePromise 为 undefined。

    7. 否则如果 type 是 "error",

      1. 执行 ! WritableStreamDefaultControllerErrorIfNeeded(controller, value)。

      2. 如果 backpressurePromise 不为 undefined,

        1. 解决 backpressurePromise,值为 undefined。

        2. backpressurePromise 为 undefined。

  5. portmessageerror 事件添加处理器,步骤如下:

    1. error 为新建 "DataCloneError" DOMException

    2. 执行 ! CrossRealmTransformSendError(port, error)。

    3. 执行 ! WritableStreamDefaultControllerErrorIfNeeded(controller, error)。

    4. 断开 port 的关联。

  6. 启用 port端口消息队列

  7. startAlgorithm 为返回 undefined 的算法。

  8. writeAlgorithm 为如下步骤,参数为 chunk

    1. 如果 backpressurePromise 为 undefined,令 backpressurePromise已解决的 promise,值为 undefined。

    2. 返回 响应 backpressurePromise 的如下 fulfill 步骤:

      1. backpressurePromise新的 promise

      2. resultPackAndPostMessageHandlingError(port, "chunk", chunk)。

      3. 如果 result 是 abrupt completion,

        1. 断开 port 的关联。

        2. 返回拒绝的 promise,值为 result.[[Value]]。

      4. 否则,返回已解决的 promise,值为 undefined。

  9. closeAlgorithm 为如下步骤:

    1. 执行 ! PackAndPostMessage(port, "close", undefined)。

    2. 断开 port 的关联。

    3. 返回已解决的 promise,值为 undefined。

  10. abortAlgorithm 为如下步骤,参数为 reason

    1. resultPackAndPostMessageHandlingError(port, "error", reason)。

    2. 断开 port 的关联。

    3. 如果 result 是 abrupt completion,返回拒绝的 promise,值为 result.[[Value]]。

    4. 否则,返回已解决的 promise,值为 undefined。

  11. sizeAlgorithm 为总是返回 1 的算法。

  12. 执行 ! SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, 1, sizeAlgorithm)。

建议实现时显式处理此算法中断言的失败情况,因为输入可能来自不可信环境,否则可能导致安全问题。

8.3. 杂项

以下抽象操作属于各种实用工具。

CanTransferArrayBuffer(O) 步骤如下:
  1. 断言:O 是对象

  2. 断言:O 有 [[ArrayBufferData]] 内部槽。

  3. 如果 ! IsDetachedBuffer(O) 为真,返回 false。

  4. 如果 SameValue(O.[[ArrayBufferDetachKey]], undefined) 为假,返回 false。

  5. 返回 true。

IsNonNegativeNumber(v) 步骤如下:
  1. 如果 v 不是数字,返回 false。

  2. 如果 v 是 NaN,返回 false。

  3. 如果 v < 0,返回 false。

  4. 返回 true。

TransferArrayBuffer(O) 步骤如下:
  1. 断言:! IsDetachedBuffer(O) 为假。

  2. arrayBufferDataO.[[ArrayBufferData]]。

  3. arrayBufferByteLengthO.[[ArrayBufferByteLength]]。

  4. 执行 ? DetachArrayBuffer(O)。

    O 的 [[ArrayBufferDetachKey]] 非 undefined(如 WebAssembly.Memorybuffer)则此步骤会抛异常。 [WASM-JS-API-1]

  5. 返回新建的 ArrayBuffer 对象,创建于当前 Realm,其 [[ArrayBufferData]] 内部槽值为 arrayBufferData,[[ArrayBufferByteLength]] 内部槽值为 arrayBufferByteLength

CloneAsUint8Array(O) 步骤如下:
  1. 断言:O 是对象

  2. 断言:O 有 [[ViewedArrayBuffer]] 内部槽。

  3. 断言:! IsDetachedBuffer(O.[[ViewedArrayBuffer]]) 为假。

  4. buffer 为 ? CloneArrayBuffer(O.[[ViewedArrayBuffer]], O.[[ByteOffset]], O.[[ByteLength]], %ArrayBuffer%)。

  5. array 为 ! Construct(%Uint8Array%, « buffer »)。

  6. 返回 array

StructuredClone(v) 步骤如下:
  1. serialized 为 ? StructuredSerialize(v)。

  2. 返回 ? StructuredDeserialize(serialized, 当前 Realm)。

CanCopyDataBlockBytes(toBuffer, toIndex, fromBuffer, fromIndex, count) 步骤如下:
  1. 断言:toBuffer 是对象

  2. 断言:toBuffer 有 [[ArrayBufferData]] 内部槽。

  3. 断言:fromBuffer 是对象

  4. 断言:fromBuffer 有 [[ArrayBufferData]] 内部槽。

  5. 如果 toBufferfromBuffer,返回 false。

  6. 如果 ! IsDetachedBuffer(toBuffer) 为真,返回 false。

  7. 如果 ! IsDetachedBuffer(fromBuffer) 为真,返回 false。

  8. 如果 toIndex + count > toBuffer.[[ArrayBufferByteLength]],返回 false。

  9. 如果 fromIndex + count > fromBuffer.[[ArrayBufferByteLength]],返回 false。

  10. 返回 true。

9. 在其他规范中使用流

本标准的大部分内容关注于流的内部机制。其他规范通常无需关心这些细节,而应通过本标准定义的各种 IDL 类型及以下定义进行接口对接。

规范不应直接检查或操作本标准定义的各种内部槽。同样,也不应使用此处定义的抽象操作。直接使用可能破坏本标准维护的不变式。

如果你的规范需要以本节未支持的方式与流接口,请提交 issue。本节将根据需要有机扩展。

9.1. 可读流

9.1.1. 创建与操作

设置新通过 Web IDL 创建ReadableStream 对象 stream,给定可选算法 pullAlgorithm、可选算法 cancelAlgorithm、可选数字 highWaterMark(默认 1)、以及可选算法 sizeAlgorithm,请执行以下步骤。如果给定,pullAlgorithmcancelAlgorithm 可返回 promise。若给定,sizeAlgorithm 必须接受数据块对象并返回数字;若给定,highWaterMark 必须为非负且非 NaN 数字。
  1. startAlgorithm 为返回 undefined 的算法。

  2. pullAlgorithmWrapper 为如下算法:

    1. result 为执行 pullAlgorithm 的结果(如提供,否则为 null)。如抛出异常 e,返回拒绝的 promise,值为 e

    2. 如果 resultPromise,返回 result

    3. 返回已解决的 promise,值为 undefined。

  3. cancelAlgorithmWrapper 为如下算法(参数 reason):

    1. result 为执行 cancelAlgorithm(值为 reason)的结果(如提供,否则为 null)。如抛出异常 e,返回拒绝的 promise,值为 e

    2. 如果 resultPromise,返回 result

    3. 返回已解决的 promise,值为 undefined。

  4. 若未给定 sizeAlgorithm,则设为总是返回 1 的算法。

  5. 执行 ! InitializeReadableStream(stream)。

  6. controller 为新建 ReadableStreamDefaultController

  7. 执行 ! SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithmWrapper, cancelAlgorithmWrapper, highWaterMark, sizeAlgorithm)。

设置支持字节读取新通过 Web IDL 创建ReadableStream 对象 stream,给定可选算法 pullAlgorithm、可选算法 cancelAlgorithm、可选数字 highWaterMark(默认 0),执行以下步骤。如果给定,pullAlgorithmcancelAlgorithm 可返回 promise。若给定,highWaterMark 必须为非负且非 NaN 数字。
  1. startAlgorithm 为返回 undefined 的算法。

  2. pullAlgorithmWrapper 为如下算法:

    1. result 为执行 pullAlgorithm 的结果(如提供,否则为 null)。如抛出异常 e,返回拒绝的 promise,值为 e

    2. 如果 resultPromise,返回 result

    3. 返回已解决的 promise,值为 undefined。

  3. cancelAlgorithmWrapper 为如下算法:

    1. result 为执行 cancelAlgorithm 的结果(如提供,否则为 null)。如抛出异常 e,返回拒绝的 promise,值为 e

    2. 如果 resultPromise,返回 result

    3. 返回已解决的 promise,值为 undefined。

  4. 执行 ! InitializeReadableStream(stream)。

  5. controller 为新建 ReadableByteStreamController

  6. 执行 ! SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithmWrapper, cancelAlgorithmWrapper, highWaterMark, undefined)。

从其他规范创建 ReadableStream 的过程分两步:
  1. readableStream 为新建 ReadableStream

  2. readableStream 执行 设置,参数为 ...。

ReadableStream 的子类会直接在构造步骤内对 设置设置支持字节读取操作使用 this 值。


以下算法仅适用于经上述设置设置支持字节读取算法初始化的 ReadableStream 实例(不适用于例如 web 开发者创建的实例):

ReadableStream stream填满高水位线所需大小,按如下步骤:
  1. 如果 stream 不是 可读,则返回 0。

  2. 如果 stream.[[controller]] 实现 ReadableByteStreamController, 返回 ! ReadableByteStreamControllerGetDesiredSize(stream.[[controller]])。

  3. 返回 ! ReadableStreamDefaultControllerGetDesiredSize(stream.[[controller]])。

ReadableStream 需要更多数据,若其填满高水位线所需大小大于零。

关闭 ReadableStream stream
  1. 如果 stream.[[controller]] 实现 ReadableByteStreamController

    1. 执行 ! ReadableByteStreamControllerClose(stream.[[controller]])。

    2. 如果 stream.[[controller]].[[pendingPullIntos]] 不是 ,则执行 ! ReadableByteStreamControllerRespond(stream.[[controller]], 0)。

  2. 否则,执行 ! ReadableStreamDefaultControllerClose(stream.[[controller]])。

错误一个 ReadableStream stream,给定 JavaScript 值 e
  1. 如果 stream.[[controller]] 实现 ReadableByteStreamController, 则执行 ! ReadableByteStreamControllerError(stream.[[controller]], e)。

  2. 否则,执行 ! ReadableStreamDefaultControllerError(stream.[[controller]], e)。

入队 JavaScript 值 chunkReadableStream stream
  1. 如果 stream.[[controller]] 实现 ReadableStreamDefaultController

    1. 执行 ! ReadableStreamDefaultControllerEnqueue(stream.[[controller]], chunk)。

  2. 否则,

    1. 断言:stream.[[controller]] 实现 ReadableByteStreamController

    2. 断言:chunkArrayBufferView

    3. byobViewstream当前 BYOB 请求视图

    4. 如果 byobView 非 null 且 chunk.[[ViewedArrayBuffer]] 为 byobView.[[ViewedArrayBuffer]],则:

      1. 断言:chunk.[[ByteOffset]] 为 byobView.[[ByteOffset]]。

      2. 断言:chunk.[[ByteLength]] ≤ byobView.[[ByteLength]]。

        这些断言确保调用者不会在当前 BYOB 请求视图写入请求范围之外。

      3. 执行 ? ReadableByteStreamControllerRespond(stream.[[controller]], chunk.[[ByteLength]])。

    5. 否则,执行 ? ReadableByteStreamControllerEnqueue(stream.[[controller]], chunk)。


以下算法仅适用于经上述设置支持字节读取算法初始化的 ReadableStream 实例:

ReadableStream stream当前 BYOB 请求视图ArrayBufferView 或 null,按如下步骤判断:
  1. 断言:stream.[[controller]] 实现 ReadableByteStreamController

  2. byobRequest 为 ! ReadableByteStreamControllerGetBYOBRequest(stream.[[controller]])。

  3. byobRequest 为 null,则返回 null。

  4. 返回 byobRequest.[[view]]

规范不得转移分离 当前 BYOB 请求视图的底层 buffer。

实现可以做类似转移的操作,比如希望从其他线程写入内存,但需对 入队关闭算法做出调整以保持可观察一致性。在规范层面,转移和分离是禁止的。

规范应尽量写入非 null 的 当前 BYOB 请求视图,然后用该视图调用 入队。只有在 创建ArrayBufferView 用于 入队时,当前 BYOB 请求视图为 null 或手头字节多于视图字节长度。这样可避免不必要的复制,更好地尊重流的消费者需求。

如下从字节源 pull算法实现了这些要求,适用于字节来自作为规范级字节序列的常见情况,该序列代表底层字节源。注意其保守地保留字节在字节序列中,而不是积极入队,因此调用者可用剩余字节数作为背压信号。

从字节序列 pull,即将字节序列 bytes导入 ReadableStream stream
  1. 断言:stream.[[controller]] 实现 ReadableByteStreamController

  2. availablebytes长度

  3. desiredSizeavailable

  4. stream当前 BYOB 请求视图非 null,则 desiredSize 设为 stream当前 BYOB 请求视图字节长度

  5. pullSizeavailabledesiredSize 较小者。

  6. pulledbytes 的前 pullSize 个字节。

  7. bytes 删除前 pullSize 个字节。

  8. stream当前 BYOB 请求视图非 null,则:

    1. 写入 pulledstream当前 BYOB 请求视图

    2. 执行 ? ReadableByteStreamControllerRespond(stream.[[controller]], pullSize)。

  9. 否则,

    1. view创建pulledUint8Array,位于 stream相关 Realm

    2. 执行 ? ReadableByteStreamControllerEnqueue(stream.[[controller]], view)。

规范不得在写入当前 BYOB 请求视图从字节序列 pull后再关闭对应的 ReadableStream

9.1.2. 读取

以下算法可用于任意 ReadableStream 实例,包括 Web 开发者创建的实例。所有算法在具体操作上都可能失败,调用规范应处理这些失败。

要为 ReadableStream stream获取 reader,返回 ? AcquireReadableStreamDefaultReader(stream)。 结果为 ReadableStreamDefaultReader

如果 stream锁定,此操作会抛异常。

要从 ReadableStreamDefaultReader reader读取一个数据块,给定 读取请求 readRequest,执行 ! ReadableStreamDefaultReaderRead(reader, readRequest)。

要从 ReadableStreamDefaultReader reader读取所有字节,给定 successSteps(接受 字节序列的算法)和 failureSteps(接受 JavaScript 值的算法):使用 读取循环,参数为 reader、新建 字节序列successStepsfailureSteps

对上述算法而言,读取循环,参数为 readerbytessuccessStepsfailureSteps,步骤如下:
  1. readRequest 为新建读取请求,包含如下成员

    chunk steps,参数 chunk
    1. 如果 chunk 不是 Uint8Array 对象,调用 failureSteps,参数为 TypeError,并终止这些步骤。

    2. chunk 代表的字节追加到 bytes

    3. 使用 读取循环,参数为 readerbytessuccessStepsfailureSteps

      直接递归实现可能导致栈溢出,实际应使用非递归变体、微任务队列或更直接的字节读取方法(如后述)。

    close steps
    1. 调用 successSteps,参数为 bytes

    error steps,参数 e
    1. 调用 failureSteps,参数为 e

  2. 执行 ! ReadableStreamDefaultReaderRead(reader, readRequest)。

由于 reader 对应的 ReadableStream 具有独占访问权,实际读取机制不可观察。实现可采用更直接机制,如获取并使用 ReadableStreamBYOBReader 或直接访问数据块。

释放 ReadableStreamDefaultReader reader,执行 ! ReadableStreamDefaultReaderRelease(reader)。

取消 ReadableStreamDefaultReader reader,参数 reason,执行 ! ReadableStreamReaderGenericCancel(reader, reason)。返回值为 promise,值为 undefined 或拒绝原因。

取消 ReadableStream stream,参数 reason,返回 ! ReadableStreamCancel(stream, reason)。返回值为 promise,值为 undefined 或拒绝原因。

分流 ReadableStream stream, 返回 ? ReadableStreamTee(stream, true)。

因第二参数为 true,返回的第二分支中的数据块会从第一分支克隆(用 HTML 的可序列化对象机制),防止其中一个分支的消费影响另一个分支。

9.1.3. 自省

以下谓词可用于任意 ReadableStream 对象。但除了判断流是否锁定外,其他自省不能通过公开 JS API 实现,规范应使用 § 9.1.2 读取中的算法(例如不要测试流是否可读,而应尝试获取 reader并处理异常)。

ReadableStream stream 可读,当 stream.[[state]] 为 "readable" 时。

ReadableStream stream 已关闭,当 stream.[[state]] 为 "closed" 时。

ReadableStream stream 已出错,当 stream.[[state]] 为 "errored" 时。

ReadableStream stream 锁定,当 ! IsReadableStreamLocked(stream) 返回 true 时。

ReadableStream stream 已扰动,当 stream.[[disturbed]] 为 true 时。

表示流是否被读取或取消过。比本节其他谓词更不建议使用,因为 Web 开发者甚至无法间接访问此信息,不应以此为分支平台行为。

9.2. 可写流

9.2.1. 创建与操作

设置新通过 Web IDL 创建WritableStream 对象 stream,给定算法 writeAlgorithm、可选算法 closeAlgorithm、可选算法 abortAlgorithm、可选数字 highWaterMark(默认 1)、可选算法 sizeAlgorithm,执行以下步骤。writeAlgorithm 必须是接受数据块对象并返回 promise 的算法。若给定,closeAlgorithmabortAlgorithm 可返回 promise。若给定,sizeAlgorithm 必须接受数据块对象并返回数字;若给定,highWaterMark 必须为非负且非 NaN 数字。
  1. startAlgorithm 为返回 undefined 的算法。

  2. closeAlgorithmWrapper 为如下算法:

    1. result 为执行 closeAlgorithm 的结果(如提供,否则为 null)。如抛出异常 e,返回拒绝的 promise,值为 e

    2. 如果 resultPromise,返回 result

    3. 返回已解决的 promise,值为 undefined。

  3. abortAlgorithmWrapper 为如下算法(参数 reason):

    1. result 为执行 abortAlgorithm(值为 reason)的结果(如提供,否则为 null)。如抛出异常 e,返回拒绝的 promise,值为 e

    2. 如果 resultPromise,返回 result

    3. 返回已解决的 promise,值为 undefined。

  4. 若未给定 sizeAlgorithm,则设为总是返回 1 的算法。

  5. 执行 ! InitializeWritableStream(stream)。

  6. controller 为新建 WritableStreamDefaultController

  7. 执行 ! SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithmWrapper, abortAlgorithmWrapper, highWaterMark, sizeAlgorithm)。

其他规范在构造 writeAlgorithm 时应避免对给定数据块进行并行读取,否则会破坏 JS 的运行至完成语义。可通过同步复制或转移给定值避免,例如 StructuredSerializeWithTransfer获取 buffer source 字节副本转移 ArrayBuffer。例外情况:数据块SharedArrayBuffer 时,允许并行操作。

从其他规范创建 WritableStream 的过程分两步:
  1. writableStream 为新建 WritableStream

  2. writableStream 执行 设置,参数为 ...。

WritableStream 的子类会直接在构造步骤内对 设置操作使用 this 值。


以下定义仅适用于经上述设置算法初始化的 WritableStream 实例:

错误 WritableStream stream,给定 JavaScript 值 e,执行 ! WritableStreamDefaultControllerErrorIfNeeded(stream.[[controller]], e)。

WritableStream streamsignalstream.[[controller]].[[abortController]]signal。规范可添加移除算法到该 AbortSignal,或查询其是否已中止及其中止原因abort reason

通常用法是,在设置 WritableStream后, 添加算法到其 signal,以中止对底层 sink的任何写入操作。然后,在 writeAlgorithm 内,一旦底层 sink响应,检查 signal 是否已中止,如是则用 signal 的中止原因拒绝返回的 promise。

9.2.2. 写入

以下算法可用于任意 WritableStream 实例,包括 Web 开发者创建的实例。所有算法在具体操作上都可能失败,调用规范应处理这些失败。

要为 WritableStream stream获取 writer,返回 ? AcquireWritableStreamDefaultWriter(stream)。 结果为 WritableStreamDefaultWriter

如果 stream 已锁定,此操作会抛出异常。

要对 WritableStreamDefaultWriter writer写入数据块,给定值 chunk, 返回 ! WritableStreamDefaultWriterWrite(writer, chunk)。

释放 WritableStreamDefaultWriter writer,执行 ! WritableStreamDefaultWriterRelease(writer)。

关闭 WritableStream stream,返回 ! WritableStreamClose(stream)。返回值为 promise,值为 undefined 或拒绝原因。

中止 WritableStream stream,参数 reason,返回 ! WritableStreamAbort(stream, reason)。返回值为 promise,值为 undefined 或拒绝原因。

9.3. 转换流

9.3.1. 创建与操作

设置 新通过 Web IDL 创建TransformStream 对象 stream,给定算法 transformAlgorithm、可选算法 flushAlgorithm、可选算法 cancelAlgorithm,请执行以下步骤。transformAlgorithm 以及(如给定)flushAlgorithmcancelAlgorithm可返回 promise。
  1. writableHighWaterMark 为 1。

  2. writableSizeAlgorithm 为总是返回 1 的算法。

  3. readableHighWaterMark 为 0。

  4. readableSizeAlgorithm 为总是返回 1 的算法。

  5. transformAlgorithmWrapper 为如下算法,参数 chunk

    1. result 为执行 transformAlgorithm(参数 chunk)的结果。如抛出异常 e,返回拒绝的 promise,值为 e

    2. 如果 resultPromise,返回 result

    3. 返回已解决的 promise,值为 undefined。

  6. flushAlgorithmWrapper 为如下算法:

    1. result 为执行 flushAlgorithm(如提供,否则为 null)的结果。如抛出异常 e,返回拒绝的 promise,值为 e

    2. 如果 resultPromise,返回 result

    3. 返回已解决的 promise,值为 undefined。

  7. cancelAlgorithmWrapper 为如下算法,参数 reason

    1. result 为执行 cancelAlgorithm(参数 reason)的结果(如提供,否则为 null)。如抛出异常 e,返回拒绝的 promise,值为 e

    2. 如果 resultPromise,返回 result

    3. 返回已解决的 promise,值为 undefined。

  8. startPromise已解决的 promise,值为 undefined。

  9. 执行 ! InitializeTransformStream(stream, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm)。

  10. controller 为新建 TransformStreamDefaultController

  11. 执行 ! SetUpTransformStreamDefaultController(stream, controller, transformAlgorithmWrapper, flushAlgorithmWrapper, cancelAlgorithmWrapper)。

其他规范构造 transformAlgorithm 时应避免对给定数据块进行并行读取,否则会破坏 JS 的运行至完成语义。可通过同步复制或转移给定值避免,例如 StructuredSerializeWithTransfer获取 buffer source 字节副本转移 ArrayBuffer。例外情况:数据块为 SharedArrayBuffer 时,允许并行操作。

从其他规范创建 TransformStream 的过程分两步:
  1. transformStream 为新建 TransformStream

  2. transformStream 执行 设置,参数为 ...。

TransformStream 的子类会直接在构造步骤内对 设置操作使用 this 值。

创建 一个恒等 TransformStream
  1. transformStream 为新建 TransformStream

  2. transformStream 执行 设置,其中 transformAlgorithm 设为如下算法:给定 chunk入队 chunktransformStream

  3. 返回 transformStream


以下算法仅适用于经上述设置算法初始化的 TransformStream 实例。通常作为 transformAlgorithmflushAlgorithm 的一部分调用。

入队 JavaScript 值 chunkTransformStream stream,执行 ! TransformStreamDefaultControllerEnqueue(stream.[[controller]], chunk)。

终止 TransformStream stream, 执行 ! TransformStreamDefaultControllerTerminate(stream.[[controller]])。

错误 TransformStream stream,给定 JavaScript 值 e,执行 ! TransformStreamDefaultControllerError(stream.[[controller]], e)。

9.3.2. 包装为自定义类

其他规范如果希望定义自定义转换流,可能不希望直接从 TransformStream 接口继承。相反,如果需要新类,可以自行创建独立的 Web IDL 接口,并采用如下 mixin:

interface mixin GenericTransformStream {
  readonly attribute ReadableStream readable;
  readonly attribute WritableStream writable;
};

任何平台对象只要包含GenericTransformStream mixin,就有一个相关联的 transform,它是一个实际的 TransformStream

readable getter 步骤:返回 thistransform.[[readable]]

writable getter 步骤:返回 thistransform.[[writable]]


包含 GenericTransformStream mixin 的 IDL 接口会自动拥有 readablewritable 属性。要定制该接口的行为,应在构造器(或其他初始化代码)中,将每个实例的 transform 设为新建的 TransformStream,并通过 transformAlgorithm 和(可选)flushAlgorithm参数进行适当自定义设置

注意:此模式在 Web 平台已有实例,如 CompressionStreamTextDecoderStream[COMPRESSION] [ENCODING]

若无需超出 TransformStream 基类提供的 API,无需创建包装类。最常见的驱动因素是需要自定义构造器步骤,但如果你的流不是为构造而设计,直接使用 TransformStream 即可。

9.4. 其他流对

除了上面讨论的转换流,规范通常会创建可读流可写流的配对。本节为此类情况提供一些指导。

在所有此类情况中,规范应使用 readablewritable 作为暴露流的属性名。不要使用其他名称(比如 input/outputreadableStream/writableStream),也不要使用方法或其他非属性的访问方式。

9.4.1. 双工流

最常见的可读/可写流配对是双工流,其中可读和可写流代表同一共享资源的两端,比如 socket、连接或设备。

规范双工流时最棘手的是如何处理诸如取消可读端、关闭或中止可写端等操作。可以选择让双工流“半开”,即一端的操作不影响另一端;也可以让操作影响另一端,例如指定可读端的 cancelAlgorithm关闭可写端。

一个基本双工流的例子(通过 JS 创建而非规范文本)见 § 10.8 使用同一底层资源包装的 { readable, writable } stream 配对。该例展示了影响传递行为。

另一个需要考虑的是异步获取双工流的创建方式,比如建立连接。推荐模式是有一个可构造类,其属性返回 promise,promise 完成时得到实际双工流对象。该对象还能暴露异步获取的信息,比如连接数据。容器类可以提供便捷 API,比如一次关闭整个连接而非只关闭某一端。

更复杂的双工流例子是正在规范化的 WebSocketStream。见其说明文档设计笔记

由于双工流遵循 readable/writable 属性约定,它们可以用于 pipeThrough()。这不总是有意义,但底层资源确实做某种变换时是方便的。

对于任意 WebSocket,pipeThrough 派生的双工流通常没有意义。但如果服务器专门实现为对收到的数据进行变换后回传,则这种用法既有用又便捷。

9.4.2. 端点配对

另一种可读/可写流配对是端点配对。此时可读和可写流代表一个较长管道的两端,目的是让 Web 开发代码在中间插入转换流

假设平台提供了 createEndpointPair(),Web 开发者可这样写代码:
const { readable, writable } = createEndpointPair();
await readable.pipeThrough(new TransformStream(...)).pipeTo(writable);

WebRTC Encoded Transform 就是这种技术的例子,其 RTCRtpScriptTransformer 接口同时有 readablewritable 属性。

虽然端点配对也遵循 readable/writable 属性约定,但把它们传给 pipeThrough() 没有意义。

9.5. 管道连接

ReadableStream readable 管道连接到 WritableStream writable,可选 boolean preventClose(默认 false)、可选 boolean preventAbort(默认 false)、可选 boolean preventCancel(默认 false)、可选 AbortSignal signal,按如下步骤。返回一个 Promise ,管道完成时 fulfilled,失败时 rejected。
  1. 断言:! IsReadableStreamLocked(readable) 为 false。

  2. 断言:! IsWritableStreamLocked(writable) 为 false。

  3. signalArgsignal(如给定),否则为 undefined。

  4. 返回 ! ReadableStreamPipeTo(readable, writable, preventClose, preventAbort, preventCancel, signalArg)。

若不关心返回的 promise,引用该概念可能有点别扭。最佳建议是“pipe readablewritable”。

ReadableStream readable 管道穿过 TransformStream transform,可选 boolean preventClose (默认 false)、可选 boolean preventAbort(默认 false)、可选 boolean preventCancel(默认 false)、可选 AbortSignal signal,按如下步骤。结果为 transform可读端
  1. 断言:! IsReadableStreamLocked(readable) 为 false。

  2. 断言:! IsWritableStreamLocked(transform.[[writable]]) 为 false。

  3. signalArgsignal(如给定),否则为 undefined。

  4. promise 为 ! ReadableStreamPipeTo(readable, transform.[[writable]], preventClose, preventAbort, preventCancel, signalArg)。

  5. promise.[[PromiseIsHandled]] 设为 true。

  6. 返回 transform.[[readable]]

要为 ReadableStream stream创建代理,执行如下步骤。结果是新 ReadableStream 对象,其数据从 stream 拉取,同时 stream 立即变为锁定扰动
  1. identityTransform创建恒等 TransformStream的结果。

  2. 返回 stream管道穿过 identityTransform的结果。

10. 创建流的示例

本节及所有子节均为非规范内容。

前面的标准示例主要关注如何使用流。这里展示如何创建流,使用 ReadableStreamWritableStream、 以及 TransformStream 构造器。

10.1. 具有底层推送源的可读流(无背压支持)

下述函数创建可读流,用于包装 WebSocket 实例 [WEBSOCKETS], 它们是推送源,不支持背压信号。示例说明了,当适配推送源时,通常大部分工作发生在 start() 方法中。

function makeReadableWebSocketStream(url, protocols) {
  const ws = new WebSocket(url, protocols);
  ws.binaryType = "arraybuffer";

  return new ReadableStream({
    start(controller) {
      ws.onmessage = event => controller.enqueue(event.data);
      ws.onclose = () => controller.close();
      ws.onerror = () => controller.error(new Error("WebSocket 出错了!"));
    },

    cancel() {
      ws.close();
    }
  });
}

我们可以用此函数为 websocket 创建可读流,并将该流 pipe 到任意可写流:

const webSocketStream = makeReadableWebSocketStream("wss://example.com:443/", "protocol");

webSocketStream.pipeTo(writableStream)
  .then(() => console.log("所有数据写入成功!"))
  .catch(e => console.error("发生错误!", e));
这种 websocket 包装方式将 websocket 消息直接解读为数据块。这对于 管道连接可写流转换流非常方便, 因为每个 websocket 消息都可以作为一个数据块消费或转换。

不过,很多人提到“为 websocket 添加流支持”时,其实希望能以流方式发送单个 websocket 消息,比如可在一个消息内传输文件,而无需客户端全部加载进内存。要实现此目的,应该允许单个 websocket 消息本身为 ReadableStream 实例。上面示例并没有实现这一点。

更多背景见 此讨论

10.2. 具有底层推送源并支持背压的可读流

下述函数返回可读流,用于包装“背压 socket”,假设对象 API 与 websocket 类似,但还可通过 readStopreadStart 方法暂停和恢复数据流。该示例展示了如何将背压应用到支持的底层源

function makeReadableBackpressureSocketStream(host, port) {
  const socket = createBackpressureSocket(host, port);

  return new ReadableStream({
    start(controller) {
      socket.ondata = event => {
        controller.enqueue(event.data);

        if (controller.desiredSize <= 0) {
          // 内部队列已满,将背压信号传递到底层源。
          socket.readStop();
        }
      };

      socket.onend = () => controller.close();
      socket.onerror = () => controller.error(new Error("Socket 出错了!"));
    },

    pull() {
      // 队列已清空但消费端还要数据,重启数据流(如之前已暂停)。
      socket.readStart();
    },

    cancel() {
      socket.close();
    }
  });
}

我们可以用此函数为“背压 socket”创建可读流,与 websocket 相同用法。但这次,当 pipe 到不能及时接受数据的目标,或长时间未读取时,会向 socket 发送背压信号。

10.3. 具有底层推送源的可读字节流(无背压支持)

下述函数返回可读字节流,用于包装假设的 UDP socket API, 包括 promise 返回的 select2() 方法,意为模拟 POSIX select(2) 系统调用。

由于 UDP 协议没有内建背压支持,desiredSize 给出的背压信号会被忽略,流保证当 socket 有数据但开发者未请求时,数据会进流的内部队列,避免内核队列溢出和数据丢失。

这对消费端与流的交互有有趣影响。若消费端读取速度慢于 socket 生产速度,数据块会一直留在流的内部队列。这时,用BYOB reader会有多一次复制(从流队列到开发者 buffer)。但如果消费端足够快,BYOB reader 可以实现零拷贝,直接写入开发者提供的 buffer。

(你可以想象更复杂的版本,使用 desiredSize 通知带外背压机制,比如 socket 发送消息调整数据速率。留给读者自行实现。)

const DEFAULT_CHUNK_SIZE = 65536;

function makeUDPSocketStream(host, port) {
  const socket = createUDPSocket(host, port);

  return new ReadableStream({
    type: "bytes",

    start(controller) {
      readRepeatedly().catch(e => controller.error(e));

      function readRepeatedly() {
        return socket.select2().then(() => {
          // socket 可读时可能没有 BYOB 请求,需要两种情况都处理。
          let bytesRead;
          if (controller.byobRequest) {
            const v = controller.byobRequest.view;
            bytesRead = socket.readInto(v.buffer, v.byteOffset, v.byteLength);
            if (bytesRead === 0) {
              controller.close();
            }
            controller.byobRequest.respond(bytesRead);
          } else {
            const buffer = new ArrayBuffer(DEFAULT_CHUNK_SIZE);
            bytesRead = socket.readInto(buffer, 0, DEFAULT_CHUNK_SIZE);
            if (bytesRead === 0) {
              controller.close();
            } else {
              controller.enqueue(new Uint8Array(buffer, 0, bytesRead));
            }
          }

          if (bytesRead === 0) {
            return;
          }

          return readRepeatedly();
        });
      }
    },

    cancel() {
      socket.close();
    }
  });
}

ReadableStream 实例可用 BYOB reader,享受上述优点和注意事项。

10.4. 具有底层拉取源的可读流

下面的函数返回可读流,用于包装 Node.js 文件系统 API(基本等价于 C 的 fopenfreadfclose 三件套)。文件是典型的拉取源。注意,与推送源示例不同,这里大部分操作发生在按需触发的 pull() 方法中,而不是在 start() 启动时。

const fs = require("fs").promises;
const CHUNK_SIZE = 1024;

function makeReadableFileStream(filename) {
  let fileHandle;
  let position = 0;

  return new ReadableStream({
    async start() {
      fileHandle = await fs.open(filename, "r");
    },

    async pull(controller) {
      const buffer = new Uint8Array(CHUNK_SIZE);

      const { bytesRead } = await fileHandle.read(buffer, 0, CHUNK_SIZE, position);
      if (bytesRead === 0) {
        await fileHandle.close();
        controller.close();
      } else {
        position += bytesRead;
        controller.enqueue(buffer.subarray(0, bytesRead));
      }
    },

    cancel() {
      return fileHandle.close();
    }
  });
}

我们可以像处理 socket 一样,创建和使用文件的可读流。

10.5. 具有底层拉取源的可读字节流

下面的函数返回可读字节流,可以高效零拷贝读取文件,同样用 Node.js 文件系统 API。不同于固定块大小 1024,它会尝试填满开发者提供的 buffer,实现完全控制。

const fs = require("fs").promises;
const DEFAULT_CHUNK_SIZE = 1024;

function makeReadableByteFileStream(filename) {
 let fileHandle;
 let position = 0;

  return new ReadableStream({
    type: "bytes",

    async start() {
      fileHandle = await fs.open(filename, "r");
    },

    async pull(controller) {
      // 即使消费端用默认 reader,auto-allocation 也会分配 buffer 并通过 byobRequest 传递给我们。
      const v = controller.byobRequest.view;

      const { bytesRead } = await fileHandle.read(v, 0, v.byteLength, position);
      if (bytesRead === 0) {
        await fileHandle.close();
        controller.close();
        controller.byobRequest.respond(0);
      } else {
        position += bytesRead;
        controller.byobRequest.respond(bytesRead);
      }
    },

    cancel() {
      return fileHandle.close();
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE
  });
}

有了这个,我们可以为返回的 ReadableStream 创建和使用BYOB reader,也可以像往常一样创建默认 reader。底层字节源的低级字节追踪与默认 reader 的高层块式消费会由流实现自动适配。auto-allocation 功能通过 autoAllocateChunkSize 选项让代码更简单,无需像 § 10.3 具有底层推送源的可读字节流(无背压支持)那样手动分支。

10.6. 无背压或成功信号的可写流

下面的函数返回一个可写流,用于包装 WebSocket [WEBSOCKETS]。WebSocket 并没有办法通知某个数据块已成功发送(除非轮询 bufferedAmount,留给读者实现)。因此,这个可写流无法向生产者准确传递背压信号或写入成功/失败。即 writerwrite() 方法和 ready getter 返回的 promise 总是立即完成。

function makeWritableWebSocketStream(url, protocols) {
  const ws = new WebSocket(url, protocols);

  return new WritableStream({
    start(controller) {
      ws.onerror = () => {
        controller.error(new Error("WebSocket 出错了!"));
        ws.onclose = null;
      };
      ws.onclose = () => controller.error(new Error("服务器意外关闭连接!"));
      return new Promise(resolve => ws.onopen = resolve);
    },

    write(chunk) {
      ws.send(chunk);
      // 立即返回,因为 websocket 没有简单办法判断写入是否完成。
    },

    close() {
      return closeWS(1000);
    },

    abort(reason) {
      return closeWS(4000, reason && reason.message);
    },
  });

  function closeWS(code, reasonString) {
    return new Promise((resolve, reject) => {
      ws.onclose = e => {
        if (e.wasClean) {
          resolve();
        } else {
          reject(new Error("连接未正常关闭"));
        }
      };
      ws.close(code, reasonString);
    });
  }
}

我们可以用此函数为 websocket 创建可写流,并将任意可读流 pipe 到它:

const webSocketStream = makeWritableWebSocketStream("wss://example.com:443/", "protocol");

readableStream.pipeTo(webSocketStream)
  .then(() => console.log("所有数据写入成功!"))
  .catch(e => console.error("发生错误!", e));

关于将 websocket 包装为流的方式,参见前文说明

10.7. 具有背压和成功信号的可写流

下面的函数返回可写流,用于包装 Node.js 文件系统 API(基本映射到 C 的 fopenfwritefclose 三件套)。由于我们包装的 API 能告知写入何时成功,这个流能传递背压信号以及单次写入是否成功或失败。

const fs = require("fs").promises;

function makeWritableFileStream(filename) {
  let fileHandle;

  return new WritableStream({
    async start() {
      fileHandle = await fs.open(filename, "w");
    },

    write(chunk) {
      return fileHandle.write(chunk, 0, chunk.length);
    },

    close() {
      return fileHandle.close();
    },

    abort() {
      return fileHandle.close();
    }
  });
}

我们可以用此函数为文件创建可写流,并向其写入单个数据块

const fileStream = makeWritableFileStream("/example/path/on/fs.txt");
const writer = fileStream.getWriter();

writer.write("To stream, or not to stream\n");
writer.write("That is the question\n");

writer.close()
  .then(() => console.log("数据块写入并流关闭成功!"))
  .catch(e => console.error(e));

注意,如果某次 fileHandle.write 调用用时较长,返回的 promise 会晚些完成。此时后续写入会排队,存储在流的内部队列中。队列中数据块累计,会使 ready getter 返回 pending promise,提示生产者应停止写入(如有可能)。

可写流的写入队列机制尤其重要,因为如 Node.js 文档所述,“在同一文件多次调用 filehandle.write 且不等待 promise 是不安全的。”但写 makeWritableFileStream 时无需担心流实现保证了 底层 sinkwrite() 方法在之前 promise 完成前不会再次调用!

10.8. 包装同一底层资源的 { readable, writable } 流对

下面的函数返回 { readable, writable } 对象,readable 属性是可读流,writable 属性是可写流,两者包装同一个 websocket 底层资源。实质上,这结合了 § 10.1 具有底层推送源的可读流(无背压支持)§ 10.6 无背压或成功信号的可写流

同时展示了如何用 JavaScript 类创建可复用的底层 sink/source 抽象。

function streamifyWebSocket(url, protocol) {
  const ws = new WebSocket(url, protocols);
  ws.binaryType = "arraybuffer";

  return {
    readable: new ReadableStream(new WebSocketSource(ws)),
    writable: new WritableStream(new WebSocketSink(ws))
  };
}

class WebSocketSource {
  constructor(ws) {
    this._ws = ws;
  }

  start(controller) {
    this._ws.onmessage = event => controller.enqueue(event.data);
    this._ws.onclose = () => controller.close();

    this._ws.addEventListener("error", () => {
      controller.error(new Error("WebSocket 出错了!"));
    });
  }

  cancel() {
    this._ws.close();
  }
}

class WebSocketSink {
  constructor(ws) {
    this._ws = ws;
  }

  start(controller) {
    this._ws.onclose = () => controller.error(new Error("服务器意外关闭连接!"));
    this._ws.addEventListener("error", () => {
      controller.error(new Error("WebSocket 出错了!"));
      this._ws.onclose = null;
    });

    return new Promise(resolve => this._ws.onopen = resolve);
  }

  write(chunk) {
    this._ws.send(chunk);
  }

  close() {
    return this._closeWS(1000);
  }

  abort(reason) {
    return this._closeWS(4000, reason && reason.message);
  }

  _closeWS(code, reasonString) {
    return new Promise((resolve, reject) => {
      this._ws.onclose = e => {
        if (e.wasClean) {
          resolve();
        } else {
          reject(new Error("连接未正常关闭"));
        }
      };
      this._ws.close(code, reasonString);
    });
  }
}

我们可以用该函数创建的对象,通过标准流 API 与远程 websocket 交互:

const streamyWS = streamifyWebSocket("wss://example.com:443/", "protocol");
const writer = streamyWS.writable.getWriter();
const reader = streamyWS.readable.getReader();

writer.write("Hello");
writer.write("web socket!");

reader.read().then(({ value, done }) => {
  console.log("WebSocket 返回: ", value);
});

注意,这种实现下,取消 readable 端会隐式关闭 writable 端,关闭或中止 writable 端也会隐式关闭 readable 端。

关于将 websocket 包装为流的方式,参见前文说明

10.9. 替换模板标签的转换流

在数据流上用变量替换标签通常很有用,尤其当需要替换的部分相较整体数据很小。这个示例给出了简单做法。它将字符串映射为字符串,比如将模板 "Time: {{time}} Message: {{message}}" 转换为 "Time: 15:36 Message: hello",假定 { time: "15:36", message: "hello" } 被传给了 LipFuzzTransformersubstitutions 参数。

本例还展示了如何处理 chunk 包含部分数据、无法立即转换、需等待更多数据的情形。此时,部分模板标签会累积在 partialChunk 属性中,直到找到标签结束或流结束。

class LipFuzzTransformer {
  constructor(substitutions) {
    this.substitutions = substitutions;
    this.partialChunk = "";
    this.lastIndex = undefined;
  }

  transform(chunk, controller) {
    chunk = this.partialChunk + chunk;
    this.partialChunk = "";
    // lastIndex 是最后一次替换后的第一个字符索引。
    this.lastIndex = 0;
    chunk = chunk.replace(/\{\{([a-zA-Z0-9_-]+)\}\}/g, this.replaceTag.bind(this));
    // 用于判断字符串末尾是否有不完整模板标签的正则。
    const partialAtEndRegexp = /\{(\{([a-zA-Z0-9_-]+(\})?)?)?$/g;
    // 只处理未被替换的新字符。
    partialAtEndRegexp.lastIndex = this.lastIndex;
    this.lastIndex = undefined;
    const match = partialAtEndRegexp.exec(chunk);
    if (match) {
      this.partialChunk = chunk.substring(match.index);
      chunk = chunk.substring(0, match.index);
    }
    controller.enqueue(chunk);
  }

  flush(controller) {
    if (this.partialChunk.length > 0) {
      controller.enqueue(this.partialChunk);
    }
  }

  replaceTag(match, p1, offset) {
    let replacement = this.substitutions[p1];
    if (replacement === undefined) {
      replacement = "";
    }
    this.lastIndex = offset + replacement.length;
    return replacement;
  }
}

这里我们将需要传给 TransformStream 构造函数的 transformer 定义为类,这样便于跟踪实例数据。

可这样使用该类:

const data = { userName, displayName, icon, date };
const ts = new TransformStream(new LipFuzzTransformer(data));

fetchEvent.respondWith(
  fetch(fetchEvent.request.url).then(response => {
    const transformedBody = response.body
      // 解码二进制响应为字符串
      .pipeThrough(new TextDecoderStream())
      // 应用 LipFuzzTransformer
      .pipeThrough(ts)
      // 编码转换后的字符串
      .pipeThrough(new TextEncoderStream());
    return new Response(transformedBody);
  })
);

为简洁起见,LipFuzzTransformer未进行上下文转义。在实际应用中,模板系统需做上下文转义以保证安全和健壮性。

10.10. 由同步 mapper 函数创建的转换流

下面的函数允许通过同步 "mapper" 函数创建新的 TransformStream 实例,类似于 Array.prototype.map 传递的函数。它说明了 API 对于简单转换也很简洁。

function mapperTransformStream(mapperFunction) {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(mapperFunction(chunk));
    }
  });
}

此函数可用于创建将所有输入变为大写的 TransformStream

const ts = mapperTransformStream(chunk => chunk.toUpperCase());
const writer = ts.writable.getWriter();
const reader = ts.readable.getReader();

writer.write("No need to shout");

// 输出 "NO NEED TO SHOUT":
reader.read().then(({ value }) => console.log(value));

虽然同步转换自身不会产生背压,但只有在没有背压时才会转换 chunk,因此不会浪费资源。

异常会自然地使流出错:

const ts = mapperTransformStream(chunk => JSON.parse(chunk));
const writer = ts.writable.getWriter();
const reader = ts.readable.getReader();

writer.write("[1, ");

// 会两次输出 SyntaxError:
reader.read().catch(e => console.error(e));
writer.write("{}").catch(e => console.error(e));

10.11. 用恒等转换流作为原语创建新可读流

恒等转换流pipeTo() 结合,是操作流的强大方法。本节包含两个此技术的例子。

有时自然地把可读流的 promise 当作可读流,只需一个简单适配器:

function promiseToReadable(promiseForReadable) {
  const ts = new TransformStream();

  promiseForReadable
      .then(readable => readable.pipeTo(ts.writable))
      .catch(reason => ts.writable.abort(reason))
      .catch(() => {});

  return ts.readable;
}

这里将数据 pipe 到writable 端后返回readable 端。如果 pipe 出错,就中止writable 端,错误会自动传播到返回的 readable 端。如果 writable 端被 pipeTo() 标记为 errored,则 abort() 会返回 rejection,可安全忽略。

更复杂的扩展是将多个可读流拼接为一个:

function concatenateReadables(readables) {
  const ts = new TransformStream();
  let promise = Promise.resolve();

  for (const readable of readables) {
    promise = promise.then(
     () => readable.pipeTo(ts.writable, { preventClose: true }),
     reason => {
       return Promise.all([
         ts.writable.abort(reason),
         readable.cancel(reason)
       ]);
     }
   );
  }

  promise.then(() => ts.writable.close(),
               reason => ts.writable.abort(reason))
         .catch(() => {});

  return ts.readable;
}

这里的错误处理很微妙,因为取消拼接流要取消所有输入流。但成功时很简单,只需依次将 readables 可迭代对象中的每个流 pipe 到恒等转换流writable 端,最后关闭它。readable 端即为所有流的块拼接结果并返回。背压处理方式不变。

致谢

编辑感谢以下人员对本规范的贡献: Anne van Kesteren、 AnthumChris、 Arthur Langereis、 Ben Kelly、 Bert Belder、 Brian di Palma、 Calvin Metcalf、 Dominic Tarr、 Ed Hager、 Eric Skoglund、 Forbes Lindesay、 Forrest Norvell、 Gary Blackwood、 Gorgi Kosev、 Gus Caplan、 贺师俊 (hax)、 Isaac Schlueter、 isonmad、 Jake Archibald、 Jake Verbaten、 James Pryor、 Janessa Det、 Jason Orendorff、 Jeffrey Yasskin、 Jeremy Roman、 Jens Nockert、 Lennart Grahl、 Luca Casonato、 Mangala Sadhu Sangeet Singh Khalsa、 Marcos Caceres、 Marvin Hagemeister、 Mattias Buelens、 Michael Mior、 Mihai Potra、 Nidhi Jaju、 Romain Bellessort、 Simon Menke、 Stephen Sugden、 Surma、 Tab Atkins、 Tanguy Krotoff、 Thorsten Lorenz、 Till Schneidereit、 Tim Caswell、 Trevor Norris、 tzik、 Will Chan、 Youenn Fablet、 平野裕 (Yutaka Hirano)、 以及 Xabier Rodríguez。 社区对本规范的参与远超预期;没有你们,我们无法完成这项工作。

本标准由 Adam Rice (Google, ricea@chromium.org)、Domenic Denicola (Google, d@domenic.me)、 Mattias Buelens,以及吉野剛史 (Takeshi Yoshino, tyoshino@chromium.org) 撰写。

知识产权

版权所有 © WHATWG (Apple, Google, Mozilla, Microsoft)。本作品采用知识共享署名 4.0 国际许可协议授权。若部分内容被纳入源代码,则该部分以BSD 三条款许可证授权。

这是 Living Standard。如需专利审核版本,请参阅 Living Standard Review Draft

索引

本规范定义的术语

引用定义的术语

参考文献

规范性引用

[DOM]
Anne van Kesteren. DOM 标准。Living Standard。 URL: https://dom.spec.whatwg.org/
[ECMASCRIPT]
ECMAScript 语言规范。URL: https://tc39.es/ecma262/multipage/
[HTML]
Anne van Kesteren 等. HTML 标准。 Living Standard。URL: https://html.spec.whatwg.org/multipage/
[IEEE-754]
IEEE 浮点运算标准。2019年7月22日。URL: https://ieeexplore.ieee.org/document/8766229
[INFRA]
Anne van Kesteren; Domenic Denicola. Infra 标准。Living Standard。URL: https://infra.spec.whatwg.org/
[WEBIDL]
Edgar Chen; Timothy Gu. Web IDL 标准。Living Standard。URL: https://webidl.spec.whatwg.org/

参考性引用

[COMPRESSION]
Adam Rice. Compression 标准。Living Standard。URL: https://compression.spec.whatwg.org/
[ENCODING]
Anne van Kesteren. Encoding 标准。Living Standard。URL: https://encoding.spec.whatwg.org/
[FETCH]
Anne van Kesteren. Fetch 标准。Living Standard。URL: https://fetch.spec.whatwg.org/
[SERVICE-WORKERS]
Yoshisato Yanagisawa; Monica CHINTALA. Service Workers。URL: https://w3c.github.io/ServiceWorker/
[WASM-JS-API-1]
Daniel Ehrenberg. WebAssembly JavaScript 接口。URL: https://webassembly.github.io/spec/js-api/
[WASM-JS-API-2]
. Ms2ger. WebAssembly JavaScript 接口。URL: https://webassembly.github.io/spec/js-api/
[WEBRTC-ENCODED-TRANSFORM]
Harald Alvestrand; Guido Urdaneta; youenn fablet. WebRTC 编码转换。URL: https://w3c.github.io/webrtc-encoded-transform/
[WEBSOCKETS]
Adam Rice. WebSockets 标准。Living Standard。URL: https://websockets.spec.whatwg.org/

IDL 索引

[Exposed=*, Transferable]
interface ReadableStream {
  constructor(optional object underlyingSource, optional QueuingStrategy strategy = {});

  static ReadableStream from(any asyncIterable);

  readonly attribute boolean locked;

  Promise<undefined> cancel(optional any reason);
  ReadableStreamReader getReader(optional ReadableStreamGetReaderOptions options = {});
  ReadableStream pipeThrough(ReadableWritablePair transform, optional StreamPipeOptions options = {});
  Promise<undefined> pipeTo(WritableStream destination, optional StreamPipeOptions options = {});
  sequence<ReadableStream> tee();

  async iterable<any>(optional ReadableStreamIteratorOptions options = {});
};

typedef (ReadableStreamDefaultReader or ReadableStreamBYOBReader) ReadableStreamReader;

enum ReadableStreamReaderMode { "byob" };

dictionary ReadableStreamGetReaderOptions {
  ReadableStreamReaderMode mode;
};

dictionary ReadableStreamIteratorOptions {
  boolean preventCancel = false;
};

dictionary ReadableWritablePair {
  required ReadableStream readable;
  required WritableStream writable;
};

dictionary StreamPipeOptions {
  boolean preventClose = false;
  boolean preventAbort = false;
  boolean preventCancel = false;
  AbortSignal signal;
};

dictionary UnderlyingSource {
  UnderlyingSourceStartCallback start;
  UnderlyingSourcePullCallback pull;
  UnderlyingSourceCancelCallback cancel;
  ReadableStreamType type;
  [EnforceRange] unsigned long long autoAllocateChunkSize;
};

typedef (ReadableStreamDefaultController or ReadableByteStreamController) ReadableStreamController;

callback UnderlyingSourceStartCallback = any (ReadableStreamController controller);
callback UnderlyingSourcePullCallback = Promise<undefined> (ReadableStreamController controller);
callback UnderlyingSourceCancelCallback = Promise<undefined> (optional any reason);

enum ReadableStreamType { "bytes" };

interface mixin ReadableStreamGenericReader {
  readonly attribute Promise<undefined> closed;

  Promise<undefined> cancel(optional any reason);
};

[Exposed=*]
interface ReadableStreamDefaultReader {
  constructor(ReadableStream stream);

  Promise<ReadableStreamReadResult> read();
  undefined releaseLock();
};
ReadableStreamDefaultReader includes ReadableStreamGenericReader;

dictionary ReadableStreamReadResult {
  any value;
  boolean done;
};

[Exposed=*]
interface ReadableStreamBYOBReader {
  constructor(ReadableStream stream);

  Promise<ReadableStreamReadResult> read(ArrayBufferView view, optional ReadableStreamBYOBReaderReadOptions options = {});
  undefined releaseLock();
};
ReadableStreamBYOBReader includes ReadableStreamGenericReader;

dictionary ReadableStreamBYOBReaderReadOptions {
  [EnforceRange] unsigned long long min = 1;
};

[Exposed=*]
interface ReadableStreamDefaultController {
  readonly attribute unrestricted double? desiredSize;

  undefined close();
  undefined enqueue(optional any chunk);
  undefined error(optional any e);
};

[Exposed=*]
interface ReadableByteStreamController {
  readonly attribute ReadableStreamBYOBRequest? byobRequest;
  readonly attribute unrestricted double? desiredSize;

  undefined close();
  undefined enqueue(ArrayBufferView chunk);
  undefined error(optional any e);
};

[Exposed=*]
interface ReadableStreamBYOBRequest {
  readonly attribute ArrayBufferView? view;

  undefined respond([EnforceRange] unsigned long long bytesWritten);
  undefined respondWithNewView(ArrayBufferView view);
};

[Exposed=*, Transferable]
interface WritableStream {
  constructor(optional object underlyingSink, optional QueuingStrategy strategy = {});

  readonly attribute boolean locked;

  Promise<undefined> abort(optional any reason);
  Promise<undefined> close();
  WritableStreamDefaultWriter getWriter();
};

dictionary UnderlyingSink {
  UnderlyingSinkStartCallback start;
  UnderlyingSinkWriteCallback write;
  UnderlyingSinkCloseCallback close;
  UnderlyingSinkAbortCallback abort;
  any type;
};

callback UnderlyingSinkStartCallback = any (WritableStreamDefaultController controller);
callback UnderlyingSinkWriteCallback = Promise<undefined> (any chunk, WritableStreamDefaultController controller);
callback UnderlyingSinkCloseCallback = Promise<undefined> ();
callback UnderlyingSinkAbortCallback = Promise<undefined> (optional any reason);

[Exposed=*]
interface WritableStreamDefaultWriter {
  constructor(WritableStream stream);

  readonly attribute Promise<undefined> closed;
  readonly attribute unrestricted double? desiredSize;
  readonly attribute Promise<undefined> ready;

  Promise<undefined> abort(optional any reason);
  Promise<undefined> close();
  undefined releaseLock();
  Promise<undefined> write(optional any chunk);
};

[Exposed=*]
interface WritableStreamDefaultController {
  readonly attribute AbortSignal signal;
  undefined error(optional any e);
};

[Exposed=*, Transferable]
interface TransformStream {
  constructor(optional object transformer,
              optional QueuingStrategy writableStrategy = {},
              optional QueuingStrategy readableStrategy = {});

  readonly attribute ReadableStream readable;
  readonly attribute WritableStream writable;
};

dictionary Transformer {
  TransformerStartCallback start;
  TransformerTransformCallback transform;
  TransformerFlushCallback flush;
  TransformerCancelCallback cancel;
  any readableType;
  any writableType;
};

callback TransformerStartCallback = any (TransformStreamDefaultController controller);
callback TransformerFlushCallback = Promise<undefined> (TransformStreamDefaultController controller);
callback TransformerTransformCallback = Promise<undefined> (any chunk, TransformStreamDefaultController controller);
callback TransformerCancelCallback = Promise<undefined> (any reason);

[Exposed=*]
interface TransformStreamDefaultController {
  readonly attribute unrestricted double? desiredSize;

  undefined enqueue(optional any chunk);
  undefined error(optional any reason);
  undefined terminate();
};

dictionary QueuingStrategy {
  unrestricted double highWaterMark;
  QueuingStrategySize size;
};

callback QueuingStrategySize = unrestricted double (any chunk);

dictionary QueuingStrategyInit {
  required unrestricted double highWaterMark;
};

[Exposed=*]
interface ByteLengthQueuingStrategy {
  constructor(QueuingStrategyInit init);

  readonly attribute unrestricted double highWaterMark;
  readonly attribute Function size;
};

[Exposed=*]
interface CountQueuingStrategy {
  constructor(QueuingStrategyInit init);

  readonly attribute unrestricted double highWaterMark;
  readonly attribute Function size;
};

interface mixin GenericTransformStream {
  readonly attribute ReadableStream readable;
  readonly attribute WritableStream writable;
};

MDN

ByteLengthQueuingStrategy/ByteLengthQueuingStrategy

In all current engines.

Firefox65+Safari10.1+Chrome52+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ByteLengthQueuingStrategy/highWaterMark

In all current engines.

Firefox65+Safari10.1+Chrome52+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ByteLengthQueuingStrategy/size

In all current engines.

Firefox65+Safari10.1+Chrome52+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ByteLengthQueuingStrategy

In all current engines.

Firefox65+Safari10.1+Chrome52+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js18.0.0+
MDN

CompressionStream/readable

In all current engines.

Firefox113+Safari16.4+Chrome80+
Opera?Edge80+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js17.0.0+

DecompressionStream/readable

In all current engines.

Firefox113+Safari16.4+Chrome80+
Opera?Edge80+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js17.0.0+

TextDecoderStream/readable

In all current engines.

Firefox105+Safari14.1+Chrome71+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.6.0+

TextEncoderStream/readable

In all current engines.

Firefox105+Safari14.1+Chrome71+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.6.0+
MDN

CompressionStream/writable

In all current engines.

Firefox113+Safari16.4+Chrome80+
Opera?Edge80+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js17.0.0+

DecompressionStream/writable

In all current engines.

Firefox113+Safari16.4+Chrome80+
Opera?Edge80+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js17.0.0+

TextDecoderStream/writable

In all current engines.

Firefox105+Safari14.1+Chrome71+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.6.0+

TextEncoderStream/writable

In all current engines.

Firefox105+Safari14.1+Chrome71+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.6.0+
MDN

CountQueuingStrategy/CountQueuingStrategy

In all current engines.

Firefox65+Safari10.1+Chrome52+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

CountQueuingStrategy/highWaterMark

In all current engines.

Firefox65+Safari10.1+Chrome52+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

CountQueuingStrategy/size

In all current engines.

Firefox65+Safari10.1+Chrome52+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

CountQueuingStrategy

In all current engines.

Firefox65+Safari10.1+Chrome52+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js18.0.0+
MDN

ReadableByteStreamController/byobRequest

Firefox102+SafariNoneChrome89+
Opera?Edge89+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableByteStreamController/close

Firefox102+SafariNoneChrome89+
Opera?Edge89+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableByteStreamController/desiredSize

Firefox102+SafariNoneChrome89+
Opera?Edge89+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableByteStreamController/enqueue

Firefox102+SafariNoneChrome89+
Opera?Edge89+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableByteStreamController/error

Firefox102+SafariNoneChrome89+
Opera?Edge89+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableByteStreamController

Firefox102+SafariNoneChrome89+
Opera?Edge89+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js18.0.0+
MDN

ReadableStream/ReadableStream

In all current engines.

Firefox65+Safari10.1+Chrome52+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStream/cancel

In all current engines.

Firefox65+Safari10.1+Chrome43+
Opera?Edge79+
Edge (Legacy)14+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStream/getReader

In all current engines.

Firefox65+Safari10.1+Chrome43+
Opera?Edge79+
Edge (Legacy)14+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStream/locked

In all current engines.

Firefox65+Safari10.1+Chrome52+
Opera?Edge79+
Edge (Legacy)14+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStream/pipeThrough

In all current engines.

Firefox102+Safari10.1+Chrome59+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStream/pipeTo

In all current engines.

Firefox100+Safari10.1+Chrome59+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStream/tee

In all current engines.

Firefox65+Safari10.1+Chrome52+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

/developer.mozilla.org/en-US/docs/Glossary/Transferable_objects

Firefox103+SafariNoneChrome87+
Opera?Edge87+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.jsNone
MDN

Reference/Global_Objects/Symbol/asyncIterator

In only one current engine.

Firefox110+SafariNoneChromeNone
Opera?EdgeNone
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStream

In all current engines.

Firefox65+Safari10.1+Chrome43+
Opera?Edge79+
Edge (Legacy)14+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js18.0.0+
MDN

ReadableStreamBYOBReader/ReadableStreamBYOBReader

Firefox102+SafariNoneChrome89+
Opera?Edge89+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStreamBYOBReader/cancel

Firefox102+SafariNoneChrome89+
Opera?Edge89+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+

ReadableStreamDefaultReader/cancel

In all current engines.

Firefox65+Safari13.1+Chrome78+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStreamBYOBReader/closed

Firefox102+SafariNoneChrome89+
Opera?Edge89+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+

ReadableStreamDefaultReader/closed

In all current engines.

Firefox65+Safari13.1+Chrome78+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStreamBYOBReader/read

Firefox102+SafariNoneChrome89+
Opera?Edge89+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStreamBYOBReader/releaseLock

Firefox102+SafariNoneChrome89+
Opera?Edge89+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStreamBYOBReader

Firefox102+SafariNoneChrome89+
Opera?Edge89+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js18.0.0+
MDN

ReadableStreamBYOBRequest/respond

Firefox102+SafariNoneChrome89+
Opera?Edge89+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStreamBYOBRequest/respondWithNewView

Firefox102+SafariNoneChrome89+
Opera?Edge89+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStreamBYOBRequest/view

Firefox102+SafariNoneChrome89+
Opera?Edge89+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStreamBYOBRequest

Firefox102+SafariNoneChrome89+
Opera?Edge89+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js18.0.0+
MDN

ReadableStreamDefaultController/close

In all current engines.

Firefox65+Safari13.1+Chrome80+
Opera?Edge80+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStreamDefaultController/desiredSize

In all current engines.

Firefox65+Safari13.1+Chrome80+
Opera?Edge80+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStreamDefaultController/enqueue

In all current engines.

Firefox65+Safari13.1+Chrome80+
Opera?Edge80+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStreamDefaultController/error

In all current engines.

Firefox65+Safari13.1+Chrome80+
Opera?Edge80+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStreamDefaultController

In all current engines.

Firefox65+Safari13.1+Chrome80+
Opera?Edge80+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js18.0.0+
MDN

ReadableStreamDefaultReader/ReadableStreamDefaultReader

Firefox100+SafariNoneChrome78+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStreamDefaultReader/read

In all current engines.

Firefox65+Safari13.1+Chrome78+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStreamDefaultReader/releaseLock

In all current engines.

Firefox65+Safari13.1+Chrome78+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

ReadableStreamDefaultReader

In all current engines.

Firefox65+Safari13.1+Chrome78+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js18.0.0+
MDN

TransformStream/TransformStream

In all current engines.

Firefox102+Safari14.1+Chrome67+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

TransformStream/readable

In all current engines.

Firefox102+Safari14.1+Chrome67+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

/developer.mozilla.org/en-US/docs/Glossary/Transferable_objects

Firefox103+SafariNoneChrome87+
Opera?Edge87+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.jsNone
MDN

TransformStream/writable

In all current engines.

Firefox102+Safari14.1+Chrome67+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

TransformStream

In all current engines.

Firefox102+Safari14.1+Chrome67+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js18.0.0+
MDN

TransformStreamDefaultController/desiredSize

In all current engines.

Firefox102+Safari14.1+Chrome67+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

TransformStreamDefaultController/enqueue

In all current engines.

Firefox102+Safari14.1+Chrome67+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

TransformStreamDefaultController/error

In all current engines.

Firefox102+Safari14.1+Chrome67+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

TransformStreamDefaultController/terminate

In all current engines.

Firefox102+Safari14.1+Chrome67+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

TransformStreamDefaultController

In all current engines.

Firefox102+Safari14.1+Chrome67+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

WritableStream/WritableStream

In all current engines.

Firefox100+Safari14.1+Chrome59+
Opera47+Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile44+
Node.js16.5.0+
MDN

WritableStream/abort

In all current engines.

Firefox100+Safari14.1+Chrome59+
Opera47+Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile44+
Node.js16.5.0+
MDN

WritableStream/close

In all current engines.

Firefox100+Safari14.1+Chrome81+
Opera?Edge81+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

WritableStream/getWriter

In all current engines.

Firefox100+Safari14.1+Chrome59+
Opera47+Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile44+
Node.js16.5.0+
MDN

WritableStream/locked

In all current engines.

Firefox100+Safari14.1+Chrome59+
Opera47+Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile44+
Node.js16.5.0+
MDN

/developer.mozilla.org/en-US/docs/Glossary/Transferable_objects

Firefox103+SafariNoneChrome87+
Opera?Edge87+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.jsNone
MDN

WritableStream

In all current engines.

Firefox100+Safari14.1+Chrome59+
Opera47+Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile44+
Node.js18.0.0+
MDN

WritableStreamDefaultController/error

In all current engines.

Firefox100+Safari14.1+Chrome59+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

WritableStreamDefaultController/signal

In all current engines.

Firefox100+Safari16.4+Chrome98+
Opera?Edge98+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

WritableStreamDefaultController

In all current engines.

Firefox100+Safari14.1+Chrome59+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js18.0.0+
MDN

WritableStreamDefaultWriter/WritableStreamDefaultWriter

In all current engines.

Firefox100+Safari14.1+Chrome78+
Opera?Edge79+
Edge (Legacy)?IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

WritableStreamDefaultWriter/abort

In all current engines.

Firefox100+Safari14.1+Chrome59+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

WritableStreamDefaultWriter/close

In all current engines.

Firefox100+Safari14.1+Chrome59+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

WritableStreamDefaultWriter/closed

In all current engines.

Firefox100+Safari14.1+Chrome59+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

WritableStreamDefaultWriter/desiredSize

In all current engines.

Firefox100+Safari14.1+Chrome59+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

WritableStreamDefaultWriter/ready

In all current engines.

Firefox100+Safari14.1+Chrome59+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

WritableStreamDefaultWriter/releaseLock

In all current engines.

Firefox100+Safari14.1+Chrome59+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

WritableStreamDefaultWriter/write

In all current engines.

Firefox100+Safari14.1+Chrome59+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js16.5.0+
MDN

WritableStreamDefaultWriter

In all current engines.

Firefox100+Safari14.1+Chrome59+
Opera?Edge79+
Edge (Legacy)16+IENone
Firefox for Android?iOS Safari?Chrome for Android?Android WebView?Samsung Internet?Opera Mobile?
Node.js18.0.0+