1. 简介
本节为非规范性内容。
Web 平台的大部分内容都建立在流数据之上:也就是说,数据以增量的方式被创建、处理和消费,无需将所有数据一次性读取到内存中。Streams 标准为创建和交互此类流数据提供了一套通用 API,体现在可读流、可写流和转换流中。
这些 API 被设计为能够高效地映射到底层的 I/O 原语,包括在合适情况下对字节流的专门支持。它们允许轻松地将多个流组合为管道链,或者通过读取器和写入器直接使用。最后,它们还被设计为自动提供背压和排队特性。
本标准为其他 Web 平台的组成部分提供了基础流原语,以便它们用于暴露其流式数据。例如,[FETCH]将Response
的 body 暴露为ReadableStream
实例。更一般而言,平台中充满了等待被表达为流的流式抽象:多媒体流、文件流、跨全局通信等,都因能够增量处理数据而无需全部缓冲到内存后再处理而受益。通过为这些流向开发者暴露提供基础,Streams
标准使如下用例成为可能:
-
视频特效:将可读视频流通过转换流实时应用特效。
-
解压缩:将文件流通过转换流,选择性地解压缩 .tgz 归档中的文件,并在用户浏览图片库时,将其转为
img
元素。 -
图像解码:通过转换流将 HTTP 响应流中的字节解码为位图数据,再通过另一个转换流将位图转为 PNG。如果在 service worker 的
fetch
钩子中安装这段代码,开发者就可以透明地为新图像格式提供兼容实现。[SERVICE-WORKERS]
Web 开发者也可以使用此处描述的 API 创建自己的流,其 API 与平台提供的流一致。其他开发者随后可以透明地将平台流与库提供的流进行组合。通过这种方式,这里的 API 提供了所有流的统一抽象,促进了围绕这些共享且可组合接口的生态系统发展。
2. 模型
数据块(chunk)是写入或从流中读取的单个数据片段。它可以是任意类型;流甚至可以包含不同类型的数据块。对于给定流而言,数据块往往不是最原子的单位;例如,字节流中的数据块可能由
16 KiB 大小的 Uint8Array
组成,而不是单个字节。
2.1. 可读流
可读流(readable stream)
代表一个数据源,你可以从中读取数据。换句话说,数据从可读流流出。具体来说,可读流是
ReadableStream
类的一个实例。
尽管可读流可以用任意行为来创建,但大多数可读流会包装一个较低层次的 I/O 源,这被称为底层源(underlying source)。底层源有两种类型:推送源(push sources)和拉取源(pull sources)。
推送源会主动向你推送数据,无论你是否在监听。它们通常还会提供暂停和恢复数据流的机制。推送源的一个例子是 TCP 套接字,数据会持续从操作系统层面推送过来,可以通过调整 TCP 窗口大小来控制速率。
拉取源则需要你主动请求数据。数据可能是同步可用的,例如被操作系统缓存在内存中的缓冲区,也可能是异步的,比如从磁盘读取。拉取源的一个例子是文件句柄,你可以定位到特定位置并读取指定数量的数据。
可读流被设计为通过统一的接口包装这两种类型的源。对于 Web 开发者自定义的流,底层源的实现细节由一个包含特定方法和属性的对象提供,并传递给
ReadableStream()
构造函数。
数据块(chunk)由流的底层源入队至流中。它们随后可以通过流的公共接口逐个读取,特别是通过使用流的 可读流读取器,该读取器可通过流的 getReader()
方法获取。
使用流的公共接口从可读流读取数据的代码被称为消费者(consumer)。
消费者还可以通过 cancel()
方法取消可读流。这表示消费者不再关注该流,会立即关闭流、丢弃队列中的所有数据块,并执行底层源的任何取消机制。
消费者还可以通过流的 tee()
方法分叉(tee)可读流。这会锁定该流,使其无法直接使用,但会创建两个新的流,称为分支(branches),可分别消费。
对于表示字节的流,提供了扩展版本的可读流,以高效处理字节,尤其是减少拷贝。此类可读流的底层源被称为底层字节源(underlying byte
source)。底层源为底层字节源的可读流有时被称为可读字节流(readable byte stream)。可读字节流的消费者可以通过流的 getReader()
方法获取BYOB 读取器。
2.2. 可写流
可写流(writable stream)
代表一个数据目标,你可以向其中写入数据。换句话说,数据流入可写流。具体来说,可写流是
WritableStream
类的一个实例。
类似于可读流,大多数可写流会包装一个较低层次的 I/O 目标,称为底层接收器(underlying sink)。 可写流通过排队后续写入并逐个交付给底层接收器,来抽象底层接收器的一些复杂性。
数据块(chunk)通过流的公共接口写入流中,并依次传递给流的底层接收器。
对于 Web 开发者自定义的流,底层接收器的实现细节由一个包含特定方法的对象提供,并传递给
WritableStream()
构造函数。
使用流的公共接口向可写流写入数据的代码被称为生产者(producer)。
生产者还可以通过 abort()
方法中止可写流,
这表示生产者认为出现了问题,后续写入应当被终止。即使没有来自底层接收器的信号,这也会使流进入错误状态,并丢弃流内部队列中的所有写入。
2.3. 转换流
转换流(transform stream) 由一对流组成:一个可写流,称为其可写端(writable side),以及一个可读流,称为其可读端(readable side)。对于特定的转换流,写入可写端的数据会以特定方式产生可以从可读端读取的新数据。
具体来说,任何包含 writable
属性和 readable
属性的对象都可以作为转换流。然而,标准的TransformStream
类可以更容易地创建一对正确关联的流。它包装了一个转换器(transformer),定义了要执行的具体转换算法。对于 Web 开发者自定义的流,转换器的实现细节由一个包含特定方法和属性的对象提供,并传递给
TransformStream()
构造函数。其他规范可能会使用 GenericTransformStream
混入来创建具有相同 writable
/readable
属性对,但上层带有自定义 API 的类。
恒等转换流(identity
transform stream)是一种特殊的转换流,会将写入其可写端的所有数据块原样转发到其可读端,不做任何更改。这在多种场景下很有用。当传递给
TransformStream
构造函数的transform()
方法不存在于转换器对象上时,默认会创建一个恒等转换流。
一些可能的转换流示例包括:
-
GZIP 压缩器:将未压缩字节写入并从中读取压缩字节;
-
视频解码器:写入编码字节并从中读取未压缩的视频帧;
-
文本解码器:写入字节并从中读取字符串;
-
CSV 转 JSON 转换器:写入表示 CSV 文件行的字符串,并从中读取对应的 JavaScript 对象。
2.4. 管道链与背压
流主要通过管道(piping)方式相互连接使用。可读流可以直接通过其 pipeTo()
方法连接到可写流,也可以先通过一个或多个转换流连接,使用其 pipeThrough()
方法。
这样连接在一起的一组流被称为管道链(pipe chain)。在管道链中,原始源(original source)是链中第一个可读流的底层源;最终接收器(ultimate sink)是链中最后一个可写流的底层接收器。
一旦管道链被构建,关于数据块流经速度的信号就会在整个链上传播。如果链中的某一环节无法立即接收数据块,它会向管道链反向传播信号,最终告知原始源不要太快地产生数据块。这种根据链条处理速度来规范原始源输出速度的机制被称为背压(backpressure)。
具体来说,原始源会获取 controller.desiredSize
(或 byteController.desiredSize
)
的值,并据此调整自身数据流速率。这个值来源于与最终接收器对应的
writer.desiredSize
,该值会随着最终接收器完成写入数据块而更新。用于构建管道链的 pipeTo()
方法会自动确保这些信息能够在管道链中向后传播。
当对可读流进行分叉(tee)时,其两个分支的背压信号会聚合;如果两个分支都没有被读取,则会向原始流的底层源发出背压信号。
管道操作会锁定可读流和可写流,在管道操作期间无法再对它们进行其他操作。这允许实现进行重要的优化,比如在底层源与底层接收器之间直接传递数据,绕过许多中间队列。
2.5. 内部队列与排队策略
可读流和可写流都维护着内部队列(internal queues),它们的用途基本类似。对于可读流,内部队列包含由数据块被底层源入队但尚未被消费者读取的数据。对于可写流,内部队列包含由生产者写入但尚未被底层接收器处理和确认的数据块。
排队策略(queuing strategy)是一个对象,根据流的内部队列状态,决定如何向流发出背压信号。排队策略会为每个数据块分配一个大小,并将队列中所有数据块的总大小与一个特定值(高水位线(high water mark))进行比较。两者的差值(高水位线减去当前总大小)被用来确定队列需要填充的期望大小。
对于可读流,底层源可以将这个期望大小作为背压信号,放慢数据块的生成速度,以尽量保持队列期望大小大于或等于零。对于可写流,生产者也可以类似地避免写入导致期望大小变为负数的数据。
具体来说,Web 开发者自定义流的排队策略由任何具有 highWaterMark
属性的 JavaScript 对象给出。对于字节流,highWaterMark
的单位始终为字节。对于其他流,默认单位为数据块,但也可以在策略对象中包含一个 size()
函数,为每个数据块返回其大小。这允许 highWaterMark
以任意浮点单位表示。
在 JavaScript 中,这样的策略可手动写为
,
或用内置的 CountQueuingStrategy
类写为
。
2.6. 锁定
可读流读取器(reader/readable stream reader),简称 reader,是一个允许直接从可读流读取数据块的对象。如果没有读取器,消费者只能对可读流执行高级操作:取消流或管道到可写流。读取器可通过流的 getReader()
方法获取。
可读字节流可以提供两种读取器:默认读取器(default reader)和BYOB 读取器(BYOB reader)。BYOB("bring your
own buffer")读取器允许读取到开发者自定义的缓冲区,从而减少拷贝。非字节流的可读流只能提供默认读取器。默认读取器是 ReadableStreamDefaultReader
类的实例,BYOB 读取器是 ReadableStreamBYOBReader
类的实例。
类似地,可写流写入器(writer/writable stream writer),简称 writer,是一个允许直接向可写流写入数据块的对象。如果没有写入器,生产者只能执行高级操作,如中止流或管道可读流到可写流。写入器由 WritableStreamDefaultWriter
类表示。
在底层,这些高级操作其实内部也会用到读取器或写入器。
每个可读流或可写流在同一时刻最多只能有一个读取器或写入器。此时称该流被锁定(locked),且该读取器或写入器为活动(active)。可通过 readableStream.locked
或 writableStream.locked
属性判断流是否锁定。
读取器或写入器还可以释放其锁(release its lock),此时其不再为活动状态,允许获取新的读取器或写入器。可通过 defaultReader.releaseLock()
、
byobReader.releaseLock()
、
或 writer.releaseLock()
方法实现。
3. 约定
本规范依赖于 Infra 标准。[INFRA]
本规范在内部算法中使用了 JavaScript 规范中的抽象操作(abstract operation)概念。这包括将其返回值视为完成记录(completion records),并使用 ! 和 ? 前缀来解包这些完成记录。[ECMASCRIPT]
本规范还使用了 JavaScript 规范中的内部插槽(internal slot)概念和记法。(不过,这些内部插槽是用在 Web IDL 的平台对象上的,而不是 JavaScript 对象上。)
采用这些外来的 JavaScript 规范约定主要是历史原因。我们建议你在编写自己的 Web 规范时避免效仿我们的做法。
在本规范中,所有数字都表示为双精度 64 位 IEEE 754 浮点值(如 JavaScript 的Number 类型或 Web IDL 的 unrestricted double
类型),并且所有算术操作都必须按此标准方式执行。这对于§ 8.1 队列含大小中描述的数据结构尤为重要。[IEEE-754]
4. 可读流
4.1. 使用可读流
readableStream. pipeTo( writableStream) . then(() => console. log( "所有数据已成功写入!" )) . catch ( e=> console. error( "发生错误!" , e));
readableStream. pipeTo( new WritableStream({ write( chunk) { console. log( "收到数据块" , chunk); }, close() { console. log( "所有数据已成功读取!" ); }, abort( e) { console. error( "发生错误!" , e); } }));
read()
方法直接读取每一个数据块。例如,下面的代码会打印流中的下一个数据块(如有):
const reader= readableStream. getReader(); reader. read(). then( ({ value, done}) => { if ( done) { console. log( "流已关闭!" ); } else { console. log( value); } }, e=> console. error( "流发生错误,无法读取!" , e) );
const reader= readableStream. getReader({ mode: "byob" }); let startingAB= new ArrayBuffer( 1024 ); const buffer= await readInto( startingAB); console. log( "前 1024 字节: " , buffer); async function readInto( buffer) { let offset= 0 ; while ( offset< buffer. byteLength) { const { value: view, done} = await reader. read( new Uint8Array( buffer, offset, buffer. byteLength- offset)); buffer= view. buffer; if ( done) { break ; } offset+= view. byteLength; } return buffer; }
需要注意的是,最终的 buffer
变量与 startingAB
不同,但它(以及所有中间缓冲区)实际上共享同一块底层内存。在每一步,buffer 都会转移给一个新的 ArrayBuffer
对象。view
是读取结果中解构出来的 Uint8Array
,其
buffer
属性指向 ArrayBuffer
对象,byteOffset
表示字节写入的起始偏移,byteLength
表示写入的字节数。
需要说明的是,这个示例主要用于教学。实际应用中,min
选项可以更简单直接地读取指定字节数:
const reader= readableStream. getReader({ mode: "byob" }); const { value: view, done} = await reader. read( new Uint8Array( 1024 ), { min: 1024 }); console. log( "前 1024 字节: " , view);
4.2. ReadableStream
类
ReadableStream
类是通用可读流概念的具体实现。它适用于任何数据块类型,并维护一个内部队列,用于跟踪由底层源提供但尚未被消费者读取的数据。
4.2.1. 接口定义
ReadableStream
类的 Web IDL 定义如下:
[Exposed=*,Transferable ]interface {
ReadableStream constructor (optional object ,
underlyingSource optional QueuingStrategy = {});
strategy static ReadableStream from (any );
asyncIterable readonly attribute boolean locked ;Promise <undefined >cancel (optional any );
reason ReadableStreamReader getReader (optional ReadableStreamGetReaderOptions = {});
options ReadableStream pipeThrough (ReadableWritablePair ,
transform optional StreamPipeOptions = {});
options Promise <undefined >pipeTo (WritableStream ,
destination optional StreamPipeOptions = {});
options sequence <ReadableStream >tee ();async iterable <any >(optional ReadableStreamIteratorOptions = {}); };
options typedef (ReadableStreamDefaultReader or ReadableStreamBYOBReader );
ReadableStreamReader enum {
ReadableStreamReaderMode };
"byob" dictionary {
ReadableStreamGetReaderOptions ReadableStreamReaderMode ; };
mode dictionary {
ReadableStreamIteratorOptions boolean =
preventCancel false ; };dictionary {
ReadableWritablePair required ReadableStream ;
readable required WritableStream ; };
writable dictionary {
StreamPipeOptions boolean =
preventClose false ;boolean =
preventAbort false ;boolean =
preventCancel false ;AbortSignal ; };
signal
4.2.2. 内部插槽
ReadableStream
的实例具有下表所述的内部插槽:
内部插槽 | 描述(非规范性) |
---|---|
[[controller]] | 一个 ReadableStreamDefaultController
或 ReadableByteStreamController
,用于控制该流的状态和队列
|
[[Detached]] | 当流被转移(transferred)时为 true 的布尔值标志 |
[[disturbed]] | 当流已被读取或已被取消时为 true 的布尔值标志 |
[[reader]] | 如果流被读取器锁定,则为
ReadableStreamDefaultReader
或 ReadableStreamBYOBReader
实例,否则为 undefined
|
[[state]] | 包含流当前状态的字符串,仅供内部使用;取值为 "readable "、"closed " 或
"errored "
|
[[storedError]] | 指示流如何失败的值,用作操作出错流时的失败原因或异常 |
4.2.3. 底层源 API
ReadableStream()
构造函数的第一个参数是一个 JavaScript 对象,表示底层源(underlying source)。
这样的对象可以包含以下任意属性:
dictionary {
UnderlyingSource UnderlyingSourceStartCallback start ;UnderlyingSourcePullCallback pull ;UnderlyingSourceCancelCallback cancel ;ReadableStreamType type ; [EnforceRange ]unsigned long long autoAllocateChunkSize ; };typedef (ReadableStreamDefaultController or ReadableByteStreamController );
ReadableStreamController callback =
UnderlyingSourceStartCallback any (ReadableStreamController );
controller callback =
UnderlyingSourcePullCallback Promise <undefined > (ReadableStreamController );
controller callback =
UnderlyingSourceCancelCallback Promise <undefined > (optional any );
reason enum {
ReadableStreamType "bytes" };
start(controller)
, 类型为 UnderlyingSourceStartCallback-
在创建
ReadableStream
实例时立即调用的函数。通常用于适配推送源,比如设置相关事件监听(见§ 10.1 一个没有背压支持的底层推送源的可读流示例),或者用于获取拉取源的访问权(见§ 10.4 一个底层拉取源的可读流示例)。
如果初始化过程是异步的,可以返回一个 promise 以通知成功或失败;被拒绝的 promise 会使流变为错误状态。抛出的异常将由
ReadableStream()
构造函数重新抛出。 pull(controller)
, 类型为 UnderlyingSourcePullCallback-
当流的内部队列中数据块未满时调用(即队列的期望大小为正时)。通常会反复调用,直到队列达到高水位线(即期望大小≤0)。
对于推送源,可用于恢复暂停的数据流(见§ 10.2 支持背压的底层推送源可读流);对于拉取源,用于获取新的数据块并入队(见§ 10.4 一个底层拉取源的可读流示例)。
只有在
start()
成功完成后才会被调用。并且只有在 pull() 实现中入队了至少一个数据块或满足了 BYOB 请求时才会持续被调用;无操作的pull()
不会持续被调用。如果该函数返回一个 promise,则只有该 promise 履约后才会再次调用 pull()。(若 promise 被拒绝,流会进入错误状态。)这主要用于拉取源,返回的 promise 表示获取新数据块的过程。抛出异常等同于返回被拒绝的 promise。
cancel(reason)
, 类型为 UnderlyingSourceCancelCallback-
当消费者通过
stream.cancel()
或reader.cancel()
取消流时调用。其参数与消费者传递给这些方法的值相同。可读流在管道操作中也可能被取消(见
pipeTo()
方法的定义)。通常用于释放底层资源的访问权(见§ 10.1 一个没有背压支持的底层推送源的可读流示例)。
如果取消过程是异步的,可以返回 promise 通知成功或失败;结果会通过调用
cancel()
方法的返回值反馈。抛出异常等同于返回被拒绝的 promise。即使取消过程失败,流也会关闭,不会进入错误状态。因为一旦消费者通过 cancel 表达了不再关心该流,取消过程的失败对消费者视角已无影响,失败结果只通知给相应方法的直接调用者。
这不同于
close
和abort
这类WritableStream
的底层接收器选项,后者失败时会令对应WritableStream
进入错误状态。因为这些操作对应生产者请求的特定动作,失败时意味着有更严重的问题。 type
(仅字节流), 类型为 ReadableStreamType-
可设置为 "
bytes
",表示构造得到的ReadableStream
是可读字节流。这保证流可通过getReader()
方法成功获取 BYOB 读取器。同时会影响传递给start()
和pull()
方法的 controller 参数类型(见下文)。如何设置一个可读字节流,包括使用不同的控制器接口,可参考 § 10.3 一个没有背压支持的字节推送源可读字节流。
设置为 "
bytes
" 或 undefined 之外的任何值都会导致ReadableStream()
构造函数抛出异常。 autoAllocateChunkSize
(仅字节流), 类型为 unsigned long long-
可设为一个正整数,使实现自动为底层源分配缓冲区。当消费者使用 默认读取器 时,流实现会自动分配指定大小的
ArrayBuffer
,这样controller.byobRequest
始终存在,仿佛消费者使用的是 BYOB 读取器。这样可以减少处理默认读取器消费者时所需代码量(可比较 § 10.3 一个没有背压支持的字节推送源可读字节流 与 § 10.5 一个带自动分配的字节拉取源可读字节流)。
传递给 start()
和 pull()
方法的 controller 参数类型取决于 type
选项的值。如果 type
为 undefined(包括未设置),则 controller 为 ReadableStreamDefaultController
。
如果为
"bytes
",
则 controller 为 ReadableByteStreamController
。
4.2.4. 构造函数、方法和属性
stream = new
ReadableStream
(underlyingSource[, strategy])-
创建一个新的
ReadableStream
,包装所提供的底层源。更多关于 underlyingSource 参数的信息见 § 4.2.3 底层源 API。strategy 参数表示流的排队策略,详见 § 7.1 排队策略 API。如果未提供该参数,则默认行为等同于
CountQueuingStrategy
,其高水位线为 1。 stream =
ReadableStream.from
(asyncIterable)-
创建一个新的
ReadableStream
,包装所提供的可迭代对象或异步可迭代对象。这可以用于将各种对象适配为可读流,如数组、异步生成器,或 Node.js readable stream 等。
isLocked = stream.
locked
-
返回该可读流是否被某个读取器锁定。
await stream.
cancel
([ reason ])-
取消流,表示消费者不再关心该流。提供的 reason 参数会传递给底层源的
cancel()
方法,具体是否使用由底层源决定。返回的 promise 在流被成功关闭时履约,在底层源关闭失败时拒绝。如果流当前被锁定,则会直接拒绝并抛出
TypeError
,不会尝试取消流。 reader = stream.
getReader
()-
创建一个
ReadableStreamDefaultReader
并锁定该流。流被锁定期间,不能获取其他读取器,直到释放该读取器(release)。该功能尤其适合希望独占消费整个流的抽象对象。通过为流获取读取器,可以确保不会有其他人与你并发读取或取消流,从而影响你的抽象。
reader = stream.
getReader
({mode
: "byob
" })-
创建一个
ReadableStreamBYOBReader
并锁定该流。该调用与无参数版本行为相同,但只适用于可读字节流,即专门支持“自带缓冲区”读取的流。返回的BYOB 读取器允许直接通过
read()
方法将数据读入开发者自定义的缓冲区,实现对分配的精细控制。 readable = stream.
pipeThrough
({writable
,readable
}[, {preventClose
,preventAbort
,preventCancel
,signal
}])-
为该可读流提供一个便捷的链式管道方式,通过一个转换流(或任意
{ writable, readable }
对)进行处理。它会将当前流管道到指定对的 writable 端,并返回 readable 端以便继续使用。执行管道操作会在整个管道操作期间锁定该流,防止其他消费者获取读取器。
await stream.
pipeTo
(destination[, {preventClose
,preventAbort
,preventCancel
,signal
}])-
将该可读流管道到给定的可写流destination。通过传递参数可以自定义在各种错误条件下管道的行为。返回一个 promise,在管道过程成功完成时履约,否则在出错时拒绝。
执行管道操作会在整个管道期间锁定该流,防止其他消费者获取读取器。
源流与目标流的错误与关闭传播规则如下:
-
源可读流出错时,会中止destination,除非
preventAbort
为真。promise 会以源流的错误或中止目标流时产生的错误拒绝。 -
destination 出错时,会取消该可读流,除非
preventCancel
为真。promise 会以目标流的错误或取消源流时的错误拒绝。 -
当源可读流关闭时,destination 也会关闭,除非
preventClose
为真。promise 会在关闭目标流过程完成后履约,如果关闭目标流时报错则以该错误拒绝。 -
如果 destination 最初已关闭或正在关闭,则该可读流会被取消,除非
preventCancel
为真。promise 会以管道到已关闭流失败的错误或取消源流时的错误拒绝。
signal
选项可以设置为AbortSignal
,以允许通过相应的AbortController
中止正在进行的管道操作。在这种情况下,源可读流会被取消,destination 会被中止,除非相应的preventCancel
或preventAbort
被设置。 -
[branch1, branch2] = stream.
tee
()-
分叉该可读流,返回一个包含两个分支的新
ReadableStream
实例的数组。分叉操作会锁定该流,防止其他消费者获取读取器。要取消流,需要取消两个分支,最终组合的取消原因会传递给流的底层源。
如果该流是可读字节流,每个分支会收到各自的数据块副本;否则,两个分支看到的数据块是同一个对象。如果数据块不是不可变的,这可能导致两个分支间互相影响。
new ReadableStream(underlyingSource, strategy)
构造函数的步骤如下:
-
如果 underlyingSource 缺失,则将其设为 null。
-
令 underlyingSourceDict 为 underlyingSource,转换为类型为
UnderlyingSource
的 IDL 值。我们不能直接将 underlyingSource 参数声明为
UnderlyingSource
类型,否则会丢失原始对象的引用。我们需要保留该对象,以便调用其上的各种方法。 -
执行 ! InitializeReadableStream(this)。
-
如果 underlyingSourceDict["
type
"] 为 "bytes
":-
如果 strategy["
size
"] 存在,则抛出RangeError
异常。 -
令 highWaterMark 为 ? ExtractHighWaterMark(strategy, 0)。
-
执行 ? SetUpReadableByteStreamControllerFromUnderlyingSource(this, underlyingSource, underlyingSourceDict, highWaterMark)。
-
-
否则,
-
令 sizeAlgorithm 为 ! ExtractSizeAlgorithm(strategy)。
-
令 highWaterMark 为 ? ExtractHighWaterMark(strategy, 1)。
-
执行 ? SetUpReadableStreamDefaultControllerFromUnderlyingSource(this, underlyingSource, underlyingSourceDict, highWaterMark, sizeAlgorithm)。
from(asyncIterable)
方法的步骤如下:
-
返回 ? ReadableStreamFromIterable(asyncIterable)。
locked
getter 步骤如下:
-
返回 ! IsReadableStreamLocked(this)。
cancel(reason)
方法的步骤如下:
-
如果 ! IsReadableStreamLocked(this) 为 true,则返回一个被拒绝的 promise,其原因是
TypeError
异常。 -
返回 ! ReadableStreamCancel(this, reason)。
getReader(options)
方法的步骤如下:
-
如果 options["
mode
"] 不存在,则返回 ? AcquireReadableStreamDefaultReader(this)。
function readAllChunks( readableStream) { const reader= readableStream. getReader(); const chunks= []; return pump(); function pump() { return reader. read(). then(({ value, done}) => { if ( done) { return chunks; } chunks. push( value); return pump(); }); } }
注意它第一步就获取读取器,之后只使用该读取器。这确保了没有其他消费者可以干扰流(如并发读取或取消流)。
pipeThrough(transform, options)
方法步骤如下:
-
如果 ! IsReadableStreamLocked(this) 为 true,则抛出
TypeError
异常。 -
如果 ! IsWritableStreamLocked(transform["
writable
"]) 为 true,则抛出TypeError
异常。 -
令 promise 为 ! ReadableStreamPipeTo(this, transform["
writable
"], options["preventClose
"], options["preventAbort
"], options["preventCancel
"], signal)。 -
将 promise.[[PromiseIsHandled]] 设为 true。
-
返回 transform["
readable
"]。
pipeThrough(transform, options)
构建管道链的典型示例如下:
httpResponseBody. pipeThrough( decompressorTransform) . pipeThrough( ignoreNonImageFilesTransform) . pipeTo( mediaGallery);
pipeTo(destination, options)
方法步骤如下:
-
如果 ! IsReadableStreamLocked(this) 为 true,返回一个被拒绝的 promise,其原因为
TypeError
异常。 -
如果 ! IsWritableStreamLocked(destination) 为 true,返回一个被拒绝的 promise,其原因为
TypeError
异常。 -
返回 ! ReadableStreamPipeTo(this, destination, options["
preventClose
"], options["preventAbort
"], options["preventCancel
"], signal)。
AbortSignal
停止正在进行的 管道操作,示例如下:
const controller= new AbortController(); readable. pipeTo( writable, { signal: controller. signal}); // ... 之后的某个时刻 ... controller. abort();
(上述代码省略了对 pipeTo()
返回的 promise 的错误处理。此外,preventAbort
和 preventCancel
选项对于管道被终止时的影响也值得考虑。)
ReadableStream
,但写入同一个
WritableStream
:
const controller= new AbortController(); const pipePromise= readable1. pipeTo( writable, { preventAbort: true , signal: controller. signal}); // ... 之后的某个时刻 ... controller. abort(); // 等待管道完成后再启动新管道: try { await pipePromise; } catch ( e) { // 预期 "AbortError" DOMException 可吞掉,其他错误需重新抛出。 if ( e. name!== "AbortError" ) { throw e; } } // 启动新的管道! readable2. pipeTo( writable);
tee()
方法步骤如下:
-
返回 ? ReadableStreamTee(this, false)。
cacheEntry
,和一个代表远程服务器上传的可写流
httpRequestBody
,你可以同时将同一个可读流管道到两个目标:
const [ forLocal, forRemote] = readableStream. tee(); Promise. all([ forLocal. pipeTo( cacheEntry), forRemote. pipeTo( httpRequestBody) ]) . then(() => console. log( "已保存到缓存并上传!" )) . catch ( e=> console. error( "缓存或上传失败:" , e));
4.2.5. 异步迭代
for await (const chunk of stream) { ... }
for await (const chunk of stream.values({
preventCancel
: true })) { ... }-
异步遍历流内部队列中的数据块(chunk)。
异步迭代流时会锁定该流,阻止其他消费者获取读取器。如果通过
return()
方法(如通过break
跳出循环)结束异步迭代,会释放该锁。默认情况下,调用异步迭代器的
return()
方法也会取消流。如要阻止此行为,可调用流的values()
方法并为preventCancel
选项传入 true。
-
令 reader 为 ? AcquireReadableStreamDefaultReader(stream)。
-
将 iterator 的 reader 设为 reader。
-
令 preventCancel 为 args[0]["
preventCancel
"]。 -
将 iterator 的 prevent cancel 设为 preventCancel。
ReadableStream
,给定
stream 和 iterator)如下:
-
令 reader 为 iterator 的 reader。
-
断言:reader.[[stream]] 不为 undefined。
-
令 promise 为新建的 promise。
-
令 readRequest 为新的读取请求(read request),包含以下条目:
- chunk steps,给定 chunk
-
-
解析 promise,值为 chunk。
-
- close steps
-
-
执行 ! ReadableStreamDefaultReaderRelease(reader)。
-
- error steps,给定 e
-
-
执行 ! ReadableStreamDefaultReaderRelease(reader)。
-
拒绝 promise,理由为 e。
-
-
执行 ! ReadableStreamDefaultReaderRead(this, readRequest)。
-
返回 promise。
ReadableStream
,给定
stream、iterator 和 arg)如下:
-
令 reader 为 iterator 的 reader。
-
断言:reader.[[stream]] 不为 undefined。
-
断言:reader.[[readRequests]] 为空(async iterator 机制保证所有之前对
next()
的调用都已 settle,才会调用此方法)。 -
如果 iterator 的 prevent cancel 为 false:
-
令 result 为 ! ReadableStreamReaderGenericCancel(reader, arg)。
-
执行 ! ReadableStreamDefaultReaderRelease(reader)。
-
返回 result。
-
-
执行 ! ReadableStreamDefaultReaderRelease(reader)。
4.2.6. 通过 postMessage()
传递
destination.postMessage(rs, { transfer: [rs] });
-
将一个
ReadableStream
发送到另一个 frame、window 或 worker。被传递的流可与原始流一样使用。原始流会被锁定,无法再直接使用。
ReadableStream
对象是可转移对象(transferable objects)。它们的转移步骤(transfer steps),给定 value 和 dataHolder,如下:
-
如果 ! IsReadableStreamLocked(value) 为 true,则抛出 "
DataCloneError
"DOMException
。 -
令 port1 为在当前 Realm中新建的
MessagePort
。 -
令 port2 为在当前 Realm中新建的
MessagePort
。 -
关联 port1 和 port2。
-
令 writable 为在当前 Realm中新建的
WritableStream
。 -
执行 ! SetUpCrossRealmTransformWritable(writable, port1)。
-
令 promise 为 ! ReadableStreamPipeTo(value, writable, false, false, false)。
-
将 promise.[[PromiseIsHandled]] 设为 true。
-
将 dataHolder.[[port]] 设为 ! StructuredSerializeWithTransfer(port2, « port2 »)。
-
令 deserializedRecord 为 ! StructuredDeserializeWithTransfer(dataHolder.[[port]], 当前 Realm)。
-
令 port 为 deserializedRecord.[[Deserialized]]。
-
执行 ! SetUpCrossRealmTransformReadable(value, port)。
4.3. ReadableStreamGenericReader
mixin
ReadableStreamGenericReader
mixin 定义了 ReadableStreamDefaultReader
和 ReadableStreamBYOBReader
对象共享的内部槽、getter 和方法。
4.3.1. 混入定义
ReadableStreamGenericReader
混入的 Web IDL 定义如下:
interface mixin {
ReadableStreamGenericReader readonly attribute Promise <undefined >closed ;Promise <undefined >cancel (optional any ); };
reason
4.3.2. 内部插槽
包含 ReadableStreamGenericReader
混入的类的实例会带有下表描述的内部插槽:
内部插槽 | 描述(非规范性) |
---|---|
[[closedPromise]] | 由 reader 的 closed
getter 返回的 promise
|
[[stream]] | 拥有此 reader 的 ReadableStream
实例
|
4.3.3. 方法与属性
closed
属性步骤如下:
cancel(reason)
方法步骤如下:
-
如果 this.[[stream]] 为 undefined,则返回以
TypeError
异常拒绝的 promise。 -
返回 ! ReadableStreamReaderGenericCancel(this, reason)。
4.4. ReadableStreamDefaultReader
类
ReadableStreamDefaultReader
类表示被 ReadableStream
实例提供的默认 reader。
4.4.1. 接口定义
ReadableStreamDefaultReader
类的 Web IDL 定义如下:
[Exposed=*]interface {
ReadableStreamDefaultReader constructor (ReadableStream );
stream Promise <ReadableStreamReadResult >read ();undefined releaseLock (); };ReadableStreamDefaultReader includes ReadableStreamGenericReader ;dictionary {
ReadableStreamReadResult any ;
value boolean ; };
done
4.4.2. 内部插槽
ReadableStreamDefaultReader
的实例拥有 ReadableStreamGenericReader
定义的内部插槽,以及下表所述的插槽:
内部插槽 | 描述(非规范性) |
---|---|
[[readRequests]] | 一个列表,包含读取请求,用于在消费者请求块时,这些块尚未可用 |
读取请求是一个结构体 ,包含三个算法,用于响应填充可读流的内部队列或其状态变化。它包含如下项:
- chunk steps
-
接收一个块的算法,当有可读取的块时调用
- close steps
-
无参数的算法,当没有块且流已关闭时调用
- error steps
-
接收一个 JavaScript 值的算法,当没有块且流出错时调用
4.4.3. 构造函数、方法和属性
reader = new
ReadableStreamDefaultReader
(stream)-
这等价于调用
stream.
。getReader()
await reader.
closed
-
返回一个 promise,当流关闭时被满足;如果流发生错误或 reader 的锁在流关闭前被释放,则 promise 被拒绝。
await reader.
cancel
([ reason ]){ value, done } = await reader.
read
()-
返回一个 promise,允许访问流的内部队列中的下一个块(如果有)。
- 如果块可用,promise 被满足,返回的对象格式为
。{ value: theChunk, done: false } - 如果流被关闭,promise 被满足,返回对象格式为
。{ value: undefined , done: true } - 如果流出错,promise 被拒绝并带有相关错误。
如果读取一个块导致队列变为空,将会从底层源拉取更多数据。
- 如果块可用,promise 被满足,返回的对象格式为
reader.
releaseLock
()-
释放 reader 对应流的锁。锁被释放后,reader 不再处于活动状态。如果关联的流在释放锁时出错,reader 从此也会表现为出错;否则 reader 表现为已关闭。
如果 reader 在释放锁时还有未完成的读取请求,则
read()
方法返回的 promise 会立即被TypeError
拒绝。所有未读取的块将保留在流的内部队列,可通过获取新 reader 再次读取。
new ReadableStreamDefaultReader(stream)
构造函数步骤如下:
-
执行 ? SetUpReadableStreamDefaultReader(this, stream)。
read()
方法步骤如下:
-
如果 this.[[stream]] 为 undefined,返回以
TypeError
异常拒绝的 promise。 -
令 promise 为一个新的 promise。
-
执行 ! ReadableStreamDefaultReaderRead(this, readRequest)。
-
返回 promise。
releaseLock()
方法步骤如下:
-
如果 this.[[stream]] 为 undefined,则返回。
4.5. ReadableStreamBYOBReader
类
ReadableStreamBYOBReader
类表示一种BYOB reader
,用于由 ReadableStream
实例提供。
4.5.1. 接口定义
ReadableStreamBYOBReader
类的 Web IDL 定义如下:
[Exposed=*]interface {
ReadableStreamBYOBReader constructor (ReadableStream );
stream Promise <ReadableStreamReadResult >read (ArrayBufferView ,
view optional ReadableStreamBYOBReaderReadOptions = {});
options undefined releaseLock (); };ReadableStreamBYOBReader includes ReadableStreamGenericReader ;dictionary { [
ReadableStreamBYOBReaderReadOptions EnforceRange ]unsigned long long = 1; };
min
4.5.2. 内部插槽
ReadableStreamBYOBReader
的实例拥有 ReadableStreamGenericReader
定义的内部插槽,以及下表所述的插槽:
内部插槽 | 描述(非规范性) |
---|---|
[[readIntoRequests]] | 一个列表,包含read-into 请求,用于在消费者请求块时,这些块尚未可用 |
read-into 请求是一个结构体 ,包含三个算法,用于响应填充可读字节流的内部队列或其状态变化。它包含如下项:
- chunk steps
-
接收一个块的算法,当有可读取的块时调用
- close steps
-
接收一个块或 undefined 的算法,当没有块(chunk)且流已关闭时调用
- error steps
-
接收一个 JavaScript 值的算法,当没有块且流出错时调用
close steps 接收一个块,这样在可能的情况下可以将底层内存返回给调用者。例如,byobReader.read(chunk)
会在流关闭时以
形式 fulfill。如果流被取消,则底层内存会被丢弃,byobReader.read(chunk)
会以更传统的
fulfill。
4.5.3. 构造函数、方法和属性
reader = new
ReadableStreamBYOBReader
(stream)await reader.
closed
-
返回一个 promise,当流关闭时被满足;如果流发生错误或 reader 的锁在流关闭前被释放,则 promise 被拒绝。
await reader.
cancel
([ reason ]){ value, done } = await reader.
read
(view[, {min
}])-
尝试将字节读取到 view 中,并返回一个 promise,promise 解析后的结果如下:
- 如果块可用,promise 被满足,返回的对象格式为
。 此时 view 会被分离,不可再用,{ value: newView, done: false } newView
是同一底层内存的新视图,数据已写入其中。 - 如果流被关闭,promise 被满足,返回对象格式为
。 此时 view 会被分离,不可再用,{ value: newView, done: true } newView
是同一底层内存的新视图,未修改数据,目的是保证内存返回给调用者。 - 如果 reader 被取消,promise 被满足,返回对象格式为
。 此时 view 的底层内存会被丢弃,不再返回给调用者。{ value: undefined , done: true } - 如果流出错,promise 被拒绝并带有相关错误。
如果读取一个块导致队列变为空,将会从底层源拉取更多数据。
如果指定了
min
,promise 只会在满足最小元素数量时才会被 fulfill。此处“元素数量”对于 typed array 是newView
的length
,对于DataView
是newView.byteLength
。如果流关闭,则 promise 返回流中剩余元素,可能少于最初请求数量。如果未指定,则至少有一个元素时就会 fulfill。 - 如果块可用,promise 被满足,返回的对象格式为
reader.
releaseLock
()-
释放 reader 对应流的锁。锁被释放后,reader 不再处于活动状态。如果关联的流在释放锁时出错,reader 从此也会表现为出错;否则 reader 表现为已关闭。
如果 reader 在释放锁时还有未完成的读取请求,则
read()
方法返回的 promise 会立即被TypeError
拒绝。所有未读取的块将保留在流的内部队列,可通过获取新 reader 再次读取。
new ReadableStreamBYOBReader(stream)
构造函数步骤如下:
-
执行 ? SetUpReadableStreamBYOBReader(this, stream)。
read(view, options)
方法步骤如下:
-
如果 view.[[ByteLength]] 为 0,返回以
TypeError
异常拒绝的 promise。 -
如果 view.[[ViewedArrayBuffer]].[[ByteLength]] 为 0,返回以
TypeError
异常拒绝的 promise。 -
如果 ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) 为 true,返回以
TypeError
异常拒绝的 promise。 -
如果 view 拥有 [[TypedArrayName]] 内部插槽,
-
如果 options["
min
"] > view.[[ArrayLength]], 返回以RangeError
异常拒绝的 promise。
-
-
否则(即为
DataView
),-
如果 options["
min
"] > view.[[ByteLength]], 返回以RangeError
异常拒绝的 promise。
-
-
如果 this.[[stream]] 为 undefined,返回以
TypeError
异常拒绝的 promise。 -
令 promise 为一个新的 promise。
-
令 readIntoRequest 为一个新的read-into 请求,包含以下项:
-
执行 ! ReadableStreamBYOBReaderRead(this, view, options["
min
"], readIntoRequest). -
返回 promise。
releaseLock()
方法步骤如下:
-
如果 this.[[stream]] 为 undefined,则返回。
4.6.
ReadableStreamDefaultController
类
ReadableStreamDefaultController
类拥有允许控制 ReadableStream
状态和内部队列的方法。
当构造一个不是可读字节流的
ReadableStream
时,底层源会被赋予一个对应的 ReadableStreamDefaultController
实例用于操作流。
4.6.1. 接口定义
ReadableStreamDefaultController
类的 Web IDL 定义如下:
[Exposed=*]interface {
ReadableStreamDefaultController readonly attribute unrestricted double ?desiredSize ;undefined close ();undefined enqueue (optional any );
chunk undefined error (optional any ); };
e
4.6.2. 内部插槽
ReadableStreamDefaultController
的实例拥有下表所描述的内部插槽:
内部插槽 | 描述(非规范性) |
---|---|
[[cancelAlgorithm]] | 一个返回 promise 的算法,接收一个参数(取消原因),用于将取消请求传递给底层源 |
[[closeRequested]] | 一个布尔标志,指示流是否已被底层源关闭,但其内部队列中仍有未读取的块 |
[[pullAgain]] | 一个布尔标志,如果流的机制请求调用底层源的 pull 算法拉取更多数据,但由于上一次调用尚未完成,不能立即执行,则设置为 true |
[[pullAlgorithm]] | 一个返回 promise 的算法,从底层源拉取数据 |
[[pulling]] | 一个布尔标志,在底层源的 pull 算法执行且返回的 promise 尚未完成时为 true,用于防止重入调用 |
[[queue]] | 一个列表,表示流的内部块队列 |
[[queueTotalSize]] | 存储于[[queue]]中的所有块的总大小(见 § 8.1 队列与大小) |
[[started]] | 一个布尔标志,表示底层源是否已完成启动 |
[[strategyHWM]] | 构造时作为流排队策略一部分提供的数值,表示应用背压于底层源的阈值 |
[[strategySizeAlgorithm]] | 一个算法,用于计算加入队列的块的大小,作为流排队策略的一部分 |
[[stream]] | 被控制的 ReadableStream
实例
|
4.6.3. 方法与属性
desiredSize = controller.
desiredSize
-
返回填满所控制流的内部队列所需的期望大小。如果队列过满,该值可能为负。底层源应利用此信息判断何时以及如何施加背压。
controller.
close
()controller.
enqueue
(chunk)-
将给定的块chunk加入所控制的可读流队列。
controller.
error
(e)-
使所控制的可读流出错,后续所有操作均会因给定错误e失败。
desiredSize
getter 步骤如下:
close()
方法步骤如下:
-
如果 ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) 为 false,则抛出
TypeError
异常。
enqueue(chunk)
方法步骤如下:
-
如果 ! ReadableStreamDefaultControllerCanCloseOrEnqueue(this) 为 false,则抛出
TypeError
异常。 -
执行 ? ReadableStreamDefaultControllerEnqueue(this, chunk)。
error(e)
方法步骤如下:
-
执行 ! ReadableStreamDefaultControllerError(this, e)。
4.6.4. 内部方法
每个 ReadableStreamDefaultController
实例实现以下内部方法。
可读流实现会多态地调用这些方法,或调用 BYOB 控制器的对应方法,详见§ 4.9.2 控制器接口抽象操作。
-
执行 ! ResetQueue(this)。
-
令 result 为执行 this.[[cancelAlgorithm]],传入 reason 的结果。
-
返回 result。
-
令 stream 为 this.[[stream]]。
-
-
令 chunk 为 ! DequeueValue(this)。
-
如果 this.[[closeRequested]] 为 true 且 this.[[queue]] 为空,
-
执行 ! ReadableStreamClose(stream).
-
否则,执行 ! ReadableStreamDefaultControllerCallPullIfNeeded(this)。
-
执行 readRequest 的 chunk steps,传入 chunk。
-
-
否则,
-
执行 ! ReadableStreamAddReadRequest(stream, readRequest)。
-
-
返回。
4.7. ReadableByteStreamController
类
ReadableByteStreamController
类拥有允许控制 ReadableStream
状态和内部队列的方法。
当构造一个可读字节流的
ReadableStream
时,底层源会被赋予一个对应的 ReadableByteStreamController
实例用于操作流。
4.7.1. 接口定义
ReadableByteStreamController
类的 Web IDL 定义如下:
[Exposed=*]interface {
ReadableByteStreamController readonly attribute ReadableStreamBYOBRequest ?byobRequest ;readonly attribute unrestricted double ?desiredSize ;undefined close ();undefined enqueue (ArrayBufferView );
chunk undefined error (optional any ); };
e
4.7.2. 内部插槽
ReadableByteStreamController
的实例拥有下表所描述的内部插槽:
内部插槽 | 描述(非规范性) |
---|---|
[[autoAllocateChunkSize]] | 一个正整数,当启用自动 buffer 分配特性时存在。在这种情况下,该值指定要分配的 buffer 大小。否则为 undefined。 |
[[byobRequest]] | 一个 ReadableStreamBYOBRequest
实例,表示当前 BYOB 读取请求,如果没有待处理请求则为 null
|
[[cancelAlgorithm]] | 一个返回 promise 的算法,接收一个参数(取消原因),用于将取消请求传递给底层字节源 |
[[closeRequested]] | 一个布尔标志,指示流是否已被底层字节源关闭,但其内部队列中仍有未读取的块 |
[[pullAgain]] | 一个布尔标志,如果流的机制请求调用底层字节源的 pull 算法拉取更多数据,但由于上一次调用尚未完成,不能立即执行,则设置为 true |
[[pullAlgorithm]] | 一个返回 promise 的算法,从底层字节源拉取数据 |
[[pulling]] | 一个布尔标志,在底层字节源的 pull 算法执行且返回的 promise 尚未完成时为 true,用于防止重入调用 |
[[pendingPullIntos]] | 一个列表,包含pull-into 描述符 |
[[queue]] | 一个列表,包含可读字节流队列项,表示流的内部块队列 |
[[queueTotalSize]] | 以字节为单位,存储于[[queue]]中的所有块的总大小(见 § 8.1 队列与大小) |
[[started]] | 一个布尔标志,表示底层字节源是否已完成启动 |
[[strategyHWM]] | 构造时作为流排队策略一部分提供的数值,表示应用背压于底层字节源的阈值 |
[[stream]] | 被控制的 ReadableStream
实例
|
虽然 ReadableByteStreamController
实例有 [[queue]] 和 [[queueTotalSize]] 插槽,但我们并不会对它们使用 § 8.1 队列与大小 中的大多数抽象操作,因为规范中对该队列的操作与其他类型有较大不同。我们会手动同步更新这两个插槽。
这部分可能会在后续规范重构中得到优化。
可读字节流队列项是一个结构体,用于封装可读字节流中特有的块重要信息。它包含如下项:
- buffer
-
一个
ArrayBuffer
,为转移自底层字节源最初提供的 buffer。 - byte offset
-
一个非负整数,表示来自底层字节源视图的字节偏移量。
- byte length
-
一个非负整数,表示来自底层字节源视图的字节长度。
pull-into 描述符是一个结构体,用于表示待处理的 BYOB 拉取请求。其包含如下项:
- buffer
-
一个
ArrayBuffer
- buffer byte length
-
一个正整数,表示 buffer 的初始字节长度
- byte offset
-
一个非负整数,表示 buffer 中,底层字节源将开始写入的字节偏移量
- byte length
-
一个正整数,表示可以写入 buffer 的字节数
- bytes filled
-
一个非负整数,表示当前已写入 buffer 的字节数
- minimum fill
- element size
- view constructor
-
一个typed array 构造器或
%DataView%
,用于构造写入 buffer 的视图 - reader type
-
“
default
” 或 “byob
”,表示发起本次请求的可读流 reader类型,如果发起方reader已释放,则为 “none
”。
4.7.3. 方法与属性
byobRequest = controller.
byobRequest
-
返回当前的 BYOB 拉取请求,如果没有则为 null。
desiredSize = controller.
desiredSize
-
返回填满所控制流的内部队列所需的期望大小。如果队列过满,该值可能为负。底层字节源应利用此信息判断何时以及如何施加背压。
controller.
close
()controller.
enqueue
(chunk)-
将给定的块chunk加入所控制的可读流队列。 chunk 必须是
ArrayBufferView
实例,否则会抛出TypeError
。 controller.
error
(e)-
使所控制的可读流出错,后续所有操作均会因给定错误e失败。
byobRequest
getter 步骤如下:
desiredSize
getter 步骤如下:
close()
方法步骤如下:
-
如果 this.[[closeRequested]] 为 true, 抛出
TypeError
异常。 -
如果 this.[[stream]].[[state]] 不为 "
readable
",抛出TypeError
异常。
enqueue(chunk)
方法步骤如下:
-
如果 chunk.[[ByteLength]] 为 0,抛出
TypeError
异常。 -
如果 chunk.[[ViewedArrayBuffer]].[[ByteLength]] 为 0,抛出
TypeError
异常。 -
如果 this.[[closeRequested]] 为 true, 抛出
TypeError
异常。 -
如果 this.[[stream]].[[state]] 不为 "
readable
",抛出TypeError
异常。 -
返回 ? ReadableByteStreamControllerEnqueue(this, chunk)。
error(e)
方法步骤如下:
-
执行 ! ReadableByteStreamControllerError(this, e)。
4.7.4. 内部方法
每个 ReadableByteStreamController
实例实现以下内部方法。
可读流实现会多态地调用这些方法,或调用默认控制器的对应方法,详见§ 4.9.2 控制器接口抽象操作。
-
执行 ! ReadableByteStreamControllerClearPendingPullIntos(this)。
-
执行 ! ResetQueue(this)。
-
令 result 为执行 this.[[cancelAlgorithm]],传入 reason 的结果。
-
返回 result。
-
令 stream 为 this.[[stream]]。
-
断言:! ReadableStreamHasDefaultReader(stream) 为 true。
-
如果 this.[[queueTotalSize]] > 0,
-
断言:! ReadableStreamGetNumReadRequests(stream) 为 0。
-
执行 ! ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest)。
-
返回。
-
-
令 autoAllocateChunkSize 为 this.[[autoAllocateChunkSize]]。
-
如果 autoAllocateChunkSize 不为 undefined,
-
令 buffer 为 Construct(
%ArrayBuffer%
, « autoAllocateChunkSize »)。 -
如果 buffer 是异常完成,
-
执行 readRequest 的 error steps,参数为 buffer.[[Value]]。
-
返回。
-
-
令 pullIntoDescriptor 为一个新的pull-into 描述符,内容为
- buffer
- buffer.[[Value]]
- buffer byte length
- autoAllocateChunkSize
- byte offset
- 0
- byte length
- autoAllocateChunkSize
- bytes filled
- 0
- minimum fill
- 1
- element size
- 1
- view constructor
%Uint8Array%
- reader type
- "
default
"
-
将 pullIntoDescriptor 添加到 this.[[pendingPullIntos]]。
-
-
执行 ! ReadableStreamAddReadRequest(stream, readRequest)。
-
如果 this.[[pendingPullIntos]] 非 空,
-
令 firstPendingPullInto 为 this.[[pendingPullIntos]][0]。
-
将 firstPendingPullInto 的 reader type 设为 "
none
"。 -
将 this.[[pendingPullIntos]] 设为仅包含 firstPendingPullInto 的列表 « firstPendingPullInto »。
-
4.8. ReadableStreamBYOBRequest
类
ReadableStreamBYOBRequest
类表示 ReadableByteStreamController
中的拉取(pull-into)请求。
4.8.1. 接口定义
ReadableStreamBYOBRequest
类的 Web IDL 定义如下:
[Exposed=*]interface {
ReadableStreamBYOBRequest readonly attribute ArrayBufferView ?view ;undefined respond ([EnforceRange ]unsigned long long );
bytesWritten undefined respondWithNewView (ArrayBufferView ); };
view
4.8.2. 内部插槽
ReadableStreamBYOBRequest
的实例拥有下表所描述的内部插槽:
内部插槽 | 描述(非规范性) |
---|---|
[[controller]] | 父 ReadableByteStreamController
实例
|
[[view]] | 一个typed array,表示控制器可写入生成数据的目标区域;BYOB 请求失效后为 null。 |
4.8.3. 方法与属性
view = byobRequest.
view
-
返回用于写入的数据视图,如果 BYOB 请求已经响应则为 null。
byobRequest.
respond
(bytesWritten)byobRequest.
respondWithNewView
(view)-
通知关联的可读字节流,底层字节源不再写入
view
, 而是提供一个新的ArrayBufferView
, 会返回给消费者。新 view 必须是和
view
相同底层内存的新视图,即其 buffer 必须等于(或为转移自)view
的 buffer。其byteOffset
必须等于view
的byteOffset
,其byteLength
(表示写入的字节数)必须小于等于view
的byteLength
。调用此方法后,view 会被转移,无法再被修改。
respond(bytesWritten)
方法步骤如下:
-
如果 this.[[controller]] 为 undefined,则抛出
TypeError
异常。 -
如果 ! IsDetachedBuffer(this.[[view]].[[ArrayBuffer]]) 为 true,则抛出
TypeError
异常。 -
执行 ? ReadableByteStreamControllerRespond(this.[[controller]], bytesWritten)。
respondWithNewView(view)
方法步骤如下:
-
如果 this.[[controller]] 为 undefined,则抛出
TypeError
异常。 -
如果 ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) 为 true, 则抛出
TypeError
异常。 -
返回 ? ReadableByteStreamControllerRespondWithNewView(this.[[controller]], view)。
4.9. 抽象操作
4.9.1. 与可读流协作
下列抽象操作以较高层次操作 ReadableStream
实例。
-
令 reader 为 新建的
ReadableStreamBYOBReader
。 -
执行 ? SetUpReadableStreamBYOBReader(reader, stream)。
-
返回 reader。
-
令 reader 为 新建的
ReadableStreamDefaultReader
。 -
执行 ? SetUpReadableStreamDefaultReader(reader, stream)。
-
返回 reader。
-
如果未传入 highWaterMark,则设为 1。
-
如果未传入 sizeAlgorithm,则设为一个返回 1 的算法。
-
断言:! IsNonNegativeNumber(highWaterMark) 为 true。
-
令 stream 为 新建的
ReadableStream
。 -
执行 ! InitializeReadableStream(stream)。
-
令 controller 为 新建的
ReadableStreamDefaultController
。 -
执行 ? SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm)。
-
返回 stream。
本抽象操作仅当传入的 startAlgorithm 抛出异常时才会抛出异常。
-
令 stream 为 新建的
ReadableStream
。 -
执行 ! InitializeReadableStream(stream)。
-
令 controller 为 新建的
ReadableByteStreamController
。 -
执行 ? SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, undefined)。
-
返回 stream。
本抽象操作仅当传入的 startAlgorithm 抛出异常时才会抛出异常。
-
将 stream.[[state]] 设为 "
readable
"。 -
将 stream.[[reader]] 和 stream.[[storedError]] 设为 undefined。
-
将 stream.[[disturbed]] 设为 false。
-
如果 stream.[[reader]] 为 undefined,返回 false。
-
返回 true。
-
令 stream 为 undefined。
-
令 iteratorRecord 为 ? GetIterator(asyncIterable, async)。
-
令 startAlgorithm 为一个返回 undefined 的算法。
-
令 pullAlgorithm 为以下步骤:
-
令 nextResult 为 IteratorNext(iteratorRecord)。
-
如果 nextResult 是异常完成,返回 以 nextResult.[[Value]] 拒绝的 promise。
-
令 nextPromise 为 以 nextResult.[[Value]] 解决的 promise。
-
返回对 nextPromise 反应的结果,满足时执行如下步骤,参数 iterResult:
-
令 done 为 ? IteratorComplete(iterResult)。
-
如果 done 为 true:
-
执行 ! ReadableStreamDefaultControllerClose(stream.[[controller]])。
-
-
否则:
-
令 value 为 ? IteratorValue(iterResult)。
-
执行 ! ReadableStreamDefaultControllerEnqueue(stream.[[controller]], value)。
-
-
-
令 cancelAlgorithm 为以下步骤,参数 reason:
-
令 iterator 为 iteratorRecord.[[Iterator]]。
-
令 returnMethod 为 GetMethod(iterator, "
return
")。 -
如果 returnMethod 是异常完成,返回 以 returnMethod.[[Value]] 拒绝的 promise。
-
如果 returnMethod.[[Value]] 为 undefined,返回 以 undefined 解决的 promise。
-
令 returnResult 为 Call(returnMethod.[[Value]], iterator, « reason »)。
-
如果 returnResult 是异常完成,返回 以 returnResult.[[Value]] 拒绝的 promise。
-
令 returnPromise 为 以 returnResult.[[Value]] 解决的 promise。
-
返回对 returnPromise 反应的结果,满足时执行如下步骤,参数 iterResult:
-
-
将 stream 设为 ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, 0)。
-
返回 stream。
-
断言:source 实现了
ReadableStream
。 -
断言:dest 实现了
WritableStream
。 -
断言:preventClose、preventAbort 和 preventCancel 都是布尔值。
-
如果未传入 signal,则令 signal 为 undefined。
-
断言:signal 要么为 undefined,要么 signal 实现了
AbortSignal
。 -
断言:! IsReadableStreamLocked(source) 为 false。
-
断言:! IsWritableStreamLocked(dest) 为 false。
-
如果 source.[[controller]] 实现了
ReadableByteStreamController
, 则令 reader 为 ! AcquireReadableStreamBYOBReader(source) 或 ! AcquireReadableStreamDefaultReader(source), 由 user agent 自行决定。 -
否则,令 reader 为 ! AcquireReadableStreamDefaultReader(source)。
-
令 writer 为 ! AcquireWritableStreamDefaultWriter(dest)。
-
将 source.[[disturbed]] 设为 true。
-
令 shuttingDown 为 false。
-
令 promise 为一个新的 promise。
-
如果 signal 不为 undefined,
-
令 abortAlgorithm 为以下步骤:
-
令 error 为 signal 的 中止原因。
-
令 actions 为一个空的有序集合。
-
如果 preventAbort 为 false,添加以下 action 到 actions 中:
-
如果 dest.[[state]] 为 "
writable
",返回 ! WritableStreamAbort(dest, error)。 -
否则,返回 以 undefined 解决的 promise。
-
-
如果 preventCancel 为 false,添加以下 action 到 actions 中:
-
如果 source.[[state]] 为 "
readable
",返回 ! ReadableStreamCancel(source, error)。 -
否则,返回 以 undefined 解决的 promise。
-
-
带有 action 的关闭,action 为等待所有 actions 的 promise,error 为 error。
-
-
如果 signal 已中止,执行 abortAlgorithm 并返回 promise。
-
添加 abortAlgorithm 到 signal。
-
-
并行 但实际上不是;参见 #905,使用 reader 和 writer,从 source 读取所有块并写入 dest。由于 reader 和 writer 提供的锁定机制,具体实现方式对作者代码不可见,因此有灵活性。无论实现方式如何,都必须遵循以下约束:
-
禁止使用公共 API: 在读取或写入,或执行下面的任何操作时,不得使用 JS 可修改的 reader、writer 和 stream API(即相关原型上的方法)。必须直接操作流对象。
-
必须执行背压:
-
当 WritableStreamDefaultWriterGetDesiredSize(writer) ≤ 0 或为 null 时,不得从 reader 读取。
-
如果 reader 是 BYOB reader,应以 WritableStreamDefaultWriterGetDesiredSize(writer) 作为读取块大小的参考。
读取过大或过小的块通常效率较低。可结合其他信息综合确定最佳块大小。
-
读取或写入不应因背压信号外的其他原因被延迟。
-
-
Shutdown 必须停止活动: 若 shuttingDown 变为 true,则不得再从 reader 读取,只能写入已读取的块。在每次读写前都要检查 shutdown 状态。
-
必须传播错误和关闭状态: 下列条件按顺序应用:
-
必须向前传播错误: 如果 source.[[state]] 为或变为 "
errored
",则:-
若 preventAbort 为 false,带有 action 的关闭,action 为 ! WritableStreamAbort(dest, source.[[storedError]]),error 为 source.[[storedError]]。
-
否则,关闭,error 为 source.[[storedError]]。
-
-
必须向后传播错误: 如果 dest.[[state]] 为或变为 "
errored
",则:-
若 preventCancel 为 false,带有 action 的关闭,action 为 ! ReadableStreamCancel(source, dest.[[storedError]]),error 为 dest.[[storedError]]。
-
否则,关闭,error 为 dest.[[storedError]]。
-
-
必须向前传播关闭: 如果 source.[[state]] 为或变为 "
closed
",则:-
若 preventClose 为 false,带有 action 的关闭,action 为 ! WritableStreamDefaultWriterCloseWithErrorPropagation(writer)。
-
否则,关闭。
-
-
必须向后传播关闭: 如果 ! WritableStreamCloseQueuedOrInFlight(dest) 为 true 或 dest.[[state]] 为 "
closed
", 则:-
断言:未读未写任何块。
-
令 destClosed 为新的
TypeError
。 -
若 preventCancel 为 false,带有 action 的关闭,action 为 ! ReadableStreamCancel(source, destClosed),error 为 destClosed。
-
否则,关闭,error 为 destClosed。
-
-
-
带有 action 的关闭:如有上述要求需要带有 action 的关闭,action 为 action,可选 error 为 originalError,则:
-
关闭:如有上述要求或步骤需要关闭,可选 error 为 error,则:
-
若 shuttingDown 为 true,终止以下子步骤。
-
将 shuttingDown 设为 true。
-
如果 dest.[[state]] 为 "
writable
" 且 ! WritableStreamCloseQueuedOrInFlight(dest) 为 false,-
若有已读但未写的块,写入 dest。
-
等待所有已读已写操作(即 promise 已 settle)。
-
-
finalize,传递 error(如有)。
-
-
finalize:上述两种关闭最终都会要求 finalize,可选 error error,步骤如下:
-
执行 ! WritableStreamDefaultWriterRelease(writer)。
-
如果 reader 实现了
ReadableStreamBYOBReader
, 执行 ! ReadableStreamBYOBReaderRelease(reader)。 -
否则,执行 ! ReadableStreamDefaultReaderRelease(reader)。
-
如果 signal 不为 undefined,移除 abortAlgorithm。
-
如果有 error,拒绝 promise,error 为 error。
-
否则,解决 promise,值为 undefined。
-
-
-
返回 promise。
此处执行的多种抽象操作包括对象创建(通常为 promise),按理需指定对象创建的 realm。但由于加锁,外部代码无法观察到这些对象,因此采用何种 realm 并不影响行为。
第二个参数 cloneForBranch2 控制原始流中的数据是否会在出现在返回的第二个分支前被克隆(使用 HTML 的 可序列化对象机制)。这对于两分支都会以会互相干扰的方式消费,例如转移其块的场景很有用。但这样会导致两个分支不对称,并限制块必须可序列化。[HTML]
如果 stream 是可读字节流,则 cloneForBranch2 被忽略,块始终无条件克隆。
在本标准中,ReadbleStreamTee 总是以 cloneForBranch2 为 false 调用;其他规范会通过 tee 包装算法传递 true。
执行以下步骤:
-
断言:stream 实现了
ReadableStream
。 -
断言:cloneForBranch2 为布尔值。
-
如果 stream.[[controller]] 实现了
ReadableByteStreamController
, 返回 ? ReadableByteStreamTee(stream)。 -
否则,返回 ? ReadableStreamDefaultTee(stream, cloneForBranch2)。
-
断言:stream 实现了
ReadableStream
。 -
断言:cloneForBranch2 为布尔值。
-
令 reader 为 ? AcquireReadableStreamDefaultReader(stream)。
-
令 reading 为 false。
-
令 readAgain 为 false。
-
令 canceled1 为 false。
-
令 canceled2 为 false。
-
令 reason1 为 undefined。
-
令 reason2 为 undefined。
-
令 branch1 为 undefined。
-
令 branch2 为 undefined。
-
令 cancelPromise 为一个新的 promise。
-
令 pullAlgorithm 为以下步骤:
-
如果 reading 为 true,
-
设 readAgain 为 true。
-
-
设 reading 为 true。
-
- chunk steps,参数 chunk
-
-
队列一个 microtask执行如下步骤:
-
设 readAgain 为 false。
-
令 chunk1 和 chunk2 皆为 chunk。
-
如果 canceled2 为 false 且 cloneForBranch2 为 true,
-
令 cloneResult 为 StructuredClone(chunk2)。
-
如果 cloneResult 是异常完成,
-
执行 ! ReadableStreamDefaultControllerError(branch1.[[controller]], cloneResult.[[Value]])。
-
执行 ! ReadableStreamDefaultControllerError(branch2.[[controller]], cloneResult.[[Value]])。
-
解决 cancelPromise,值为 ! ReadableStreamCancel(stream, cloneResult.[[Value]])。
-
返回。
-
-
否则,设 chunk2 为 cloneResult.[[Value]]。
-
-
如果 canceled1 为 false,执行 ! ReadableStreamDefaultControllerEnqueue(branch1.[[controller]], chunk1)。
-
如果 canceled2 为 false,执行 ! ReadableStreamDefaultControllerEnqueue(branch2.[[controller]], chunk2)。
-
设 reading 为 false。
-
如果 readAgain 为 true,执行 pullAlgorithm。
-
这里的 microtask 延迟是必要的,因为要检测错误至少需要一个 microtask(见下文 reader.[[closedPromise]])。 这样才能保证流的错误会立即传递到两个分支,而不会先发生成功同步读取。
-
- close steps
-
-
设 reading 为 false。
-
如果 canceled1 为 false,执行 ! ReadableStreamDefaultControllerClose(branch1.[[controller]])。
-
如果 canceled2 为 false,执行 ! ReadableStreamDefaultControllerClose(branch2.[[controller]])。
-
如果 canceled1 为 false 或 canceled2 为 false,解决 cancelPromise,值为 undefined。
-
- error steps
-
-
设 reading 为 false。
-
-
执行 ! ReadableStreamDefaultReaderRead(reader, readRequest)。
-
-
令 cancel1Algorithm 为以下步骤,参数 reason:
-
设 canceled1 为 true。
-
设 reason1 为 reason。
-
如果 canceled2 为 true,
-
令 compositeReason 为 ! CreateArrayFromList(« reason1, reason2 »)。
-
令 cancelResult 为 ! ReadableStreamCancel(stream, compositeReason)。
-
解决 cancelPromise,值为 cancelResult。
-
-
返回 cancelPromise。
-
-
令 cancel2Algorithm 为以下步骤,参数 reason:
-
设 canceled2 为 true。
-
设 reason2 为 reason。
-
如果 canceled1 为 true,
-
令 compositeReason 为 ! CreateArrayFromList(« reason1, reason2 »)。
-
令 cancelResult 为 ! ReadableStreamCancel(stream, compositeReason)。
-
解决 cancelPromise,值为 cancelResult。
-
-
返回 cancelPromise。
-
-
令 startAlgorithm 为一个返回 undefined 的算法。
-
设 branch1 为 ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel1Algorithm)。
-
设 branch2 为 ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancel2Algorithm)。
-
当拒绝 reader.[[closedPromise]],原因为 r,
-
执行 ! ReadableStreamDefaultControllerError(branch1.[[controller]], r)。
-
执行 ! ReadableStreamDefaultControllerError(branch2.[[controller]], r)。
-
如果 canceled1 为 false 或 canceled2 为 false,解决 cancelPromise,值为 undefined。
-
-
返回 « branch1, branch2 »。
-
断言:stream 实现了
ReadableStream
。 -
断言:stream.[[controller]] 实现了
ReadableByteStreamController
。 -
令 reader 为 ? AcquireReadableStreamDefaultReader(stream)。
-
令 reading 为 false。
-
令 readAgainForBranch1 为 false。
-
令 readAgainForBranch2 为 false。
-
令 canceled1 为 false。
-
令 canceled2 为 false。
-
令 reason1 为 undefined。
-
令 reason2 为 undefined。
-
令 branch1 为 undefined。
-
令 branch2 为 undefined。
-
令 cancelPromise 为一个新的 promise。
-
令 forwardReaderError 为下述步骤,参数 thisReader:
-
当拒绝 thisReader.[[closedPromise]],原因为 r,
-
如果 thisReader 不等于 reader,返回。
-
执行 ! ReadableByteStreamControllerError(branch1.[[controller]], r)。
-
执行 ! ReadableByteStreamControllerError(branch2.[[controller]], r)。
-
若 canceled1 为 false 或 canceled2 为 false,解决 cancelPromise,值为 undefined。
-
-
-
令 pullWithDefaultReader 为下述步骤:
-
如果 reader 实现了
ReadableStreamBYOBReader
,-
断言:reader.[[readIntoRequests]] 为空。
-
执行 ! ReadableStreamBYOBReaderRelease(reader)。
-
设 reader 为 ! AcquireReadableStreamDefaultReader(stream)。
-
执行 forwardReaderError,参数 reader。
-
-
- chunk steps,参数 chunk
-
-
队列一个 microtask,执行以下步骤:
-
设 readAgainForBranch1 和 readAgainForBranch2 均为 false。
-
令 chunk1 和 chunk2 都为 chunk。
-
若 canceled1 和 canceled2 均为 false,
-
令 cloneResult 为 CloneAsUint8Array(chunk)。
-
若 cloneResult 为异常完成,
-
执行 ! ReadableByteStreamControllerError(branch1.[[controller]], cloneResult.[[Value]])。
-
执行 ! ReadableByteStreamControllerError(branch2.[[controller]], cloneResult.[[Value]])。
-
解决 cancelPromise,值为 ! ReadableStreamCancel(stream, cloneResult.[[Value]])。
-
返回。
-
-
否则,设 chunk2 为 cloneResult.[[Value]]。
-
-
若 canceled1 为 false,执行 ! ReadableByteStreamControllerEnqueue(branch1.[[controller]], chunk1)。
-
若 canceled2 为 false,执行 ! ReadableByteStreamControllerEnqueue(branch2.[[controller]], chunk2)。
-
设 reading 为 false。
-
若 readAgainForBranch1 为 true,执行 pull1Algorithm。
-
否则如 readAgainForBranch2 为 true,执行 pull2Algorithm。
-
此处 microtask 延迟是必要的,因为要检测错误至少需要一个 microtask,见下文 reader.[[closedPromise]]。 这样可保证流错误立即传递到两个分支,而不会让同步读取领先于异步错误。
-
- close steps
-
-
设 reading 为 false。
-
若 canceled1 为 false,执行 ! ReadableByteStreamControllerClose(branch1.[[controller]])。
-
若 canceled2 为 false,执行 ! ReadableByteStreamControllerClose(branch2.[[controller]])。
-
若 branch1.[[controller]].[[pendingPullIntos]] 非空,执行 ! ReadableByteStreamControllerRespond(branch1.[[controller]], 0)。
-
若 branch2.[[controller]].[[pendingPullIntos]] 非空,执行 ! ReadableByteStreamControllerRespond(branch2.[[controller]], 0)。
-
若 canceled1 为 false 或 canceled2 为 false,解决 cancelPromise,值为 undefined。
-
- error steps
-
-
设 reading 为 false。
-
-
执行 ! ReadableStreamDefaultReaderRead(reader, readRequest)。
-
-
令 pullWithBYOBReader 为下述步骤,参数 view 和 forBranch2:
-
如果 reader 实现了
ReadableStreamDefaultReader
,-
断言:reader.[[readRequests]] 为空。
-
执行 ! ReadableStreamDefaultReaderRelease(reader)。
-
设 reader 为 ! AcquireReadableStreamBYOBReader(stream)。
-
执行 forwardReaderError,参数 reader。
-
-
令 byobBranch 为 branch2 若 forBranch2 为 true,否则为 branch1。
-
令 otherBranch 为 branch2 若 forBranch2 为 false,否则为 branch1。
-
令 readIntoRequest 为一个read-into 请求,包含如下项:
- chunk steps,参数 chunk
-
-
队列一个 microtask,执行以下步骤:
-
设 readAgainForBranch1、readAgainForBranch2 均为 false。
-
令 byobCanceled 为 canceled2 若 forBranch2 为 true,否则为 canceled1。
-
令 otherCanceled 为 canceled2 若 forBranch2 为 false,否则为 canceled1。
-
若 otherCanceled 为 false,
-
令 cloneResult 为 CloneAsUint8Array(chunk)。
-
若 cloneResult 为异常完成,
-
执行 ! ReadableByteStreamControllerError(byobBranch.[[controller]], cloneResult.[[Value]])。
-
执行 ! ReadableByteStreamControllerError(otherBranch.[[controller]], cloneResult.[[Value]])。
-
解决 cancelPromise,值为 ! ReadableStreamCancel(stream, cloneResult.[[Value]])。
-
返回。
-
-
否则,令 clonedChunk 为 cloneResult.[[Value]]。
-
若 byobCanceled 为 false,执行 ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk)。
-
执行 ! ReadableByteStreamControllerEnqueue(otherBranch.[[controller]], clonedChunk)。
-
-
否则如 byobCanceled 为 false,执行 ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk)。
-
设 reading 为 false。
-
若 readAgainForBranch1 为 true,执行 pull1Algorithm。
-
否则如 readAgainForBranch2 为 true,执行 pull2Algorithm。
-
此处 microtask 延迟是必要的,因为要检测错误至少需要一个 microtask,见下文 reader.[[closedPromise]]。 这样可保证流错误立即传递到两个分支,而不会让同步读取领先于异步错误。
-
- close steps,参数 chunk
-
-
设 reading 为 false。
-
令 byobCanceled 为 canceled2 若 forBranch2 为 true,否则为 canceled1。
-
令 otherCanceled 为 canceled2 若 forBranch2 为 false,否则为 canceled1。
-
若 byobCanceled 为 false,执行 ! ReadableByteStreamControllerClose(byobBranch.[[controller]])。
-
若 otherCanceled 为 false,执行 ! ReadableByteStreamControllerClose(otherBranch.[[controller]])。
-
如果 chunk 不为 undefined,
-
断言:chunk.[[ByteLength]] 为 0。
-
若 byobCanceled 为 false,执行 ! ReadableByteStreamControllerRespondWithNewView(byobBranch.[[controller]], chunk)。
-
若 otherCanceled 为 false 且 otherBranch.[[controller]].[[pendingPullIntos]] 非空,执行 ! ReadableByteStreamControllerRespond(otherBranch.[[controller]], 0)。
-
-
若 byobCanceled 为 false 或 otherCanceled 为 false, 解决 cancelPromise,值为 undefined。
-
- error steps
-
-
设 reading 为 false。
-
-
执行 ! ReadableStreamBYOBReaderRead(reader, view, 1, readIntoRequest)。
-
-
令 pull1Algorithm 为下述步骤:
-
若 reading 为 true,
-
设 readAgainForBranch1 为 true。
-
-
设 reading 为 true。
-
令 byobRequest 为 ! ReadableByteStreamControllerGetBYOBRequest(branch1.[[controller]])。
-
若 byobRequest 为 null,执行 pullWithDefaultReader。
-
否则,执行 pullWithBYOBReader,参数为 byobRequest.[[view]] 和 false。
-
-
令 pull2Algorithm 为下述步骤:
-
若 reading 为 true,
-
设 readAgainForBranch2 为 true。
-
-
设 reading 为 true。
-
令 byobRequest 为 ! ReadableByteStreamControllerGetBYOBRequest(branch2.[[controller]])。
-
若 byobRequest 为 null,执行 pullWithDefaultReader。
-
否则,执行 pullWithBYOBReader,参数为 byobRequest.[[view]] 和 true。
-
-
令 cancel1Algorithm 为下述步骤,参数 reason:
-
设 canceled1 为 true。
-
设 reason1 为 reason。
-
若 canceled2 为 true,
-
令 compositeReason 为 ! CreateArrayFromList(« reason1, reason2 »)。
-
令 cancelResult 为 ! ReadableStreamCancel(stream, compositeReason)。
-
解决 cancelPromise,值为 cancelResult。
-
-
返回 cancelPromise。
-
-
令 cancel2Algorithm 为下述步骤,参数 reason:
-
设 canceled2 为 true。
-
设 reason2 为 reason。
-
若 canceled1 为 true,
-
令 compositeReason 为 ! CreateArrayFromList(« reason1, reason2 »)。
-
令 cancelResult 为 ! ReadableStreamCancel(stream, compositeReason)。
-
解决 cancelPromise,值为 cancelResult。
-
-
返回 cancelPromise。
-
-
令 startAlgorithm 为一个返回 undefined 的算法。
-
设 branch1 为 ! CreateReadableByteStream(startAlgorithm, pull1Algorithm, cancel1Algorithm)。
-
设 branch2 为 ! CreateReadableByteStream(startAlgorithm, pull2Algorithm, cancel2Algorithm)。
-
执行 forwardReaderError,参数 reader。
-
返回 « branch1, branch2 »。
4.9.2. 与控制器的接口
在规范结构上,ReadableStream
类将简单可读流和可读字节流的行为封装到一个类中,主要通过将大部分可能变化的逻辑集中到两个控制器类:ReadableStreamDefaultController
和 ReadableByteStreamController
。
这两个类定义了大多数有状态的内部插槽和抽象操作,来管理流的内部队列及与底层源或底层字节源的接口。
每个控制器类都定义了三个内部方法,由ReadableStream
的算法调用:
- [[CancelSteps]](reason)
- 控制器在流被取消时运行的步骤,用于清理控制器存储的状态,并通知底层源。
- [[PullSteps]](readRequest)
- 控制器在默认 reader被读取时运行的步骤,用于从控制器拉取排队的块,或从底层源拉取更多块。
- [[ReleaseSteps]]()
- 控制器在reader被释放时运行的步骤,用于清理控制器中 reader 相关的资源。
(这些定义为内部方法而非抽象操作,因此 ReadableStream
算法可以多态调用,无需判断控制器类型。)
本节剩余部分是控制器反向调用流对象的抽象操作:用于控制器实现影响其关联的 ReadableStream
,将控制器的内部状态变化反映到开发者可见的 public API。
-
断言:stream.[[reader]] 实现了
ReadableStreamBYOBReader
。 -
断言:stream.[[state]] 为 "
readable
" 或 "closed
"。 -
追加 readRequest 到 stream.[[reader]].[[readIntoRequests]]。
-
断言:stream.[[reader]] 实现了
ReadableStreamDefaultReader
。 -
断言:stream.[[state]] 为 "
readable
"。 -
追加 readRequest 到 stream.[[reader]].[[readRequests]]。
-
将 stream.[[disturbed]] 设为 true。
-
如果 stream.[[state]] 为 "
closed
",返回 以 undefined 解决的 promise。 -
如果 stream.[[state]] 为 "
errored
",返回 以 stream.[[storedError]] 拒绝的 promise。 -
执行 ! ReadableStreamClose(stream)。
-
令 reader 为 stream.[[reader]]。
-
如果 reader 不为 undefined 且 reader 实现了
ReadableStreamBYOBReader
,-
令 readIntoRequests 为 reader.[[readIntoRequests]]。
-
将 reader.[[readIntoRequests]] 设为空 列表。
-
对于每个 readIntoRequest 属于 readIntoRequests,
-
以 undefined 作为参数,执行 readIntoRequest 的 close steps。
-
-
-
令 sourceCancelPromise 为 ! stream.[[controller]].[[CancelSteps]](reason)。
-
返回 对 sourceCancelPromise 的反应,fulfilled 时返回 undefined。
-
断言:stream.[[state]] 为 "
readable
"。 -
将 stream.[[state]] 设为 "
closed
"。 -
令 reader 为 stream.[[reader]]。
-
如果 reader 未定义,返回。
-
解决 reader.[[closedPromise]],值为 undefined。
-
如果 reader 实现了
ReadableStreamDefaultReader
,-
令 readRequests 为 reader.[[readRequests]]。
-
将 reader.[[readRequests]] 设为空 列表。
-
对于每个 readRequest 属于 readRequests,
-
执行 readRequest 的 close steps。
-
-
-
断言:stream.[[state]] 为 "
readable
"。 -
将 stream.[[state]] 设为 "
errored
"。 -
将 stream.[[storedError]] 设为 e。
-
令 reader 为 stream.[[reader]]。
-
如果 reader 未定义,返回。
-
拒绝 reader.[[closedPromise]],原因为 e。
-
将 reader.[[closedPromise]].[[PromiseIsHandled]] 设为 true。
-
如果 reader 实现了
ReadableStreamDefaultReader
,-
执行 !ReadableStreamDefaultReaderErrorReadRequests(reader, e)。
-
-
否则,
-
断言:reader 实现了
ReadableStreamBYOBReader
。 -
执行 !ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e)。
-
-
断言:!ReadableStreamHasBYOBReader(stream) 为 true。
-
令 reader 为 stream.[[reader]]。
-
断言:reader.[[readIntoRequests]] 不为 空。
-
令 readIntoRequest 为 reader.[[readIntoRequests]][0]。
-
移除 readIntoRequest 从 reader.[[readIntoRequests]]。
-
如果 done 为 true,执行 readIntoRequest 的 close steps,参数为 chunk。
-
否则,执行 readIntoRequest 的 chunk steps,参数为 chunk。
-
断言:!ReadableStreamHasDefaultReader(stream) 为 true。
-
令 reader 为 stream.[[reader]]。
-
断言:reader.[[readRequests]] 不为 空。
-
令 readRequest 为 reader.[[readRequests]][0]。
-
移除 readRequest 从 reader.[[readRequests]]。
-
如果 done 为 true,执行 readRequest 的 close steps。
-
否则,执行 readRequest 的 chunk steps,参数为 chunk。
-
断言:!ReadableStreamHasBYOBReader(stream) 为 true。
-
返回 stream.[[reader]].[[readIntoRequests]] 的 大小。
-
断言:!ReadableStreamHasDefaultReader(stream) 为 true。
-
返回 stream.[[reader]].[[readRequests]] 的 大小。
-
令 reader 为 stream.[[reader]]。
-
如果 reader 未定义,返回 false。
-
如果 reader 实现了
ReadableStreamBYOBReader
,返回 true。 -
返回 false。
-
令 reader 为 stream.[[reader]]。
-
如果 reader 未定义,返回 false。
-
如果 reader 实现了
ReadableStreamDefaultReader
,返回 true。 -
返回 false。
4.9.3. 读者(Readers)
以下抽象操作用于支持 ReadableStreamDefaultReader
和 ReadableStreamBYOBReader
实例的实现和操作。
-
令 stream 为 reader.[[stream]]。
-
断言:stream 不为 undefined。
-
返回 !ReadableStreamCancel(stream, reason)。
-
将 reader.[[stream]] 设为 stream。
-
将 stream.[[reader]] 设为 reader。
-
如果 stream.[[state]] 为 "
readable
",-
将 reader.[[closedPromise]] 设为新的 promise。
-
-
否则,如果 stream.[[state]] 为 "
closed
",-
将 reader.[[closedPromise]] 设为以 undefined 解决的 promise。
-
-
否则,
-
断言:stream.[[state]] 为 "
errored
"。 -
将 reader.[[closedPromise]] 设置为 一个被拒绝的 promise, 原因为 stream.[[storedError]]。
-
将 reader.[[closedPromise]].[[PromiseIsHandled]] 设为 true。
-
-
令 stream 为 reader.[[stream]]。
-
断言:stream 不为 undefined。
-
断言:stream.[[reader]] 为 reader。
-
如果 stream.[[state]] 为 "
readable
",拒绝 reader.[[closedPromise]],原因为TypeError
异常。 -
否则,将 reader.[[closedPromise]] 设置为 一个被拒绝的 promise, 原因为
TypeError
异常。 -
将 reader.[[closedPromise]].[[PromiseIsHandled]] 设为 true。
-
执行 !stream.[[controller]].[[ReleaseSteps]]()。
-
将 stream.[[reader]] 设为 undefined。
-
将 reader.[[stream]] 设为 undefined。
-
令 readIntoRequests 为 reader.[[readIntoRequests]]。
-
将 reader.[[readIntoRequests]] 设为一个新的空 列表。
-
对于每个 readIntoRequest 属于 readIntoRequests,
-
以 e 作为参数,执行 readIntoRequest 的 error steps。
-
-
令 stream 为 reader.[[stream]]。
-
断言:stream 不为 undefined。
-
将 stream.[[disturbed]] 设为 true。
-
如果 stream.[[state]] 为 "
errored
",以 stream.[[storedError]] 作为参数,执行 readIntoRequest 的 error steps。 -
否则,执行 !ReadableByteStreamControllerPullInto(stream.[[controller]], view, min, readIntoRequest)。
-
执行 !ReadableStreamReaderGenericRelease(reader)。
-
令 e 为一个新的
TypeError
异常。 -
执行 !ReadableStreamBYOBReaderErrorReadIntoRequests(reader, e)。
-
令 readRequests 为 reader.[[readRequests]]。
-
将 reader.[[readRequests]] 设为新的空 列表。
-
对于每个 readRequest 属于 readRequests,
-
以 e 作为参数,执行 readRequest 的 error steps。
-
-
令 stream 为 reader.[[stream]]。
-
断言:stream 不为 undefined。
-
将 stream.[[disturbed]] 设为 true。
-
如果 stream.[[state]] 为 "
closed
",执行 readRequest 的 close steps。 -
否则,如果 stream.[[state]] 为 "
errored
",以 stream.[[storedError]] 作为参数,执行 readRequest 的 error steps。 -
否则,
-
断言:stream.[[state]] 为 "
readable
"。 -
执行 !stream.[[controller]].[[PullSteps]](readRequest)。
-
-
执行 !ReadableStreamReaderGenericRelease(reader)。
-
令 e 为一个新的
TypeError
异常。 -
执行 !ReadableStreamDefaultReaderErrorReadRequests(reader, e)。
-
如果 !IsReadableStreamLocked(stream) 为 true,抛出
TypeError
异常。 -
如果 stream.[[controller]] 未实现
ReadableByteStreamController
, 抛出TypeError
异常。 -
执行 !ReadableStreamReaderGenericInitialize(reader, stream)。
-
将 reader.[[readIntoRequests]] 设为新的空 列表。
-
如果 !IsReadableStreamLocked(stream) 为 true,抛出
TypeError
异常。 -
执行 !ReadableStreamReaderGenericInitialize(reader, stream)。
-
将 reader.[[readRequests]] 设为新的空 列表。
4.9.4. 默认控制器
以下抽象操作用于支持 ReadableStreamDefaultController
类的实现。
-
令 shouldPull 为 !ReadableStreamDefaultControllerShouldCallPull(controller)。
-
如果 shouldPull 为 false,返回。
-
如果 controller.[[pulling]] 为 true,
-
将 controller.[[pullAgain]] 设为 true。
-
返回。
-
-
断言:controller.[[pullAgain]] 为 false。
-
将 controller.[[pulling]] 设为 true。
-
令 pullPromise 为执行 controller.[[pullAlgorithm]] 的结果。
-
-
将 controller.[[pulling]] 设为 false。
-
如果 controller.[[pullAgain]] 为 true,
-
将 controller.[[pullAgain]] 设为 false。
-
执行 !ReadableStreamDefaultControllerCallPullIfNeeded(controller)。
-
-
-
当 pullPromise 被拒绝且原因为 e 时,
-
执行 !ReadableStreamDefaultControllerError(controller, e)。
-
-
令 stream 为 controller.[[stream]]。
-
如果 !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) 为 false,返回 false。
-
如果 controller.[[started]] 为 false,返回 false。
-
如果 !IsReadableStreamLocked(stream) 为 true 且 !ReadableStreamGetNumReadRequests(stream) > 0,返回 true。
-
令 desiredSize 为 !ReadableStreamDefaultControllerGetDesiredSize(controller)。
-
断言:desiredSize 不为 null。
-
如果 desiredSize > 0,返回 true。
-
返回 false。
ReadableStream
仍被引用,也允许底层源对象被垃圾回收。
这可以通过弱引用观察到。详细内容见 tc39/proposal-weakrefs#31。
执行以下步骤:
-
将 controller.[[pullAlgorithm]] 设为 undefined。
-
将 controller.[[cancelAlgorithm]] 设为 undefined。
-
将 controller.[[strategySizeAlgorithm]] 设为 undefined。
-
如果 !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) 为 false,返回。
-
令 stream 为 controller.[[stream]]。
-
将 controller.[[closeRequested]] 设为 true。
-
-
执行 !ReadableStreamDefaultControllerClearAlgorithms(controller)。
-
执行 !ReadableStreamClose(stream)。
-
-
如果 !ReadableStreamDefaultControllerCanCloseOrEnqueue(controller) 为 false,返回。
-
令 stream 为 controller.[[stream]]。
-
如果 !IsReadableStreamLocked(stream) 为 true 且 !ReadableStreamGetNumReadRequests(stream) > 0,执行 !ReadableStreamFulfillReadRequest(stream, chunk, false)。
-
否则,
-
令 result 为执行 controller.[[strategySizeAlgorithm]],参数为 chunk 并将结果视为 completion record。
-
如果 result 为异常完成,
-
执行 !ReadableStreamDefaultControllerError(controller, result.[[Value]])。
-
返回 result。
-
-
令 chunkSize 为 result.[[Value]]。
-
令 enqueueResult 为 EnqueueValueWithSize(controller, chunk, chunkSize)。
-
如果 enqueueResult 为异常完成,
-
执行 !ReadableStreamDefaultControllerError(controller, enqueueResult.[[Value]])。
-
返回 enqueueResult。
-
-
-
执行 !ReadableStreamDefaultControllerCallPullIfNeeded(controller)。
-
令 stream 为 controller.[[stream]]。
-
如果 stream.[[state]] 不为 "
readable
",返回。 -
执行 !ResetQueue(controller)。
-
执行 !ReadableStreamDefaultControllerClearAlgorithms(controller)。
-
执行 !ReadableStreamError(stream, e)。
-
令 state 为 controller.[[stream]].[[state]]。
-
如果 state 为 "
errored
",返回 null。 -
如果 state 为 "
closed
",返回 0。 -
返回 controller.[[strategyHWM]] − controller.[[queueTotalSize]]。
TransformStream
的实现。
执行以下步骤:
-
如果 !ReadableStreamDefaultControllerShouldCallPull(controller) 为 true,返回 false。
-
否则,返回 true。
-
令 state 为 controller.[[stream]].[[state]]。
-
如果 controller.[[closeRequested]] 为 false 且 state 为 "
readable
",返回 true。 -
否则,返回 false。
当 controller.[[closeRequested]] 为 false,但
state 不为 "readable
" 时,可能是因为流被 controller.error()
置为错误,或者流被关闭但未调用其控制器的 controller.close()
方法,例如通过调用 stream.cancel()
关闭流。
-
断言:stream.[[controller]] 未定义。
-
将 controller.[[stream]] 设为 stream。
-
执行 !ResetQueue(controller)。
-
将 controller.[[started]]、 controller.[[closeRequested]]、 controller.[[pullAgain]] 和 controller.[[pulling]] 设为 false。
-
将 controller.[[strategySizeAlgorithm]] 设为 sizeAlgorithm,controller.[[strategyHWM]] 设为 highWaterMark。
-
将 controller.[[pullAlgorithm]] 设为 pullAlgorithm。
-
将 controller.[[cancelAlgorithm]] 设为 cancelAlgorithm。
-
将 stream.[[controller]] 设为 controller。
-
令 startResult 为执行 startAlgorithm 的结果。(此操作可能抛出异常。)
-
令 startPromise 为 以 startResult 解决的 promise。
-
-
将 controller.[[started]] 设为 true。
-
断言:controller.[[pulling]] 为 false。
-
断言:controller.[[pullAgain]] 为 false。
-
执行 !ReadableStreamDefaultControllerCallPullIfNeeded(controller)。
-
-
-
执行 !ReadableStreamDefaultControllerError(controller, r)。
-
-
令 controller 为 新的
ReadableStreamDefaultController
。 -
令 startAlgorithm 为一个返回 undefined 的算法。
-
令 pullAlgorithm 为一个返回 以 undefined 解决的 promise 的算法。
-
令 cancelAlgorithm 为一个返回 以 undefined 解决的 promise 的算法。
-
如果 underlyingSourceDict["
start
"] 存在,则将 startAlgorithm 设为一个算法,其返回 调用 underlyingSourceDict["start
"], 参数为 « controller »,回调 this 值为 underlyingSource 的结果。 -
如果 underlyingSourceDict["
pull
"] 存在,则将 pullAlgorithm 设为一个算法,其返回 调用 underlyingSourceDict["pull
"], 参数为 « controller »,回调 this 值为 underlyingSource 的结果。 -
如果 underlyingSourceDict["
cancel
"] 存在,则将 cancelAlgorithm 设为一个接收参数 reason 并返回 调用 underlyingSourceDict["cancel
"], 参数为 « reason »,回调 this 值为 underlyingSource 的结果的算法。 -
执行 ?SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, sizeAlgorithm)。
4.9.5. 字节流控制器
-
令 shouldPull 为 !ReadableByteStreamControllerShouldCallPull(controller)。
-
如果 shouldPull 为 false,返回。
-
如果 controller.[[pulling]] 为 true,
-
将 controller.[[pullAgain]] 设为 true。
-
返回。
-
-
断言:controller.[[pullAgain]] 为 false。
-
将 controller.[[pulling]] 设为 true。
-
令 pullPromise 为执行 controller.[[pullAlgorithm]] 的结果。
-
-
将 controller.[[pulling]] 设为 false。
-
如果 controller.[[pullAgain]] 为 true,
-
将 controller.[[pullAgain]] 设为 false。
-
执行 !ReadableByteStreamControllerCallPullIfNeeded(controller)。
-
-
-
-
执行 !ReadableByteStreamControllerError(controller, e)。
-
ReadableStream
仍被引用,也允许底层字节源对象被垃圾回收。
这可以通过弱引用观察到。详细内容见 tc39/proposal-weakrefs#31。
执行以下步骤:
-
将 controller.[[pullAlgorithm]] 设为 undefined。
-
将 controller.[[cancelAlgorithm]] 设为 undefined。
-
执行 !ReadableByteStreamControllerInvalidateBYOBRequest(controller)。
-
将 controller.[[pendingPullIntos]] 设为新的空 列表。
-
令 stream 为 controller.[[stream]]。
-
如果 controller.[[closeRequested]] 为 true 或 stream.[[state]] 不为 "
readable
",返回。 -
如果 controller.[[queueTotalSize]] > 0,
-
将 controller.[[closeRequested]] 设为 true。
-
返回。
-
-
如果 controller.[[pendingPullIntos]] 不为空,
-
令 firstPendingPullInto 为 controller.[[pendingPullIntos]][0]。
-
如果 firstPendingPullInto 的 bytes filled 除以 firstPendingPullInto 的 element size 的余数不为 0,
-
令 e 为一个新的
TypeError
异常。 -
执行 !ReadableByteStreamControllerError(controller, e)。
-
抛出 e。
-
-
-
执行 !ReadableByteStreamControllerClearAlgorithms(controller)。
-
执行 !ReadableStreamClose(stream)。
-
断言:stream.[[state]] 不为 "
errored
"。 -
断言:pullIntoDescriptor.reader type 不为 "
none
"。 -
令 done 为 false。
-
如果 stream.[[state]] 为 "
closed
",-
断言:pullIntoDescriptor 的 bytes filled 除以 pullIntoDescriptor 的 element size 的余数为 0。
-
将 done 设为 true。
-
-
令 filledView 为 !ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor)。
-
如果 pullIntoDescriptor 的 reader type 为 "
default
",-
执行 !ReadableStreamFulfillReadRequest(stream, filledView, done)。
-
-
否则,
-
断言:pullIntoDescriptor 的 reader type 为 "
byob
"。 -
执行 !ReadableStreamFulfillReadIntoRequest(stream, filledView, done)。
-
-
令 bytesFilled 为 pullIntoDescriptor 的 bytes filled。
-
令 elementSize 为 pullIntoDescriptor 的 element size。
-
断言:bytesFilled ≤ pullIntoDescriptor 的 byte length。
-
断言:bytesFilled 除以 elementSize 的余数为 0。
-
令 buffer 为 !TransferArrayBuffer(pullIntoDescriptor 的 buffer)。
-
返回 !Construct(pullIntoDescriptor 的 view constructor,« buffer, pullIntoDescriptor 的 byte offset, bytesFilled ÷ elementSize »)。
-
令 stream 为 controller.[[stream]]。
-
如果 controller.[[closeRequested]] 为 true 或 stream.[[state]] 不为 "
readable
",返回。 -
令 buffer 为 chunk.[[ViewedArrayBuffer]]。
-
令 byteOffset 为 chunk.[[ByteOffset]]。
-
令 byteLength 为 chunk.[[ByteLength]]。
-
如果 !IsDetachedBuffer(buffer) 为 true,抛出
TypeError
异常。 -
令 transferredBuffer 为 ?TransferArrayBuffer(buffer)。
-
如果 controller.[[pendingPullIntos]] 不为 空,
-
令 firstPendingPullInto 为 controller.[[pendingPullIntos]][0]。
-
如果 !IsDetachedBuffer(firstPendingPullInto 的 buffer) 为 true,抛出
TypeError
异常。 -
执行 !ReadableByteStreamControllerInvalidateBYOBRequest(controller)。
-
将 firstPendingPullInto 的 buffer 设为 !TransferArrayBuffer(firstPendingPullInto 的 buffer)。
-
如果 firstPendingPullInto 的 reader type 为 "
none
",执行 ?ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, firstPendingPullInto)。
-
-
如果 !ReadableStreamHasDefaultReader(stream) 为 true,
-
执行 !ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller)。
-
如果 !ReadableStreamGetNumReadRequests(stream) 为 0,
-
断言:controller.[[pendingPullIntos]] 为 空。
-
执行 !ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength)。
-
-
否则,
-
如果 controller.[[pendingPullIntos]] 不为 空,
-
断言:controller.[[pendingPullIntos]][0] 的 reader type 为 "
default
"。 -
执行 !ReadableByteStreamControllerShiftPendingPullInto(controller)。
-
-
令 transferredView 为 !Construct(
%Uint8Array%
, « transferredBuffer, byteOffset, byteLength »)。 -
执行 !ReadableStreamFulfillReadRequest(stream, transferredView, false)。
-
-
否则,如果 !ReadableStreamHasBYOBReader(stream) 为 true,
-
执行 !ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength)。
-
令 filledPullIntos 为执行 !ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller) 的结果。
-
对于每个 filledPullInto 属于 filledPullIntos,
-
执行 !ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto)。
-
-
-
否则,
-
断言:!IsReadableStreamLocked(stream) 为 false。
-
执行 !ReadableByteStreamControllerEnqueueChunkToQueue(controller, transferredBuffer, byteOffset, byteLength)。
-
-
执行 !ReadableByteStreamControllerCallPullIfNeeded(controller)。
-
追加 一个带有 buffer buffer、byte offset byteOffset 和 byte length byteLength 的新的 readable byte stream queue entry 到 controller.[[queue]]。
-
将 controller.[[queueTotalSize]] 设为 controller.[[queueTotalSize]] + byteLength。
-
令 cloneResult 为 CloneArrayBuffer(buffer, byteOffset, byteLength,
%ArrayBuffer%
)。 -
如果 cloneResult 为异常完成,
-
执行 !ReadableByteStreamControllerError(controller, cloneResult.[[Value]])。
-
返回 cloneResult。
-
-
执行 !ReadableByteStreamControllerEnqueueChunkToQueue(controller, cloneResult.[[Value]], 0, byteLength)。
-
断言:pullIntoDescriptor 的 reader type 为 "
none
"。 -
如果 pullIntoDescriptor 的 bytes filled > 0,执行 ?ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller, pullIntoDescriptor 的 buffer, pullIntoDescriptor 的 byte offset, pullIntoDescriptor 的 bytes filled)。
-
执行 !ReadableByteStreamControllerShiftPendingPullInto(controller)。
-
令 stream 为 controller.[[stream]]。
-
如果 stream.[[state]] 不为 "
readable
",返回。 -
执行 !ReadableByteStreamControllerClearPendingPullIntos(controller)。
-
执行 !ResetQueue(controller)。
-
执行 !ReadableByteStreamControllerClearAlgorithms(controller)。
-
执行 !ReadableStreamError(stream, e)。
-
断言:controller.[[pendingPullIntos]] 为空,或 controller.[[pendingPullIntos]][0] 为 pullIntoDescriptor。
-
断言:controller.[[byobRequest]] 为 null。
-
将 pullIntoDescriptor 的 bytes filled 设为 bytes filled + size。
-
令 maxBytesToCopy 为 min(controller.[[queueTotalSize]], pullIntoDescriptor 的 byte length − pullIntoDescriptor 的 bytes filled)。
-
令 maxBytesFilled 为 pullIntoDescriptor 的 bytes filled + maxBytesToCopy。
-
令 totalBytesToCopyRemaining 为 maxBytesToCopy。
-
令 ready 为 false。
-
断言:!IsDetachedBuffer(pullIntoDescriptor 的 buffer) 为 false。
-
断言:pullIntoDescriptor 的 bytes filled < pullIntoDescriptor 的 minimum fill。
-
令 remainderBytes 为 maxBytesFilled 除以 pullIntoDescriptor 的 element size 的余数。
-
令 maxAlignedBytes 为 maxBytesFilled − remainderBytes。
-
如果 maxAlignedBytes ≥ pullIntoDescriptor 的 minimum fill,
-
将 totalBytesToCopyRemaining 设为 maxAlignedBytes − pullIntoDescriptor 的 bytes filled。
-
将 ready 设为 true。
尚未填充到最小长度的
read()
请求描述符会保持在队列头部,底层源 underlying source 可以继续填充。
-
-
令 queue 为 controller.[[queue]]。
-
当 totalBytesToCopyRemaining > 0,
-
令 headOfQueue 为 queue[0]。
-
令 bytesToCopy 为 min(totalBytesToCopyRemaining, headOfQueue 的 byte length)。
-
令 destStart 为 pullIntoDescriptor 的 byte offset + pullIntoDescriptor 的 bytes filled。
-
令 descriptorBuffer 为 pullIntoDescriptor 的 buffer。
-
令 queueBuffer 为 headOfQueue 的 buffer。
-
令 queueByteOffset 为 headOfQueue 的 byte offset。
-
断言:!CanCopyDataBlockBytes(descriptorBuffer, destStart, queueBuffer, queueByteOffset, bytesToCopy) 为 true。
如断言失败(由于规范或实现错误),下一步可能会读写无效内存。实现应始终检查此断言,如失败应以实现定义方式终止(如崩溃或 erroring the stream)。
-
执行 !CopyDataBlockBytes(descriptorBuffer.[[ArrayBufferData]], destStart, queueBuffer.[[ArrayBufferData]], queueByteOffset, bytesToCopy)。
-
如果 headOfQueue 的 byte length 为 bytesToCopy,
-
移除 queue[0]。
-
-
否则,
-
将 headOfQueue 的 byte offset 设为 headOfQueue 的 byte offset + bytesToCopy。
-
将 headOfQueue 的 byte length 设为 headOfQueue 的 byte length − bytesToCopy。
-
-
将 controller.[[queueTotalSize]] 设为 controller.[[queueTotalSize]] − bytesToCopy。
-
执行 !ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesToCopy, pullIntoDescriptor)。
-
将 totalBytesToCopyRemaining 设为 totalBytesToCopyRemaining − bytesToCopy。
-
-
如果 ready 为 false,
-
断言:controller.[[queueTotalSize]] 为 0。
-
断言:pullIntoDescriptor 的 bytes filled > 0。
-
断言:pullIntoDescriptor 的 bytes filled < pullIntoDescriptor 的 minimum fill。
-
-
返回 ready。
-
断言:controller.[[queueTotalSize]] > 0。
-
令 entry 为 controller.[[queue]][0]。
-
将 controller.[[queueTotalSize]] 设为 controller.[[queueTotalSize]] − entry 的 byte length。
-
执行 !ReadableByteStreamControllerHandleQueueDrain(controller)。
-
令 view 为 !Construct(
%Uint8Array%
, « entry 的 buffer,entry 的 byte offset, entry 的 byte length »)。 -
以 view 作为参数,执行 readRequest 的 chunk steps。
-
如果 controller.[[byobRequest]] 为 null 且 controller.[[pendingPullIntos]] 不为 空,
-
令 firstDescriptor 为 controller.[[pendingPullIntos]][0]。
-
令 view 为 !Construct(
%Uint8Array%
, « firstDescriptor 的 buffer,firstDescriptor 的 byte offset + firstDescriptor 的 bytes filled, firstDescriptor 的 byte length − firstDescriptor 的 bytes filled »)。 -
令 byobRequest 为 新的
ReadableStreamBYOBRequest
。 -
将 byobRequest.[[controller]] 设为 controller。
-
将 byobRequest.[[view]] 设为 view。
-
将 controller.[[byobRequest]] 设为 byobRequest。
-
-
返回 controller.[[byobRequest]]。
-
令 state 为 controller.[[stream]].[[state]]。
-
如果 state 为 "
errored
",返回 null。 -
如果 state 为 "
closed
",返回 0。 -
返回 controller.[[strategyHWM]] − controller.[[queueTotalSize]]。
-
断言:controller.[[stream]].[[state]] 为 "
readable
"。 -
如果 controller.[[queueTotalSize]] 为 0 且 controller.[[closeRequested]] 为 true,
-
执行 !ReadableByteStreamControllerClearAlgorithms(controller)。
-
执行 !ReadableStreamClose(controller.[[stream]])。
-
-
否则,
-
执行 !ReadableByteStreamControllerCallPullIfNeeded(controller)。
-
-
如果 controller.[[byobRequest]] 为 null,返回。
-
将 controller.[[byobRequest]].[[controller]] 设为 undefined。
-
将 controller.[[byobRequest]].[[view]] 设为 null。
-
将 controller.[[byobRequest]] 设为 null。
-
断言:controller.[[closeRequested]] 为 false。
-
令 filledPullIntos 为新的空 列表。
-
当 controller.[[pendingPullIntos]] 不为 空,
-
如果 controller.[[queueTotalSize]] 为 0,则 跳出循环。
-
令 pullIntoDescriptor 为 controller.[[pendingPullIntos]][0]。
-
如果 !ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) 为 true,
-
执行 !ReadableByteStreamControllerShiftPendingPullInto(controller)。
-
追加 pullIntoDescriptor 到 filledPullIntos。
-
-
-
返回 filledPullIntos。
-
令 reader 为 controller.[[stream]].[[reader]]。
-
断言:reader 实现了
ReadableStreamDefaultReader
。 -
当 reader.[[readRequests]] 不为 空 时,
-
如果 controller.[[queueTotalSize]] 为 0,返回。
-
令 readRequest 为 reader.[[readRequests]][0]。
-
移除 readRequest 从 reader.[[readRequests]]。
-
执行 !ReadableByteStreamControllerFillReadRequestFromQueue(controller, readRequest)。
-
-
令 stream 为 controller.[[stream]]。
-
令 elementSize 为 1。
-
令 ctor 为
%DataView%
。 -
如果 view 有 [[TypedArrayName]] 内部槽(即非
DataView
),-
将 elementSize 设为 typed array constructors table 中 view.[[TypedArrayName]] 对应的元素大小。
-
将 ctor 设为 typed array constructors table 中 view.[[TypedArrayName]] 对应的构造函数。
-
-
令 minimumFill 为 min × elementSize。
-
断言:minimumFill ≥ 0 且 minimumFill ≤ view.[[ByteLength]]。
-
断言:minimumFill 除以 elementSize 的余数为 0。
-
令 byteOffset 为 view.[[ByteOffset]]。
-
令 byteLength 为 view.[[ByteLength]]。
-
令 bufferResult 为 TransferArrayBuffer(view.[[ViewedArrayBuffer]])。
-
如果 bufferResult 为异常完成,
-
以 bufferResult.[[Value]] 为参数,执行 readIntoRequest 的 error steps。
-
返回。
-
-
令 buffer 为 bufferResult.[[Value]]。
-
令 pullIntoDescriptor 为新的 pull-into descriptor,其中
- buffer
- buffer
- buffer byte length
- buffer.[[ArrayBufferByteLength]]
- byte offset
- byteOffset
- byte length
- byteLength
- bytes filled
- 0
- minimum fill
- minimumFill
- element size
- elementSize
- view constructor
- ctor
- reader type
- "
byob
"
-
如果 controller.[[pendingPullIntos]] 不为空,
-
追加 pullIntoDescriptor 到 controller.[[pendingPullIntos]]。
-
执行 !ReadableStreamAddReadIntoRequest(stream, readIntoRequest)。
-
返回。
-
-
如果 stream.[[state]] 为 "
closed
",-
令 emptyView 为 !Construct(ctor, « pullIntoDescriptor 的 buffer, pullIntoDescriptor 的 byte offset,0 »)。
-
以 emptyView 为参数,执行 readIntoRequest 的 close steps。
-
返回。
-
-
如果 controller.[[queueTotalSize]] > 0,
-
如果 !ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) 为 true,
-
令 filledView 为 !ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor)。
-
执行 !ReadableByteStreamControllerHandleQueueDrain(controller)。
-
以 filledView 为参数,执行 readIntoRequest 的 chunk steps。
-
返回。
-
-
如果 controller.[[closeRequested]] 为 true,
-
令 e 为
TypeError
异常。 -
执行 !ReadableByteStreamControllerError(controller, e)。
-
以 e 为参数,执行 readIntoRequest 的 error steps。
-
返回。
-
-
-
追加 pullIntoDescriptor 到 controller.[[pendingPullIntos]]。
-
执行 !ReadableStreamAddReadIntoRequest(stream, readIntoRequest)。
-
执行 !ReadableByteStreamControllerCallPullIfNeeded(controller)。
-
断言:controller.[[pendingPullIntos]] 不为空。
-
令 firstDescriptor 为 controller.[[pendingPullIntos]][0]。
-
令 state 为 controller.[[stream]].[[state]]。
-
如果 state 为 "
closed
",-
如果 bytesWritten 不为 0,抛出
TypeError
异常。
-
-
否则,
-
断言:state 为 "
readable
"。 -
如果 bytesWritten 为 0,抛出
TypeError
异常。 -
如果 firstDescriptor 的 bytes filled + bytesWritten > firstDescriptor 的 byte length,抛出
RangeError
异常。
-
-
将 firstDescriptor 的 buffer 设为 !TransferArrayBuffer(firstDescriptor 的 buffer)。
-
执行 ?ReadableByteStreamControllerRespondInternal(controller, bytesWritten)。
-
断言:firstDescriptor 的 bytes filled 除以 firstDescriptor 的 element size 的余数为 0。
-
如果 firstDescriptor 的 reader type 为 "
none
",执行 !ReadableByteStreamControllerShiftPendingPullInto(controller)。 -
令 stream 为 controller.[[stream]]。
-
如果 !ReadableStreamHasBYOBReader(stream) 为 true,
-
令 filledPullIntos 为新的空 列表。
-
当 filledPullIntos 的 大小 小于 !ReadableStreamGetNumReadIntoRequests(stream),
-
令 pullIntoDescriptor 为 !ReadableByteStreamControllerShiftPendingPullInto(controller)。
-
追加 pullIntoDescriptor 到 filledPullIntos。
-
-
对于每个 filledPullInto 属于 filledPullIntos,
-
执行 !ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto)。
-
-
-
断言:pullIntoDescriptor 的 bytes filled + bytesWritten ≤ pullIntoDescriptor 的 byte length。
-
执行 !ReadableByteStreamControllerFillHeadPullIntoDescriptor(controller, bytesWritten, pullIntoDescriptor)。
-
如果 pullIntoDescriptor 的 reader type 为 "
none
",-
执行 ?ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor)。
-
令 filledPullIntos 为执行 !ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller) 的结果。
-
对于每个 filledPullInto 属于 filledPullIntos,
-
执行 !ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto)。
-
-
返回。
-
-
如果 pullIntoDescriptor 的 bytes filled < pullIntoDescriptor 的 minimum fill,返回。
尚未填充到最小长度的
read()
请求描述符会保持在队列头部,底层源 underlying source 可以继续填充。 -
执行 !ReadableByteStreamControllerShiftPendingPullInto(controller)。
-
令 remainderSize 为 pullIntoDescriptor 的 bytes filled 除以 pullIntoDescriptor 的 element size 的余数。
-
如果 remainderSize > 0,
-
令 end 为 pullIntoDescriptor 的 byte offset + pullIntoDescriptor 的 bytes filled。
-
执行 ?ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller, pullIntoDescriptor 的 buffer, end − remainderSize, remainderSize)。
-
-
将 pullIntoDescriptor 的 bytes filled 设为 pullIntoDescriptor 的 bytes filled − remainderSize。
-
令 filledPullIntos 为执行 !ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller) 的结果。
-
执行 !ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], pullIntoDescriptor)。
-
对于每个 filledPullInto 属于 filledPullIntos,
-
执行 !ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto)。
-
-
令 firstDescriptor 为 controller.[[pendingPullIntos]][0]。
-
断言:!CanTransferArrayBuffer(firstDescriptor 的 buffer) 为 true。
-
执行 !ReadableByteStreamControllerInvalidateBYOBRequest(controller)。
-
令 state 为 controller.[[stream]].[[state]]。
-
如果 state 为 "
closed
",-
断言:bytesWritten 为 0。
-
执行 !ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor)。
-
-
否则,
-
断言:state 为 "
readable
"。 -
断言:bytesWritten > 0。
-
执行 ?ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor)。
-
-
执行 !ReadableByteStreamControllerCallPullIfNeeded(controller)。
-
断言:controller.[[pendingPullIntos]] 不为 空。
-
断言:!IsDetachedBuffer(view.[[ViewedArrayBuffer]]) 为 false。
-
令 firstDescriptor 为 controller.[[pendingPullIntos]][0]。
-
令 state 为 controller.[[stream]].[[state]]。
-
如果 state 为 "
closed
",-
如果 view.[[ByteLength]] 不为 0,抛出
TypeError
异常。
-
-
否则,
-
断言:state 为 "
readable
"。 -
如果 view.[[ByteLength]] 为 0,抛出
TypeError
异常。
-
-
如果 firstDescriptor 的 byte offset + firstDescriptor 的 bytes filled 不等于 view.[[ByteOffset]],抛出
RangeError
异常。 -
如果 firstDescriptor 的 buffer byte length 不等于 view.[[ViewedArrayBuffer]].[[ByteLength]],抛出
RangeError
异常。 -
如果 firstDescriptor 的 bytes filled + view.[[ByteLength]] > firstDescriptor 的 byte length,抛出
RangeError
异常。 -
令 viewByteLength 为 view.[[ByteLength]]。
-
将 firstDescriptor 的 buffer 设为 ?TransferArrayBuffer(view.[[ViewedArrayBuffer]])。
-
执行 ?ReadableByteStreamControllerRespondInternal(controller, viewByteLength)。
-
断言:controller.[[byobRequest]] 为 null。
-
令 descriptor 为 controller.[[pendingPullIntos]][0]。
-
移除 descriptor 从 controller.[[pendingPullIntos]]。
-
返回 descriptor。
-
令 stream 为 controller.[[stream]]。
-
如果 stream.[[state]] 不为 "
readable
",返回 false。 -
如果 controller.[[closeRequested]] 为 true,返回 false。
-
如果 controller.[[started]] 为 false,返回 false。
-
如果 !ReadableStreamHasDefaultReader(stream) 为 true 且 !ReadableStreamGetNumReadRequests(stream) > 0,返回 true。
-
如果 !ReadableStreamHasBYOBReader(stream) 为 true 且 !ReadableStreamGetNumReadIntoRequests(stream) > 0,返回 true。
-
令 desiredSize 为 !ReadableByteStreamControllerGetDesiredSize(controller)。
-
断言:desiredSize 不为 null。
-
如果 desiredSize > 0,返回 true。
-
返回 false。
-
断言:stream.[[controller]] 为 undefined。
-
如果 autoAllocateChunkSize 不为 undefined,
-
断言:!IsInteger(autoAllocateChunkSize) 为 true。
-
断言:autoAllocateChunkSize 为正。
-
-
将 controller.[[stream]] 设为 stream。
-
将 controller.[[pullAgain]] 和 controller.[[pulling]] 设为 false。
-
将 controller.[[byobRequest]] 设为 null。
-
执行 !ResetQueue(controller)。
-
将 controller.[[closeRequested]] 和 controller.[[started]] 设为 false。
-
将 controller.[[strategyHWM]] 设为 highWaterMark。
-
将 controller.[[pullAlgorithm]] 设为 pullAlgorithm。
-
将 controller.[[cancelAlgorithm]] 设为 cancelAlgorithm。
-
将 controller.[[autoAllocateChunkSize]] 设为 autoAllocateChunkSize。
-
将 controller.[[pendingPullIntos]] 设为新的空 列表。
-
将 stream.[[controller]] 设为 controller。
-
令 startResult 为执行 startAlgorithm 的结果。
-
令 startPromise 为 以 startResult 解决的 promise。
-
-
将 controller.[[started]] 设为 true。
-
断言:controller.[[pulling]] 为 false。
-
断言:controller.[[pullAgain]] 为 false。
-
执行 !ReadableByteStreamControllerCallPullIfNeeded(controller)。
-
-
-
执行 !ReadableByteStreamControllerError(controller, r)。
-
-
令 controller 为 新的
ReadableByteStreamController
。 -
令 startAlgorithm 为一个返回 undefined 的算法。
-
令 pullAlgorithm 为一个返回 以 undefined 解决的 promise 的算法。
-
令 cancelAlgorithm 为一个返回 以 undefined 解决的 promise 的算法。
-
如果 underlyingSourceDict["
start
"] 存在,则将 startAlgorithm 设为一个算法,其返回 调用 underlyingSourceDict["start
"], 参数为 « controller »,回调 this 值为 underlyingSource 的结果。 -
如果 underlyingSourceDict["
pull
"] 存在,则将 pullAlgorithm 设为一个算法,其返回 调用 underlyingSourceDict["pull
"], 参数为 « controller »,回调 this 值为 underlyingSource 的结果。 -
如果 underlyingSourceDict["
cancel
"] 存在,则将 cancelAlgorithm 设为一个接收参数 reason 并返回 调用 underlyingSourceDict["cancel
"], 参数为 « reason »,回调 this 值为 underlyingSource 的结果的算法。 -
令 autoAllocateChunkSize 为 underlyingSourceDict["
autoAllocateChunkSize
"], 如果 存在,否则为 undefined。 -
如果 autoAllocateChunkSize 为 0,则抛出
TypeError
异常。 -
执行 ?SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, highWaterMark, autoAllocateChunkSize)。
5. 可写流
5.1. 使用可写流
readableStream. pipeTo( writableStream) . then(() => console. log( "所有数据已成功写入!" )) . catch ( e=> console. error( "出现错误!" , e));
write()
和 close()
方法,直接向可写流写入数据。由于可写流会自动排队所有写入操作,并依次将它们转发到底层接收器,所以你可以直接写入,不需要复杂操作:
function writeArrayToStream( array, writableStream) { const writer= writableStream. getWriter(); array. forEach( chunk=> writer. write( chunk). catch (() => {})); return writer. close(); } writeArrayToStream([ 1 , 2 , 3 , 4 , 5 ], writableStream) . then(() => console. log( "全部完成!" )) . catch ( e=> console. error( "流发生错误: " + e));
注意我们使用 .catch(() => {})
来抑制 write()
方法的拒绝;任何致命错误会通过 close()
方法的 promise 拒绝来通知,如果未捕获会导致 unhandledrejection
事件和控制台警告。
close()
方法返回的 promise。
这个 promise 会在流出错(初始化、写入或者关闭时)时拒绝,在流成功关闭时兑现。通常这就是你需要关心的全部内容。
然而,如果你想知道写入某个特定块是否成功,可以使用 writer 的 write()
方法返回的 promise:
writer. write( "我是一个数据块" ) . then(() => console. log( "块成功写入!" )) . catch ( e=> console. error( e));
这里“成功”的含义由具体流实例(更准确地说,是它的底层接收器)决定。例如,对于文件流,可能只是操作系统接受了写入,而不一定表示数据已经刷新到磁盘。有些流可能无法提供这种信号,promise 会立即兑现。
desiredSize
和 ready
属性允许可写流 writer的生产者更精确地响应流的流控信号,保证内存使用不超过流设定的高水位线。下面的例子会向流写入无限序列的随机字节,通过 desiredSize
决定每次生成多少字节,并通过 ready
等待背压消退。
async function writeRandomBytesForever( writableStream) { const writer= writableStream. getWriter(); while ( true ) { await writer. ready; const bytes= new Uint8Array( writer. desiredSize); crypto. getRandomValues( bytes); // 有意不await,此时等待writer.ready即可。 writer. write( bytes). catch (() => {}); } } writeRandomBytesForever( myWritableStream). catch ( e=> console. error( "出现异常" , e));
注意我们没有 await
write()
返回的 promise;这和等待 ready
promise 是重复的。此外,和前面例子类似,我们对 write()
返回的 promise 使用 .catch(() => {})
,在这种情况下,任何失败会在等待 ready
promise 时通知我们。
await
write()
返回的 promise 并不是好主意,来看上面例子的一个改进版,我们继续直接使用 WritableStreamDefaultWriter
接口,但不控制每次要写多少字节。在这种情况下,遵循背压的代码如下:
async function writeSuppliedBytesForever( writableStream, getBytes) { const writer= writableStream. getWriter(); while ( true ) { await writer. ready; const bytes= getBytes(); writer. write( bytes). catch (() => {}); } }
和前一个例子不同,前面我们每次都写入 writer.desiredSize
字节,write()
和 ready
的 promise 是同步的。而在这里,ready
的 promise 很可能比 write()
更早兑现。
请记住,ready
的 promise 在内部队列期望大小转为正数时兑现,这可能比写入成功早(尤其是在高水位线较大时)。
换句话说,await
write()
的返回值意味着你永远不会在流的内部队列中排队写入,而是只有前一次写入成功后才执行下一次写入,这会导致吞吐量较低。
5.2. WritableStream
类
WritableStream
表示一个可写流。
5.2.1. 接口定义
该 WritableStream
类的 Web IDL 定义如下:
[Exposed=*,Transferable ]interface {
WritableStream constructor (optional object ,
underlyingSink optional QueuingStrategy = {});
strategy readonly attribute boolean locked ;Promise <undefined >abort (optional any );
reason Promise <undefined >close ();WritableStreamDefaultWriter getWriter (); };
5.2.2. 内部槽
WritableStream
的实例创建时拥有下表描述的内部槽:
内部槽 | 描述(非规范性) |
---|---|
[[backpressure]] | 一个布尔值,表示由控制器设置的背压信号 |
[[closeRequest]] | writer 的 close()
方法返回的 promise
|
[[controller]] | 一个 WritableStreamDefaultController
,用于控制此流的状态和队列
|
[[Detached]] | 当流被转移时为 true 的布尔标志 |
[[inFlightWriteRequest]] | 在底层接收器的写入算法正在执行且尚未完成时,指向当前进行中写入操作的 promise,用于防止重入调用 |
[[inFlightCloseRequest]] | 在底层接收器的关闭算法正在执行且尚未完成时,指向当前进行中关闭操作的 promise,用于防止 abort()
方法中断关闭过程
|
[[pendingAbortRequest]] | 一个待中止请求 |
[[state]] | 包含流当前状态的字符串,仅供内部使用;可为
"writable "、"closed "、"erroring " 或 "errored "
|
[[storedError]] | 指示流失败原因的值,在流处于 "errored " 状态下操作时作为失败原因或异常抛出
|
[[writer]] | 若流被锁定到
writer,则为一个 WritableStreamDefaultWriter
实例,否则为 undefined
|
[[writeRequests]] | 一个列表,表示尚未被底层接收器处理的流内部写入请求队列中的 promise |
[[inFlightCloseRequest]] 槽和 [[closeRequest]] 槽互斥。同样,在 [[inFlightWriteRequest]] 不为 undefined 时,[[writeRequests]] 中不会有元素被移除。实现可以基于这些不变式优化这些槽的存储。
待中止请求 是一个用于追踪中止流请求,直到该请求最终被处理的数据结构。它包含以下项目:
- promise
-
一个从 WritableStreamAbort 返回的 promise
- reason
-
一个 JavaScript 值,作为中止原因传递给 WritableStreamAbort
- was already erroring
-
一个布尔值,指示在调用 WritableStreamAbort 时流是否处于 "
erroring
" 状态,这会影响中止请求的结果
5.2.3. 底层接收器 API
WritableStream()
构造函数的第一个参数为一个 JavaScript 对象,表示底层接收器。该对象可包含如下属性:
dictionary {
UnderlyingSink UnderlyingSinkStartCallback start ;UnderlyingSinkWriteCallback write ;UnderlyingSinkCloseCallback close ;UnderlyingSinkAbortCallback abort ;any type ; };callback =
UnderlyingSinkStartCallback any (WritableStreamDefaultController );
controller callback =
UnderlyingSinkWriteCallback Promise <undefined > (any ,
chunk WritableStreamDefaultController );
controller callback =
UnderlyingSinkCloseCallback Promise <undefined > ();callback =
UnderlyingSinkAbortCallback Promise <undefined > (optional any );
reason
start(controller)
, 类型为 UnderlyingSinkStartCallback-
在创建
WritableStream
时立即调用的函数。通常用于获取所代表的 底层接收器资源的访问权。
如果此初始化过程为异步,可以返回一个 promise 以表示成功或失败;拒绝的 promise 会使流进入错误状态。抛出的异常会被
WritableStream()
构造函数重新抛出。 write(chunk, controller)
, 类型为 UnderlyingSinkWriteCallback-
当有新的数据块可写入底层接收器时调用。流实现保证该函数仅在前一次写入已成功后调用,且不会在
start()
成功之前或close()
或abort()
被调用之后调用。此函数通常用于实际发送数据到 底层接收器所表示的资源,比如调用底层 API。
如果写入过程为异步,并且需要向调用者通知成功或失败,则该函数可以返回一个 promise。此 promise 的返回值会被传递给
writer.write()
的调用者,以便他们能监控该次写入。抛出异常等同于返回拒绝的 promise。注意,并非所有情况下都能获得此类信号;参见 § 10.6 无背压或成功信号的可写流 与 § 10.7 带有背压和成功信号的可写流。在这些情况下,最好什么都不返回。
该函数可能返回的 promise 也决定了此数据块是否计入内部队列期望填充值的计算。在 promise 达到最终状态之前,
writer.desiredSize
会保持不变,仅当写入成功后才增加以表明需要更多块。最后,此 promise 还用于保证良好行为的生产者不会在数据块尚未完全处理时修改其内容。(这并非由规范机制保证,而是生产者与底层接收器之间的非正式约定。)
close()
, 类型为 UnderlyingSinkCloseCallback-
当生产者通过
writer.close()
表示写入完成,并且所有排队的写入已成功完成后调用。此函数可以执行任何必要的收尾或刷新操作,并释放所持有的资源。
如果关闭过程为异步,函数可返回 promise 以表示成功或失败;结果会通过
writer.close()
方法的返回值传递。返回拒绝的 promise 会使流进入错误状态,而不是成功关闭。抛出异常等同于返回拒绝的 promise。 abort(reason)
, 类型为 UnderlyingSinkAbortCallback-
当生产者通过
stream.abort()
或writer.abort()
表示希望中止流时调用。它的参数与这些方法传入的值一致。可写流在管道过程中也可能因某些情况被中止;详见
pipeTo()
方法的定义。此函数可用于清理资源,类似
close()
,但也可以自定义处理。如果中止过程为异步,函数可返回 promise 以表示成功或失败;结果会通过
writer.abort()
方法的返回值传递。抛出异常等同于返回拒绝的 promise。不管怎样,流都会因新的TypeError
而进入错误状态,表示其已被中止。 type
, 类型为 any-
该属性为保留属性,当前任何赋值尝试都会抛出异常。
传递给 start()
和 write()
的 controller
参数是 WritableStreamDefaultController
的实例,可用于使流出错。主要用于与非 promise 风格 API 的衔接,例如见 § 10.6 无背压或成功信号的可写流。
5.2.4. 构造函数、方法和属性
stream = new
WritableStream
(underlyingSink[, strategy)-
创建一个新的
WritableStream
,包装提供的 底层接收器。关于 underlyingSink 参数的详细信息,参见 § 5.2.3 底层接收器 API。strategy 参数表示流的 排队策略,详见 § 7.1 排队策略 API。如果未提供,则默认行为与
CountQueuingStrategy
且 高水位线为 1 的效果一致。 isLocked = stream.
locked
-
返回该可写流是否已锁定到某个 writer。
await stream.
abort
([ reason ])-
中止该流,表示生产者无法再成功写入流,流会立即进入错误状态,所有排队的写入会被丢弃。这还会执行 底层接收器 的中止机制。
返回的 promise 在流正常关闭时兑现,如果底层接收器在此过程中发生错误则拒绝。此外,如果流当前已锁定,则会以
TypeError
拒绝(不会尝试取消流)。 await stream.
close
()-
关闭流。底层接收器将在处理完所有已写入的数据块后,执行关闭行为。此期间任何进一步写入都会失败(但不会使流进入错误状态)。
该方法返回一个 promise,如果所有剩余数据块都成功写入且流成功关闭,则兑现;否则在过程中遇到错误则拒绝。此外,如果流当前已锁定,则会以
TypeError
拒绝(不会尝试取消流)。 writer = stream.
getWriter
()-
创建一个writer(即
WritableStreamDefaultWriter
实例),并将流锁定到该 writer。流被锁定期间,不能获取其他 writer,直到当前 writer 被释放。该功能对需要独占写入流、避免中断或交错的抽象非常有用。通过获取流的 writer,可以确保不会有其他人同时写入,避免写入数据不可预测甚至无效。
new WritableStream(underlyingSink, strategy)
构造函数步骤如下:
-
如果 underlyingSink 未提供,则将其设为 null。
-
让 underlyingSinkDict 为 underlyingSink,转换为类型为 IDL 的值
UnderlyingSink
。不能直接将 underlyingSink 参数声明为
UnderlyingSink
类型,否则会丢失原对象的引用。需要保留对象以便后续调用其方法。 -
如果 underlyingSinkDict["
type
"]存在,则抛出RangeError
异常。此设计方便未来添加新类型,而不会影响向后兼容。
-
执行 ! InitializeWritableStream(this)。
-
令 sizeAlgorithm 为 ! ExtractSizeAlgorithm(strategy)。
-
令 highWaterMark 为 ? ExtractHighWaterMark(strategy, 1)。
-
执行 ? SetUpWritableStreamDefaultControllerFromUnderlyingSink(this, underlyingSink, underlyingSinkDict, highWaterMark, sizeAlgorithm)。
locked
属性的步骤如下:
-
返回 ! IsWritableStreamLocked(this)。
abort(reason)
方法的步骤如下:
-
如果 ! IsWritableStreamLocked(this) 为 true,则返回 一个拒绝的 promise,异常为
TypeError
。 -
返回 ! WritableStreamAbort(this, reason)。
close()
方法的步骤如下:
-
如果 ! IsWritableStreamLocked(this) 为 true,则返回 一个拒绝的 promise,异常为
TypeError
。 -
如果 ! WritableStreamCloseQueuedOrInFlight(this) 为 true,则返回 一个拒绝的 promise,异常为
TypeError
。 -
返回 ! WritableStreamClose(this)。
getWriter()
方法的步骤如下:
5.2.5. 通过 postMessage()
进行传递
destination.postMessage(ws, { transfer: [ws] });
-
将一个
WritableStream
发送到另一个 frame、窗口或 worker。被传递的流可以像原始流一样使用,原流会变为锁定,无法直接使用。
WritableStream
对象属于可传递对象。它们的传递步骤,以 value 和 dataHolder 为参数如下:
-
如果 ! IsWritableStreamLocked(value) 为 true,则抛出 "
DataCloneError
"DOMException
。 -
令 port1 为新建的
MessagePort
,创建于当前 Realm。 -
令 port2 为新建的
MessagePort
,创建于当前 Realm。 -
绑定 port1 和 port2。
-
令 readable 为新建的
ReadableStream
,创建于当前 Realm。 -
执行 ! SetUpCrossRealmTransformReadable(readable, port1)。
-
令 promise 为 ! ReadableStreamPipeTo(readable, value, false, false, false)。
-
将 promise.[[PromiseIsHandled]] 设为 true。
-
将 dataHolder.[[port]] 设为 ! StructuredSerializeWithTransfer(port2, « port2 »)。
-
令 deserializedRecord 为 ! StructuredDeserializeWithTransfer(dataHolder.[[port]], 当前 Realm)。
-
令 port 为 deserializedRecord.[[Deserialized]]。
-
执行 ! SetUpCrossRealmTransformWritable(value, port)。
5.3. WritableStreamDefaultWriter
类
WritableStreamDefaultWriter
类表示一个可写流 writer,用于由 WritableStream
实例提供。
5.3.1. 接口定义
WritableStreamDefaultWriter
类的 Web IDL 定义如下:
[Exposed=*]interface {
WritableStreamDefaultWriter constructor (WritableStream );
stream readonly attribute Promise <undefined >closed ;readonly attribute unrestricted double ?desiredSize ;readonly attribute Promise <undefined >ready ;Promise <undefined >abort (optional any );
reason Promise <undefined >close ();undefined releaseLock ();Promise <undefined >write (optional any ); };
chunk
5.3.2. 内部槽
WritableStreamDefaultWriter
的实例在创建时拥有下表描述的内部槽:
内部槽 | 描述(非规范性) |
---|---|
[[closedPromise]] | writer 的 closed
getter 返回的 promise
|
[[readyPromise]] | writer 的 ready
getter 返回的 promise
|
[[stream]] | WritableStream
实例,拥有当前 reader
|
5.3.3. 构造函数、方法和属性
writer = new
WritableStreamDefaultWriter
(stream)-
这等价于调用
stream.
。getWriter()
await writer.
closed
-
返回一个 promise,当流关闭时兑现,如果流发生错误或 writer 的锁已释放且流未完成关闭,则拒绝。
desiredSize = writer.
desiredSize
-
返回内部队列期望填充值。如果队列超满则可能为负值。生产者可据此判断写入的数据量。
若流无法正常写入(如已错误或有中止请求),则为 null。若流已关闭,则返回零。若在 writer 的锁释放后调用,则会抛出异常。
await writer.
ready
-
返回一个 promise,当内部队列期望填充值从非正转为正时兑现,表示不再施加背压。当期望值回落到零或以下时,会返回新的挂起 promise,直到下次变化。
若流发生错误或中止,或 writer 的锁释放,返回的 promise 会被拒绝。
await writer.
abort
([ reason ])await writer.
close
()writer.
releaseLock
()-
释放 writer 对流的锁。锁释放后,writer 不再激活。若锁释放时流已错误,writer 也会表现为错误状态;否则表现为已关闭。
注意即使有未完成的写入也可释放锁(即使
write()
返回的 promise 未决)。不需要在整个写入期间持有锁,锁只是为了防止其他生产者交错写入。 await writer.
write
(chunk)-
将指定数据块写入可写流,会等待前一次写入成功后再发送该数据块到底层接收器的
write()
方法。返回的 promise 在成功写入后兑现,若写入失败或流在写入前已错误则拒绝。注意“成功”由底层接收器决定,可能仅表示数据块已被接受,而不一定已安全保存到最终目的地。
若 chunk 可变,生产者应避免在传递给
write()
后修改其内容,直到write()
返回的 promise 达到终态。以确保底层接收器接收到的是原始值。
new WritableStreamDefaultWriter(stream)
构造函数步骤如下:
-
执行 ? SetUpWritableStreamDefaultWriter(this, stream)。
closed
属性步骤如下:
desiredSize
属性步骤如下:
-
如果 this.[[stream]] 为 undefined,抛出
TypeError
异常。
ready
属性步骤如下:
-
返回 this.[[readyPromise]]。
abort(reason)
方法步骤如下:
-
如果 this.[[stream]] 为 undefined,返回 一个拒绝的 promise,异常为
TypeError
。 -
返回 ! WritableStreamDefaultWriterAbort(this, reason)。
close()
方法步骤如下:
-
令 stream 为 this.[[stream]]。
-
如果 stream 为 undefined,返回 一个拒绝的 promise,异常为
TypeError
。 -
如果 ! WritableStreamCloseQueuedOrInFlight(stream) 为 true,返回 一个拒绝的 promise,异常为
TypeError
。
releaseLock()
方法步骤如下:
-
令 stream 为 this.[[stream]]。
-
如果 stream 为 undefined,则返回。
-
断言:stream.[[writer]] 不为 undefined。
write(chunk)
方法步骤如下:
-
如果 this.[[stream]] 为 undefined,返回 一个拒绝的 promise,异常为
TypeError
。 -
返回 ! WritableStreamDefaultWriterWrite(this, chunk)。
5.4.
WritableStreamDefaultController
类
WritableStreamDefaultController
类拥有可用于控制 WritableStream
状态的方法。在构造 WritableStream
时,
底层接收器会获得一个对应的 WritableStreamDefaultController
实例用于操作。
5.4.1. 接口定义
WritableStreamDefaultController
类的 Web IDL 定义如下:
[Exposed=*]interface {
WritableStreamDefaultController readonly attribute AbortSignal signal ;undefined error (optional any ); };
e
5.4.2. 内部槽
WritableStreamDefaultController
的实例在创建时拥有下表描述的内部槽:
内部槽 | 描述(非规范性) |
---|---|
[[abortAlgorithm]] | 一个返回 promise 的算法,接收一个参数(中止原因),用于将中止请求传递给 底层接收器 |
[[abortController]] | 一个 AbortController
,可用于在流被中止时终止正在进行的写入或关闭操作
|
[[closeAlgorithm]] | 一个返回 promise 的算法,用于将关闭请求传递给 底层接收器 |
[[queue]] | 一个列表,表示流内部排队的数据块 |
[[queueTotalSize]] | 存储于 [[queue]] 中所有块的总大小(见 § 8.1 带大小的队列) |
[[started]] | 一个布尔标志,指示 底层接收器是否已完成初始化 |
[[strategyHWM]] | 由流创建者作为流排队策略一部分提供的数字,表示流何时会对底层接收器施加背压 |
[[strategySizeAlgorithm]] | 用于计算入队数据块大小的算法,属于流的排队策略 |
[[stream]] | 被控制的 WritableStream
实例
|
[[writeAlgorithm]] | 一个返回 promise 的算法,接收一个参数(要写入的数据块),用于将数据写入 底层接收器 |
关闭哨兵是一个特殊值,会被入队到 [[queue]],代替数据块,用于标记流已关闭。仅内部使用,不会暴露给 Web 开发者。
5.4.3. 方法和属性
controller.
signal
-
一个 AbortSignal,可用于在流被中止时终止正在进行的写入或关闭操作。
controller.
error
(e)-
关闭被控制的可写流,使之后所有操作都因错误 e 失败。
该方法很少被使用,通常只需在 底层接收器的方法返回拒绝的 promise 即可。但在流生命周期外因事件需突然关闭流时,此方法会很有用。
signal
属性步骤如下:
-
返回 this.[[abortController]] 的 signal。
error(e)
方法步骤如下:
-
令 state 为 this.[[stream]].[[state]]。
-
如果 state 不是 "
writable
",则返回。 -
执行 ! WritableStreamDefaultControllerError(this, e)。
5.4.4. 内部方法
以下为每个 WritableStreamDefaultController
实例实现的内部方法, 可写流实现会调用这些方法。
这些以方法形式而不是抽象操作出现,是为了明确可写流实现与控制器实现解耦,未来可扩展为其他控制器,只要这些控制器实现相应内部方法。类似情况见可读流(参见 § 4.9.2 与控制器接口),实际有多种控制器类型,因此内部方法多态使用。
-
令 result 为执行 this.[[abortAlgorithm]],传入 reason 的结果。
-
返回 result。
-
执行 ! ResetQueue(this)。
5.5. 抽象操作
5.5.1. 可写流相关操作
以下抽象操作用于在更高层次上操作 WritableStream
实例。
-
令 writer 为一个 新建的
WritableStreamDefaultWriter
。 -
执行 ? SetUpWritableStreamDefaultWriter(writer, stream)。
-
返回 writer。
-
断言:! IsNonNegativeNumber(highWaterMark) 为 true。
-
令 stream 为一个 新建的
WritableStream
。 -
执行 ! InitializeWritableStream(stream)。
-
令 controller 为一个 新建的
WritableStreamDefaultController
。 -
执行 ? SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm)。
-
返回 stream。
该抽象操作仅在 startAlgorithm 抛出异常时才会抛出异常。
-
将 stream.[[state]] 设为 "
writable
"。 -
将 stream.[[storedError]]、stream.[[writer]]、stream.[[controller]]、stream.[[inFlightWriteRequest]]、 stream.[[closeRequest]]、stream.[[inFlightCloseRequest]] 和 stream.[[pendingAbortRequest]] 设为 undefined。
-
将 stream.[[writeRequests]] 设为一个新的空列表。
-
将 stream.[[backpressure]] 设为 false。
-
如果 stream.[[writer]] 为 undefined,则返回 false。
-
返回 true。
-
如果 ! IsWritableStreamLocked(stream) 为 true,则抛出
TypeError
异常。 -
将 writer.[[stream]] 设为 stream。
-
将 stream.[[writer]] 设为 writer。
-
令 state 为 stream.[[state]]。
-
如果 state 为 "
writable
",-
如果 ! WritableStreamCloseQueuedOrInFlight(stream) 为 false 且 stream.[[backpressure]] 为 true,则将 writer.[[readyPromise]] 设为一个新的 promise。
-
否则,将 writer.[[readyPromise]] 设为已解决的 promise,值为 undefined。
-
将 writer.[[closedPromise]] 设为新的 promise。
-
-
否则如果 state 为 "
erroring
",-
将 writer.[[readyPromise]] 设为拒绝的 promise,原因为 stream.[[storedError]]。
-
将 writer.[[readyPromise]].[[PromiseIsHandled]] 设为 true。
-
将 writer.[[closedPromise]] 设为新的 promise。
-
-
否则如果 state 为 "
closed
",-
将 writer.[[readyPromise]] 设为已解决的 promise,值为 undefined。
-
将 writer.[[closedPromise]] 设为已解决的 promise,值为 undefined。
-
-
否则,
-
断言:state 为 "
errored
"。 -
令 storedError 为 stream.[[storedError]]。
-
将 writer.[[readyPromise]] 设为拒绝的 promise,原因为 storedError。
-
将 writer.[[readyPromise]].[[PromiseIsHandled]] 设为 true。
-
将 writer.[[closedPromise]] 设为拒绝的 promise,原因为 storedError。
-
将 writer.[[closedPromise]].[[PromiseIsHandled]] 设为 true。
-
-
如果 stream.[[state]] 为 "
closed
" 或 "errored
",返回已解决的 promise,值为 undefined。 -
在 stream.[[controller]].[[abortController]] 上用 reason 发出中止信号。
-
令 state 为 stream.[[state]]。
-
如果 state 为 "
closed
" 或 "errored
",返回已解决的 promise,值为 undefined。我们需要再次检查状态,因为 发出中止信号 会运行作者代码,状态可能会被改变。
-
如果 stream.[[pendingAbortRequest]] 不为 undefined,则返回 stream.[[pendingAbortRequest]] 的 promise。
-
断言:state 为 "
writable
" 或 "erroring
"。 -
令 wasAlreadyErroring 为 false。
-
如果 state 为 "
erroring
",-
将 wasAlreadyErroring 设为 true。
-
将 reason 设为 undefined。
-
-
令 promise 为新的 promise。
-
将 stream.[[pendingAbortRequest]] 设为一个新的 待中止请求,其 promise 为 promise,reason 为 reason,was already erroring 为 wasAlreadyErroring。
-
如果 wasAlreadyErroring 为 false,执行 ! WritableStreamStartErroring(stream, reason)。
-
返回 promise。
-
令 state 为 stream.[[state]]。
-
如果 state 为 "
closed
" 或 "errored
",返回拒绝的 promise,异常为TypeError
。 -
断言:state 为 "
writable
" 或 "erroring
"。 -
断言:! WritableStreamCloseQueuedOrInFlight(stream) 为 false。
-
令 promise 为新的 promise。
-
将 stream.[[closeRequest]] 设为 promise。
-
令 writer 为 stream.[[writer]]。
-
如果 writer 不为 undefined,且 stream.[[backpressure]] 为 true,并且 state 为 "
writable
",则用 undefined 解决 writer.[[readyPromise]]。 -
执行 ! WritableStreamDefaultControllerClose(stream.[[controller]])。
-
返回 promise。
5.5.2. 与控制器的接口
为了将来能够灵活地添加不同的可写流行为(类似于默认可读流与可读字节流的区别),可写流的大部分内部状态由 WritableStreamDefaultController
类进行封装。
每个控制器类都定义了两个内部方法,由 WritableStream
算法调用:
- [[AbortSteps]](reason)
- 控制器在流中止时运行的步骤,用于清理控制器中存储的状态并通知底层接收器。
- [[ErrorSteps]]()
- 控制器在流出错时运行的步骤,用于清理控制器中存储的状态。
(这些被定义为内部方法,而不是抽象操作,是为了允许 WritableStream
算法以多态方式调用,无需区分当前控制器的类型。目前只有 WritableStreamDefaultController
,但这种设计为未来扩展留下了空间。)
本节剩余部分关注控制器反向调用可写流对象的抽象操作。控制器通过这些操作影响其关联的 WritableStream
,将内部状态变化转化为开发者可见的 WritableStream
公共 API 行为。
-
断言:! IsWritableStreamLocked(stream) 为 true。
-
断言:stream.[[state]] 为 "
writable
"。 -
令 promise 为新的 promise。
-
追加 promise 到 stream.[[writeRequests]]。
-
返回 promise。
-
如果 stream.[[closeRequest]] 为 undefined 且 stream.[[inFlightCloseRequest]] 为 undefined,则返回 false。
-
返回 true。
-
令 state 为 stream.[[state]]。
-
如果 state 为 "
writable
",-
执行 ! WritableStreamStartErroring(stream, error)。
-
返回。
-
-
断言:state 为 "
erroring
"。 -
执行 ! WritableStreamFinishErroring(stream)。
-
断言:stream.[[state]] 为 "
erroring
"。 -
断言:! WritableStreamHasOperationMarkedInFlight(stream) 为 false。
-
将 stream.[[state]] 设为 "
errored
"。 -
执行 ! stream.[[controller]].[[ErrorSteps]]()。
-
令 storedError 为 stream.[[storedError]]。
-
遍历 stream.[[writeRequests]] 中的每个 writeRequest:
-
拒绝 writeRequest,原因为 storedError。
-
-
将 stream.[[writeRequests]] 设为空列表。
-
如果 stream.[[pendingAbortRequest]] 为 undefined,
-
执行 ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream)。
-
返回。
-
-
令 abortRequest 为 stream.[[pendingAbortRequest]]。
-
将 stream.[[pendingAbortRequest]] 设为 undefined。
-
如果 abortRequest 的 was already erroring 为 true,
-
执行 ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream)。
-
返回。
-
令 promise 为 ! stream.[[controller]].[[AbortSteps]](abortRequest 的 reason)。
-
-
执行 ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream)。
-
-
执行 ! WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream)。
-
断言:stream.[[inFlightCloseRequest]] 不为 undefined。
-
解决 stream.[[inFlightCloseRequest]],值为 undefined。
-
将 stream.[[inFlightCloseRequest]] 设为 undefined。
-
令 state 为 stream.[[state]]。
-
断言:stream.[[state]] 为 "
writable
" 或 "erroring
"。 -
如果 state 为 "
erroring
",-
将 stream.[[storedError]] 设为 undefined。
-
如果 stream.[[pendingAbortRequest]] 不为 undefined,
-
解决 stream.[[pendingAbortRequest]] 的 promise,值为 undefined。
-
将 stream.[[pendingAbortRequest]] 设为 undefined。
-
-
-
将 stream.[[state]] 设为 "
closed
"。 -
令 writer 为 stream.[[writer]]。
-
如果 writer 不为 undefined,解决 writer.[[closedPromise]],值为 undefined。
-
断言:stream.[[pendingAbortRequest]] 为 undefined。
-
断言:stream.[[storedError]] 为 undefined。
-
断言:stream.[[inFlightCloseRequest]] 不为 undefined。
-
拒绝 stream.[[inFlightCloseRequest]],原因为 error。
-
将 stream.[[inFlightCloseRequest]] 设为 undefined。
-
断言:stream.[[state]] 为 "
writable
" 或 "erroring
"。 -
如果 stream.[[pendingAbortRequest]] 不为 undefined,
-
拒绝 stream.[[pendingAbortRequest]] 的 promise,原因为 error。
-
将 stream.[[pendingAbortRequest]] 设为 undefined。
-
-
执行 ! WritableStreamDealWithRejection(stream, error)。
-
断言:stream.[[inFlightWriteRequest]] 不为 undefined。
-
解决 stream.[[inFlightWriteRequest]],值为 undefined。
-
将 stream.[[inFlightWriteRequest]] 设为 undefined。
-
断言:stream.[[inFlightWriteRequest]] 不为 undefined。
-
拒绝 stream.[[inFlightWriteRequest]],原因为 error。
-
将 stream.[[inFlightWriteRequest]] 设为 undefined。
-
断言:stream.[[state]] 为 "
writable
" 或 "erroring
"。 -
执行 ! WritableStreamDealWithRejection(stream, error)。
-
如果 stream.[[inFlightWriteRequest]] 为 undefined 且 stream.[[inFlightCloseRequest]] 为 undefined,则返回 false。
-
返回 true。
-
断言:stream.[[inFlightCloseRequest]] 为 undefined。
-
断言:stream.[[closeRequest]] 不为 undefined。
-
将 stream.[[inFlightCloseRequest]] 设为 stream.[[closeRequest]]。
-
将 stream.[[closeRequest]] 设为 undefined。
-
断言:stream.[[inFlightWriteRequest]] 为 undefined。
-
断言:stream.[[writeRequests]] 非空。
-
令 writeRequest 为 stream.[[writeRequests]][0]。
-
移除 writeRequest 从 stream.[[writeRequests]]。
-
将 stream.[[inFlightWriteRequest]] 设为 writeRequest。
-
断言:stream.[[state]] 为 "
errored
"。 -
如果 stream.[[closeRequest]] 不为 undefined,
-
断言:stream.[[inFlightCloseRequest]] 为 undefined。
-
拒绝 stream.[[closeRequest]],原因为 stream.[[storedError]]。
-
将 stream.[[closeRequest]] 设为 undefined。
-
-
令 writer 为 stream.[[writer]]。
-
如果 writer 不为 undefined,
-
拒绝 writer.[[closedPromise]],原因为 stream.[[storedError]]。
-
将 writer.[[closedPromise]].[[PromiseIsHandled]] 设为 true。
-
-
断言:stream.[[storedError]] 为 undefined。
-
断言:stream.[[state]] 为 "
writable
"。 -
令 controller 为 stream.[[controller]]。
-
断言:controller 不为 undefined。
-
将 stream.[[state]] 设为 "
erroring
"。 -
将 stream.[[storedError]] 设为 reason。
-
令 writer 为 stream.[[writer]]。
-
如果 writer 不为 undefined,执行 ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason)。
-
如果 ! WritableStreamHasOperationMarkedInFlight(stream) 为 false 且 controller.[[started]] 为 true,执行 ! WritableStreamFinishErroring(stream)。
-
断言:stream.[[state]] 为 "
writable
"。 -
断言:! WritableStreamCloseQueuedOrInFlight(stream) 为 false。
-
令 writer 为 stream.[[writer]]。
-
如果 writer 不为 undefined 且 backpressure 不等于 stream.[[backpressure]],
-
如果 backpressure 为 true,则将 writer.[[readyPromise]] 设为新的 promise。
-
否则,
-
断言:backpressure 为 false。
-
解决 writer.[[readyPromise]],值为 undefined。
-
-
-
将 stream.[[backpressure]] 设为 backpressure。
5.5.3. Writer 抽象操作
以下抽象操作用于实现和操作 WritableStreamDefaultWriter
实例。
-
令 stream 为 writer.[[stream]]。
-
断言:stream 不为 undefined。
-
返回 ! WritableStreamAbort(stream, reason)。
-
令 stream 为 writer.[[stream]]。
-
断言:stream 不为 undefined。
-
返回 ! WritableStreamClose(stream)。
-
令 stream 为 writer.[[stream]]。
-
断言:stream 不为 undefined。
-
令 state 为 stream.[[state]]。
-
如果 ! WritableStreamCloseQueuedOrInFlight(stream) 为 true 或 state 为 "
closed
",返回已解决的 promise,值为 undefined。 -
如果 state 为 "
errored
",返回拒绝的 promise,原因为 stream.[[storedError]]。 -
断言:state 为 "
writable
" 或 "erroring
"。 -
返回 ! WritableStreamDefaultWriterClose(writer)。
该抽象操作用于实现 ReadableStream
的 pipeTo()
的错误传播语义。
-
如果 writer.[[closedPromise]].[[PromiseState]] 为 "
pending
",拒绝 writer.[[closedPromise]],原因为 error。 -
否则,将 writer.[[closedPromise]] 设为拒绝的 promise,原因为 error。
-
将 writer.[[closedPromise]].[[PromiseIsHandled]] 设为 true。
-
如果 writer.[[readyPromise]].[[PromiseState]] 为 "
pending
",拒绝 writer.[[readyPromise]],原因为 error。 -
否则,将 writer.[[readyPromise]] 设为拒绝的 promise,原因为 error。
-
将 writer.[[readyPromise]].[[PromiseIsHandled]] 设为 true。
-
令 stream 为 writer.[[stream]]。
-
令 state 为 stream.[[state]]。
-
如果 state 为 "
errored
" 或 "erroring
",返回 null。 -
如果 state 为 "
closed
",返回 0。 -
返回 ! WritableStreamDefaultControllerGetDesiredSize(stream.[[controller]])。
-
令 stream 为 writer.[[stream]]。
-
断言:stream 不为 undefined。
-
断言:stream.[[writer]] 为 writer。
-
令 releasedError 为新建
TypeError
。 -
执行 ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError)。
-
执行 ! WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError)。
-
将 stream.[[writer]] 设为 undefined。
-
将 writer.[[stream]] 设为 undefined。
-
令 stream 为 writer.[[stream]]。
-
断言:stream 不为 undefined。
-
令 controller 为 stream.[[controller]]。
-
令 chunkSize 为 ! WritableStreamDefaultControllerGetChunkSize(controller, chunk)。
-
如果 stream 不等于 writer.[[stream]],返回拒绝的 promise,异常为
TypeError
。 -
令 state 为 stream.[[state]]。
-
如果 state 为 "
errored
",返回拒绝的 promise,原因为 stream.[[storedError]]。 -
如果 ! WritableStreamCloseQueuedOrInFlight(stream) 为 true 或 state 为 "
closed
",返回拒绝的 promise,异常为TypeError
,表示流正在关闭或已关闭。 -
如果 state 为 "
erroring
",返回拒绝的 promise,原因为 stream.[[storedError]]。 -
断言:state 为 "
writable
"。 -
令 promise 为 ! WritableStreamAddWriteRequest(stream)。
-
执行 ! WritableStreamDefaultControllerWrite(controller, chunk, chunkSize)。
-
返回 promise。
5.5.4. 默认控制器
以下抽象操作用于实现 WritableStreamDefaultController
类。
-
断言:stream 实现
WritableStream
。 -
断言:stream.[[controller]] 为 undefined。
-
将 controller.[[stream]] 设为 stream。
-
将 stream.[[controller]] 设为 controller。
-
执行 ! ResetQueue(controller)。
-
将 controller.[[abortController]] 设为新建
AbortController
。 -
将 controller.[[started]] 设为 false。
-
将 controller.[[strategySizeAlgorithm]] 设为 sizeAlgorithm。
-
将 controller.[[strategyHWM]] 设为 highWaterMark。
-
将 controller.[[writeAlgorithm]] 设为 writeAlgorithm。
-
将 controller.[[closeAlgorithm]] 设为 closeAlgorithm。
-
将 controller.[[abortAlgorithm]] 设为 abortAlgorithm。
-
令 backpressure 为 ! WritableStreamDefaultControllerGetBackpressure(controller)。
-
执行 ! WritableStreamUpdateBackpressure(stream, backpressure)。
-
令 startResult 为执行 startAlgorithm 的结果。(可能会抛出异常。)
-
令 startPromise 为已解决的 promise,值为 startResult。
-
-
断言:stream.[[state]] 为 "
writable
" 或 "erroring
"。 -
将 controller.[[started]] 设为 true。
-
执行 ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller)。
-
-
-
断言:stream.[[state]] 为 "
writable
" 或 "erroring
"。 -
将 controller.[[started]] 设为 true。
-
执行 ! WritableStreamDealWithRejection(stream, r)。
-
-
令 controller 为新建 WritableStreamDefaultController。
-
令 startAlgorithm 为返回 undefined 的算法。
-
令 writeAlgorithm 为返回已解决的 promise,值为 undefined 的算法。
-
令 closeAlgorithm 为返回已解决的 promise,值为 undefined 的算法。
-
令 abortAlgorithm 为返回已解决的 promise,值为 undefined 的算法。
-
如果 underlyingSinkDict["
start
"]存在,则将 startAlgorithm 设为:返回 调用 underlyingSinkDict["start
"],参数列表为 « controller »,异常行为为 "rethrow
",callback this value 为 underlyingSink 的算法。 -
如果 underlyingSinkDict["
write
"]存在,则将 writeAlgorithm 设为:接受 chunk 参数,并返回 调用 underlyingSinkDict["write
"],参数列表为 « chunk, controller »,callback this value 为 underlyingSink 的算法。 -
如果 underlyingSinkDict["
close
"]存在,则将 closeAlgorithm 设为:返回 调用 underlyingSinkDict["close
"],参数列表为 «»,callback this value 为 underlyingSink 的算法。 -
如果 underlyingSinkDict["
abort
"]存在,则将 abortAlgorithm 设为:接受 reason 参数,并返回 调用 underlyingSinkDict["abort
"],参数列表为 « reason »,callback this value 为 underlyingSink 的算法。 -
执行 ? SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, highWaterMark, sizeAlgorithm)。
-
令 stream 为 controller.[[stream]]。
-
如果 controller.[[started]] 为 false,则返回。
-
如果 stream.[[inFlightWriteRequest]] 不为 undefined,则返回。
-
令 state 为 stream.[[state]]。
-
断言:state 不为 "
closed
" 或 "errored
"。 -
如果 state 为 "
erroring
",-
执行 ! WritableStreamFinishErroring(stream)。
-
返回。
-
-
如果 controller.[[queue]] 为空,则返回。
-
令 value 为 ! PeekQueueValue(controller)。
-
如果 value 为 关闭哨兵,执行 ! WritableStreamDefaultControllerProcessClose(controller)。
-
否则,执行 ! WritableStreamDefaultControllerProcessWrite(controller, value)。
WritableStream
本身仍被引用。
这可以通过 弱引用观察。详见 tc39/proposal-weakrefs#31。
执行以下步骤:
-
将 controller.[[writeAlgorithm]] 设为 undefined。
-
将 controller.[[closeAlgorithm]] 设为 undefined。
-
将 controller.[[abortAlgorithm]] 设为 undefined。
-
将 controller.[[strategySizeAlgorithm]] 设为 undefined。
该算法在某些边界情况会多次执行,第一次以后不会有任何效果。
-
执行 ! EnqueueValueWithSize(controller, 关闭哨兵, 0)。
-
执行 ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller)。
-
令 stream 为 controller.[[stream]]。
-
断言:stream.[[state]] 为 "
writable
"。 -
执行 ! WritableStreamDefaultControllerClearAlgorithms(controller)。
-
执行 ! WritableStreamStartErroring(stream, error)。
-
如果 controller.[[stream]].[[state]] 为 "
writable
",执行 ! WritableStreamDefaultControllerError(controller, error)。
-
令 desiredSize 为 ! WritableStreamDefaultControllerGetDesiredSize(controller)。
-
如果 desiredSize ≤ 0,返回 true,否则返回 false。
-
如果 controller.[[strategySizeAlgorithm]] 为 undefined,则:
-
断言:controller.[[stream]].[[state]] 不为 "
writable
"。 -
返回 1。
-
-
令 returnValue 为执行 controller.[[strategySizeAlgorithm]],传入 chunk,并将结果解释为 completion record。
-
如果 returnValue 是 abrupt completion,
-
执行 ! WritableStreamDefaultControllerErrorIfNeeded(controller, returnValue.[[Value]])。
-
返回 1。
-
-
返回 returnValue.[[Value]]。
-
返回 controller.[[strategyHWM]] − controller.[[queueTotalSize]]。
-
令 stream 为 controller.[[stream]]。
-
执行 ! WritableStreamMarkCloseRequestInFlight(stream)。
-
执行 ! DequeueValue(controller)。
-
断言:controller.[[queue]] 为空。
-
令 sinkClosePromise 为执行 controller.[[closeAlgorithm]] 的结果。
-
执行 ! WritableStreamDefaultControllerClearAlgorithms(controller)。
-
-
执行 ! WritableStreamFinishInFlightClose(stream)。
-
-
当 sinkClosePromise 被拒绝,原因为 reason 时,
-
执行 ! WritableStreamFinishInFlightCloseWithError(stream, reason)。
-
-
令 stream 为 controller.[[stream]]。
-
执行 ! WritableStreamMarkFirstWriteRequestInFlight(stream)。
-
令 sinkWritePromise 为执行 controller.[[writeAlgorithm]],传入 chunk 的结果。
-
-
执行 ! WritableStreamFinishInFlightWrite(stream)。
-
令 state 为 stream.[[state]]。
-
断言:state 为 "
writable
" 或 "erroring
"。 -
执行 ! DequeueValue(controller)。
-
如果 ! WritableStreamCloseQueuedOrInFlight(stream) 为 false 且 state 为 "
writable
",-
令 backpressure 为 ! WritableStreamDefaultControllerGetBackpressure(controller)。
-
执行 ! WritableStreamUpdateBackpressure(stream, backpressure)。
-
-
执行 ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller)。
-
-
当 sinkWritePromise 被拒绝,原因为 reason 时,
-
如果 stream.[[state]] 为 "
writable
",执行 ! WritableStreamDefaultControllerClearAlgorithms(controller)。 -
执行 ! WritableStreamFinishInFlightWriteWithError(stream, reason)。
-
-
令 enqueueResult 为 EnqueueValueWithSize(controller, chunk, chunkSize)。
-
如果 enqueueResult 是 abrupt completion,
-
执行 ! WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueResult.[[Value]])。
-
返回。
-
-
令 stream 为 controller.[[stream]]。
-
如果 ! WritableStreamCloseQueuedOrInFlight(stream) 为 false 且 stream.[[state]] 为 "
writable
",-
令 backpressure 为 ! WritableStreamDefaultControllerGetBackpressure(controller)。
-
执行 ! WritableStreamUpdateBackpressure(stream, backpressure)。
-
-
执行 ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller)。
6. 转换流
6.1. 使用转换流
readableStream. pipeThrough( transformStream) . pipeTo( writableStream) . then(() => console. log( "所有数据已成功转换!" )) . catch ( e=> console. error( "发生错误!" , e));
readable
和
writable
属性来访问可读流和可写流的常规接口。此示例中我们使用 可写端
的 writer 接口提供数据。然后,将 可读端通过管道连接到 anotherWritableStream
。
const writer= transformStream. writable. getWriter(); writer. write( "输入块" ); transformStream. readable. pipeTo( anotherWritableStream);
fetch()
API 接收一个可读流请求体,但通过可写流接口上传数据会更方便。使用恒等变换流可以实现这一点:
const { writable, readable} = new TransformStream(); fetch( "..." , { body: readable}). then( response=> /* ... */ ); const writer= writable. getWriter(); writer. write( new Uint8Array([ 0x73 , 0x74 , 0x72 , 0x65 , 0x61 , 0x6D , 0x73 , 0x21 ])); writer. close();
恒等变换流的另一个用途是为管道增加额外缓冲区。此示例在 readableStream
和
writableStream
之间增加了额外缓冲。
const writableStrategy= new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 }); readableStream. pipeThrough( new TransformStream( undefined , writableStrategy)) . pipeTo( writableStream);
6.2. TransformStream
类
TransformStream
类是通用转换流概念的具体实现。
6.2.1. 接口定义
TransformStream
类的 Web IDL 定义如下:
[Exposed=*,Transferable ]interface {
TransformStream constructor (optional object ,
transformer optional QueuingStrategy = {},
writableStrategy optional QueuingStrategy = {});
readableStrategy readonly attribute ReadableStream readable ;readonly attribute WritableStream writable ; };
6.2.2. 内部槽
TransformStream
的实例创建时拥有如下表描述的内部槽:
内部槽 | 描述(非规范性) |
---|---|
[[backpressure]] | 上一次观察到 [[readable]] 时是否存在背压 |
[[backpressureChangePromise]] | 每次 [[backpressure]] 变化时都会被 fulfill 并替换的 promise |
[[controller]] | 用以控制 [[readable]] 和 [[writable]] 的
TransformStreamDefaultController
实例
|
[[Detached]] | 流被 transfer 时设为 true 的布尔标志 |
[[readable]] | 该对象控制的 ReadableStream
实例
|
[[writable]] | 该对象控制的 WritableStream
实例
|
6.2.3. transformer API
TransformStream()
构造函数的第一个参数接受一个表示transformer的
JavaScript 对象。此对象可包含下列任意方法:
dictionary {
Transformer TransformerStartCallback start ;TransformerTransformCallback transform ;TransformerFlushCallback flush ;TransformerCancelCallback cancel ;any readableType ;any writableType ; };callback =
TransformerStartCallback any (TransformStreamDefaultController );
controller callback =
TransformerFlushCallback Promise <undefined > (TransformStreamDefaultController );
controller callback =
TransformerTransformCallback Promise <undefined > (any ,
chunk TransformStreamDefaultController );
controller callback =
TransformerCancelCallback Promise <undefined > (any );
reason
start(controller)
, 类型为 TransformerStartCallback-
在
TransformStream
创建时立即调用的函数。通常用于通过
controller.enqueue()
入队前缀数据块。这些数据块会从可读端读取,不依赖于任何对可写端的写入。如果此初始化过程是异步的,例如需要获取前缀块,则该函数可以返回 promise 以表示成功或失败;被拒绝的 promise 会使流出错。抛出的异常会被
TransformStream()
构造函数重新抛出。 transform(chunk, controller)
, 类型为 TransformerTransformCallback-
当新的数据块从可写端写入并准备转换时调用的函数。流实现保证此函数仅在前一次转换成功后才会被调用,并且不会在
start()
完成之前或flush()
被调用之后调用。此函数执行转换流的实际转换工作。可以使用
controller.enqueue()
入队转换结果。这允许在可写端写入的单个数据块在可读端产生零个或多个数据块,具体取决于controller.enqueue()
被调用的次数。§ 10.9 替换模板标签的转换流演示了有时会入队零块的情况。如果转换过程是异步的,此函数可以返回 promise 以表示转换的成功或失败。被拒绝的 promise 会导致转换流的可读端和可写端都出错。
此函数可能返回的 promise 用于确保良好行为的生产者不会在数据块完全转换前对其进行修改。(这不是规范机制保证的,而是生产者与transformer之间的非正式约定。)
如果未提供
transform()
方法,则使用恒等变换,即数据块不变地从可写端传递到可读端。 flush(controller)
, 类型为 TransformerFlushCallback-
所有写入可写端的数据块都已通过
transform()
成功转换后,且可写端即将关闭时调用的函数。通常用于在可读端关闭前,将后缀数据块入队到可读端。可见示例 § 10.9 替换模板标签的转换流。
如果刷新过程是异步的,可返回 promise 表示成功或失败;结果会传递给
stream.writable.write()
的调用者。被拒绝的 promise 会导致流的可读端和可写端都出错。抛出异常等同于返回被拒绝的 promise。(注意无需在
flush()
内调用controller.terminate()
;此时流已经在正常关闭流程中,终止反而会适得其反。) cancel(reason)
, 类型为 TransformerCancelCallback-
通常用于在流被中止或取消时清理底层 transformer 资源。
如果取消过程是异步的,可返回 promise 表示成功或失败;结果会传递给
stream.writable.abort()
或stream.readable.cancel()
的调用者。抛出异常等同于返回被拒绝的 promise。(注意无需在
cancel()
内调用controller.terminate()
;此时流已经在取消/中止流程中,终止反而会适得其反。) readableType
, 类型为 any-
该属性保留用于未来使用,任何尝试赋值都会抛出异常。
writableType
, 类型为 any-
该属性保留用于未来使用,任何尝试赋值都会抛出异常。
传递给 controller
对象的 start()
、
transform()
和
flush()
均为 TransformStreamDefaultController
实例,可用于向可读端入队数据块,或终止、错误流。
6.2.4. 构造函数与属性
stream = new
TransformStream
([transformer[, writableStrategy[, readableStrategy]]])-
创建一个新的
TransformStream
,包装所提供的transformer。关于 transformer 参数的详细信息见 § 6.2.3 transformer API。如果未提供 transformer 参数,则结果将是一个恒等变换流。参见该示例了解其用途。
writableStrategy 和 readableStrategy 参数分别是队列策略对象,用于 可写端和 可读端。这些参数会用于构造
WritableStream
和ReadableStream
对象,可以为TransformStream
增加缓冲区,从而平滑转换速度的变化或增加管道缓冲。若未提供,则默认为CountQueuingStrategy
,高水位线分别为 1 和 0。 readable = stream.
readable
-
返回一个
ReadableStream
,表示该转换流的可读端。 writable = stream.
writable
-
返回一个
WritableStream
,表示该转换流的可写端。
new TransformStream(transformer, writableStrategy, readableStrategy)
构造步骤如下:
-
如果 transformer 缺失,则设为 null。
-
令 transformerDict 为 transformer,转换为 IDL 类型
Transformer
。不能直接声明 transformer 参数为
Transformer
类型,否则会丢失原对象引用。需要保留对象以便后续调用其方法。 -
如果 transformerDict["
readableType
"]存在,抛出RangeError
异常。 -
如果 transformerDict["
writableType
"]存在,抛出RangeError
异常。 -
令 readableHighWaterMark 为 ? ExtractHighWaterMark(readableStrategy, 0)。
-
令 readableSizeAlgorithm 为 ! ExtractSizeAlgorithm(readableStrategy)。
-
令 writableHighWaterMark 为 ? ExtractHighWaterMark(writableStrategy, 1)。
-
令 writableSizeAlgorithm 为 ! ExtractSizeAlgorithm(writableStrategy)。
-
令 startPromise 为新的 promise。
-
执行 ! InitializeTransformStream(this, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm)。
-
执行 ? SetUpTransformStreamDefaultControllerFromTransformer(this, transformer, transformerDict)。
-
如果 transformerDict["
start
"]存在,则解决 startPromise,值为调用 transformerDict["start
"], 参数列表为 « this.[[controller]] », callback this value 为 transformer。 -
否则,解决 startPromise,值为 undefined。
readable
getter 步骤如下:
-
返回 this.[[readable]]。
writable
getter 步骤如下:
-
返回 this.[[writable]]。
6.2.5. 通过 postMessage()
传递
destination.postMessage(ts, { transfer: [ts] });
-
将
TransformStream
发送到其他 frame、window 或 worker。
TransformStream
对象是可转移对象。它们的转移步骤,给定 value 和 dataHolder,如下:
-
令 readable 为 value.[[readable]]。
-
令 writable 为 value.[[writable]]。
-
如果 ! IsReadableStreamLocked(readable) 为 true,则抛出 "
DataCloneError
"DOMException
。 -
如果 ! IsWritableStreamLocked(writable) 为 true,则抛出 "
DataCloneError
"DOMException
。 -
设 dataHolder.[[readable]] 为 ! StructuredSerializeWithTransfer(readable, « readable »)。
-
设 dataHolder.[[writable]] 为 ! StructuredSerializeWithTransfer(writable, « writable »)。
-
令 readableRecord 为 ! StructuredDeserializeWithTransfer(dataHolder.[[readable]], 当前 Realm)。
-
令 writableRecord 为 ! StructuredDeserializeWithTransfer(dataHolder.[[writable]], 当前 Realm)。
-
设 value.[[readable]] 为 readableRecord.[[Deserialized]]。
-
设 value.[[writable]] 为 writableRecord.[[Deserialized]]。
-
设 value.[[backpressure]]、value.[[backpressureChangePromise]] 和 value.[[controller]] 均设为 undefined。
[[backpressure]]、[[backpressureChangePromise]] 和 [[controller]] 槽在被传递的 TransformStream
中不再被使用。
6.3.
TransformStreamDefaultController
类
TransformStreamDefaultController
类有方法可以操作关联的 ReadableStream
和 WritableStream
。
构造 TransformStream
时,transformer 对象会获得对应的 TransformStreamDefaultController
实例用于操作。
6.3.1. 接口定义
TransformStreamDefaultController
类的 Web IDL 定义如下:
[Exposed=*]interface {
TransformStreamDefaultController readonly attribute unrestricted double ?desiredSize ;undefined enqueue (optional any );
chunk undefined error (optional any );
reason undefined terminate (); };
6.3.2. 内部槽
TransformStreamDefaultController
的实例创建时拥有如下表描述的内部槽:
内部槽 | 描述(非规范性) |
---|---|
[[cancelAlgorithm]] | 一个返回 promise 的算法,接受一个参数(取消原因),用于将取消请求传递给transformer |
[[finishPromise]] | 一个 promise,在[[cancelAlgorithm]]或[[flushAlgorithm]]完成时 resolve。如果该字段未赋值(即为 undefined),则表示尚未调用这两个算法 |
[[flushAlgorithm]] | 一个返回 promise 的算法,用于向transformer传递关闭请求 |
[[stream]] | 被控制的 TransformStream
实例
|
[[transformAlgorithm]] | 一个返回 promise 的算法,接受一个参数(待转换的数据块),请求transformer执行转换 |
6.3.3. 方法与属性
desiredSize = controller.
desiredSize
-
返回填充可读端内部队列的期望大小。如果队列过满,该值可能为负。
controller.
enqueue
(chunk)controller.
error
(e)controller.
terminate
()-
关闭被控制转换流的可读端,并使可写端出错。当transformer只需要消费一部分数据块时很有用。
desiredSize
getter 步骤如下:
-
令 readableController 为 this.[[stream]].[[readable]].[[controller]]。
-
返回 ! ReadableStreamDefaultControllerGetDesiredSize(readableController)。
enqueue(chunk)
方法步骤如下:
-
执行 ? TransformStreamDefaultControllerEnqueue(this, chunk)。
error(e)
方法步骤如下:
-
执行 ? TransformStreamDefaultControllerError(this, e)。
terminate()
方法步骤如下:
6.4. 抽象操作
6.4.1. 与转换流相关操作
以下抽象操作在更高层面操作 TransformStream
实例。
-
令 startAlgorithm 为返回 startPromise 的算法。
-
令 writeAlgorithm 为如下步骤,接收 chunk 参数:
-
返回 ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk)。
-
-
令 abortAlgorithm 为如下步骤,接收 reason 参数:
-
返回 ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason)。
-
-
令 closeAlgorithm 为如下步骤:
-
返回 ! TransformStreamDefaultSinkCloseAlgorithm(stream)。
-
-
将 stream.[[writable]] 设为 ! CreateWritableStream(startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, writableHighWaterMark, writableSizeAlgorithm)。
-
令 pullAlgorithm 为如下步骤:
-
返回 ! TransformStreamDefaultSourcePullAlgorithm(stream)。
-
-
令 cancelAlgorithm 为如下步骤,接收 reason 参数:
-
返回 ! TransformStreamDefaultSourceCancelAlgorithm(stream, reason)。
-
-
将 stream.[[readable]] 设为 ! CreateReadableStream(startAlgorithm, pullAlgorithm, cancelAlgorithm, readableHighWaterMark, readableSizeAlgorithm)。
-
将 stream.[[backpressure]] 和 stream.[[backpressureChangePromise]] 设为 undefined。
[[backpressure]] 槽被设为 undefined,以便通过 TransformStreamSetBackpressure 初始化。或者实现可以直接用布尔值并改变初始化方式。只要初始化在 transformer 的
start()
方法调用前完成,用户代码不可见。 -
执行 ! TransformStreamSetBackpressure(stream, true)。
-
将 stream.[[controller]] 设为 undefined。
-
执行 ! ReadableStreamDefaultControllerError(stream.[[readable]].[[controller]], e)。
-
执行 ! TransformStreamErrorWritableAndUnblockWrite(stream, e)。
当一端或双方已出错时,该操作依然有效。因此,调用算法在处理错误时无需检查流状态。
-
执行 ! TransformStreamDefaultControllerClearAlgorithms(stream.[[controller]])。
-
执行 ! WritableStreamDefaultControllerErrorIfNeeded(stream.[[writable]].[[controller]], e)。
-
执行 ! TransformStreamUnblockWrite(stream)。
-
断言:stream.[[backpressure]] 不等于 backpressure。
-
如果 stream.[[backpressureChangePromise]] 不为 undefined,则解决 stream.[[backpressureChangePromise]],值为 undefined。
-
将 stream.[[backpressureChangePromise]] 设为新的 promise。
-
将 stream.[[backpressure]] 设为 backpressure。
-
如果 stream.[[backpressure]] 为 true,则执行 ! TransformStreamSetBackpressure(stream, false)。
TransformStreamDefaultSinkWriteAlgorithm 抽象操作可能在等待 [[backpressureChangePromise]] 槽中的 promise 被解决。调用 TransformStreamSetBackpressure 可保证 promise 总能被解决。
6.4.2. 默认控制器
以下抽象操作用于实现 TransformStreamDefaultController
类。
-
断言:stream 实现
TransformStream
。 -
断言:stream.[[controller]] 为 undefined。
-
将 controller.[[stream]] 设为 stream。
-
将 stream.[[controller]] 设为 controller。
-
将 controller.[[transformAlgorithm]] 设为 transformAlgorithm。
-
将 controller.[[flushAlgorithm]] 设为 flushAlgorithm。
-
将 controller.[[cancelAlgorithm]] 设为 cancelAlgorithm。
-
令 controller 为新建 TransformStreamDefaultController。
-
令 transformAlgorithm 为如下步骤,接收 chunk 参数:
-
令 result 为 TransformStreamDefaultControllerEnqueue(controller, chunk)。
-
如果 result 是 abrupt completion,返回拒绝的 promise,值为 result.[[Value]]。
-
否则返回已解决的 promise,值为 undefined。
-
-
令 flushAlgorithm 为返回已解决的 promise,值为 undefined 的算法。
-
令 cancelAlgorithm 为返回已解决的 promise,值为 undefined 的算法。
-
如果 transformerDict["
transform
"]存在,则将 transformAlgorithm 设为一个算法,接收 chunk 参数并返回 调用 transformerDict["transform
"],参数列表为 « chunk, controller »,callback this value 为 transformer。 -
如果 transformerDict["
flush
"]存在,则将 flushAlgorithm 设为一个算法,返回 调用 transformerDict["flush
"],参数列表为 « controller »,callback this value 为 transformer。 -
如果 transformerDict["
cancel
"]存在,则将 cancelAlgorithm 设为一个算法,接收 reason 参数并返回 调用 transformerDict["cancel
"],参数列表为 « reason »,callback this value 为 transformer。 -
执行 ! SetUpTransformStreamDefaultController(stream, controller, transformAlgorithm, flushAlgorithm, cancelAlgorithm)。
TransformStream
本身仍被引用。
这可以通过 弱引用观察。详见 tc39/proposal-weakrefs#31。
执行以下步骤:
-
将 controller.[[transformAlgorithm]] 设为 undefined。
-
将 controller.[[flushAlgorithm]] 设为 undefined。
-
将 controller.[[cancelAlgorithm]] 设为 undefined。
-
令 stream 为 controller.[[stream]]。
-
令 readableController 为 stream.[[readable]].[[controller]]。
-
如果 ! ReadableStreamDefaultControllerCanCloseOrEnqueue(readableController) 为 false,抛出
TypeError
异常。 -
令 enqueueResult 为 ReadableStreamDefaultControllerEnqueue(readableController, chunk)。
-
如果 enqueueResult 是 abrupt completion,
-
执行 ! TransformStreamErrorWritableAndUnblockWrite(stream, enqueueResult.[[Value]])。
-
抛出 stream.[[readable]].[[storedError]]。
-
-
令 backpressure 为 ! ReadableStreamDefaultControllerHasBackpressure(readableController)。
-
如果 backpressure 不等于 stream.[[backpressure]],
-
断言:backpressure 为 true。
-
执行 ! TransformStreamSetBackpressure(stream, true)。
-
-
执行 ! TransformStreamError(controller.[[stream]], e)。
-
令 transformPromise 为执行 controller.[[transformAlgorithm]],传入 chunk 的结果。
-
返回 对 transformPromise 的反应,拒绝步骤如下,参数为 r:
-
执行 ! TransformStreamError(controller.[[stream]], r)。
-
抛出 r。
-
-
令 stream 为 controller.[[stream]]。
-
令 readableController 为 stream.[[readable]].[[controller]]。
-
执行 ! ReadableStreamDefaultControllerClose(readableController)。
-
令 error 为
TypeError
异常,表示流已被终止。 -
执行 ! TransformStreamErrorWritableAndUnblockWrite(stream, error)。
6.4.3. 默认 sink
-
断言:stream.[[writable]].[[state]] 为 "
writable
"。 -
令 controller 为 stream.[[controller]]。
-
如果 stream.[[backpressure]] 为 true,
-
令 backpressureChangePromise 为 stream.[[backpressureChangePromise]]。
-
断言:backpressureChangePromise 不为 undefined。
-
返回 响应 backpressureChangePromise 的如下 fulfill 步骤:
-
令 writable 为 stream.[[writable]]。
-
令 state 为 writable.[[state]]。
-
如果 state 为 "
erroring
",抛出 writable.[[storedError]]。 -
断言:state 为 "
writable
"。 -
返回 ! TransformStreamDefaultControllerPerformTransform(controller, chunk)。
-
-
-
返回 ! TransformStreamDefaultControllerPerformTransform(controller, chunk)。
-
令 controller 为 stream.[[controller]]。
-
如果 controller.[[finishPromise]] 不为 undefined,返回 controller.[[finishPromise]]。
-
令 readable 为 stream.[[readable]]。
-
令 controller.[[finishPromise]] 为新的 promise。
-
令 cancelPromise 为执行 controller.[[cancelAlgorithm]],传入 reason 的结果。
-
执行 ! TransformStreamDefaultControllerClearAlgorithms(controller)。
-
响应 cancelPromise:
-
如果 cancelPromise 被 fulfill:
-
如果 readable.[[state]] 为 "
errored
",拒绝 controller.[[finishPromise]],原因为 readable.[[storedError]]。 -
否则:
-
执行 ! ReadableStreamDefaultControllerError(readable.[[controller]], reason)。
-
解决 controller.[[finishPromise]],值为 undefined。
-
-
-
如果 cancelPromise 被拒绝,原因为 r:
-
执行 ! ReadableStreamDefaultControllerError(readable.[[controller]], r)。
-
拒绝 controller.[[finishPromise]],原因为 r。
-
-
-
返回 controller.[[finishPromise]]。
-
令 controller 为 stream.[[controller]]。
-
如果 controller.[[finishPromise]] 不为 undefined,返回 controller.[[finishPromise]]。
-
令 readable 为 stream.[[readable]]。
-
令 controller.[[finishPromise]] 为新的 promise。
-
令 flushPromise 为执行 controller.[[flushAlgorithm]] 的结果。
-
执行 ! TransformStreamDefaultControllerClearAlgorithms(controller)。
-
响应 flushPromise:
-
如果 flushPromise 被 fulfill:
-
如果 readable.[[state]] 为 "
errored
",拒绝 controller.[[finishPromise]],原因为 readable.[[storedError]]。 -
否则:
-
执行 ! ReadableStreamDefaultControllerClose(readable.[[controller]])。
-
解决 controller.[[finishPromise]],值为 undefined。
-
-
-
如果 flushPromise 被拒绝,原因为 r:
-
执行 ! ReadableStreamDefaultControllerError(readable.[[controller]], r)。
-
拒绝 controller.[[finishPromise]],原因为 r。
-
-
-
返回 controller.[[finishPromise]]。
6.4.4. 默认 source
-
令 controller 为 stream.[[controller]]。
-
如果 controller.[[finishPromise]] 不为 undefined,返回 controller.[[finishPromise]]。
-
令 writable 为 stream.[[writable]]。
-
令 controller.[[finishPromise]] 为新的 promise。
-
令 cancelPromise 为执行 controller.[[cancelAlgorithm]],传入 reason 的结果。
-
执行 ! TransformStreamDefaultControllerClearAlgorithms(controller)。
-
响应 cancelPromise:
-
如果 cancelPromise 被 fulfill:
-
如果 writable.[[state]] 为 "
errored
",拒绝 controller.[[finishPromise]],原因为 writable.[[storedError]]。 -
否则:
-
执行 ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], reason)。
-
执行 ! TransformStreamUnblockWrite(stream)。
-
解决 controller.[[finishPromise]],值为 undefined。
-
-
-
如果 cancelPromise 被拒绝,原因为 r:
-
执行 ! WritableStreamDefaultControllerErrorIfNeeded(writable.[[controller]], r)。
-
执行 ! TransformStreamUnblockWrite(stream)。
-
拒绝 controller.[[finishPromise]],原因为 r。
-
-
-
返回 controller.[[finishPromise]]。
-
断言:stream.[[backpressure]] 为 true。
-
断言:stream.[[backpressureChangePromise]] 不为 undefined。
-
执行 ! TransformStreamSetBackpressure(stream, false)。
-
返回 stream.[[backpressureChangePromise]]。
7. 队列策略
7.1. 队列策略 API
ReadableStream()
、
WritableStream()
和
TransformStream()
构造函数都至少接受一个参数,表示所创建流的合适队列策略。这些对象包含以下属性:
dictionary {
QueuingStrategy unrestricted double highWaterMark ;QueuingStrategySize size ; };callback =
QueuingStrategySize unrestricted double (any );
chunk
highWaterMark
, 类型为 unrestricted double-
非负数,表示使用该队列策略的流的高水位线。
size(chunk)
(仅用于非字节流),类型为 QueuingStrategySize-
一个函数,用于计算并返回给定数据块值的有限非负大小。
该结果用于确定背压,通过相应的
desiredSize
属性体现:可能是defaultController.desiredSize
、byteController.desiredSize
或writer.desiredSize
,依据队列策略使用位置而定。对于可读流,还决定何时调用底层 source的pull()
方法。此函数必须是幂等且无副作用,否则会造成很奇怪的结果。
对于可读字节流,该函数不会被使用,因为数据块始终以字节计量。
任何具有这些属性的对象都可用作队列策略对象。但我们提供了两种内建队列策略类,用于特定场景下的通用表达:ByteLengthQueuingStrategy
和 CountQueuingStrategy
。
它们的构造函数都使用以下 Web IDL 片段:
dictionary {
QueuingStrategyInit required unrestricted double ; };
highWaterMark
7.2. ByteLengthQueuingStrategy
类
处理字节时常见的队列策略是等待输入数据块的 byteLength
属性累计达到指定高水位线。因此,作为内建队列策略提供,可用于构造流。
const stream= new ReadableStream( { ... }, new ByteLengthQueuingStrategy({ highWaterMark: 16 * 1024 }) );
在此情况下,可读流的底层 source可入队总计 16 KiB 的数据块,然后可读流实现才会向底层 source 发出背压信号。
const stream= new WritableStream( { ... }, new ByteLengthQueuingStrategy({ highWaterMark: 32 * 1024 }) );
在此情况下,可写流的内部队列可累计 32 KiB 的数据块,等待先前写入底层 sink 完成,然后可写流才会向生产者发出背压信号。
无需在可读字节流中使用 ByteLengthQueuingStrategy
,因为其始终以字节计量数据块。尝试用
ByteLengthQueuingStrategy
构造字节流会失败。
7.2.1. 接口定义
ByteLengthQueuingStrategy
类的 Web IDL 定义如下:
[Exposed=*]interface {
ByteLengthQueuingStrategy constructor (QueuingStrategyInit );
init readonly attribute unrestricted double highWaterMark ;readonly attribute Function size ; };
7.2.2. 内部槽
ByteLengthQueuingStrategy
实例有一个 [[highWaterMark]] 内部槽,存储构造函数中给定的值。
-
令 steps 如下,参数为 chunk:
-
返回 ? GetV(chunk, "
byteLength
")。
-
-
令 F 为 ! CreateBuiltinFunction(steps, 1, "
size
", « », globalObject 的 相关 Realm)。 -
将 globalObject 的 字节长度队列策略 size 函数 设为一个
Function
,代表对 F 的引用,callback context 为 globalObject 的 相关设置对象。
此设计带有历史原因。目的是确保 size
是函数而不是方法,即不会检查 this
。相关背景见 whatwg/streams#1005 和 heycam/webidl#819。
7.2.3. 构造函数与属性
strategy = new
ByteLengthQueuingStrategy
({highWaterMark
})-
创建一个新的
ByteLengthQueuingStrategy
,使用提供的 高水位线。注意,所提供的高水位线不会提前验证。如果为负数、NaN 或非数字,则生成的
ByteLengthQueuingStrategy
会导致对应流构造函数抛出异常。 highWaterMark = strategy.
highWaterMark
-
返回构造函数提供的高水位线。
strategy.
size
(chunk)-
通过返回 chunk 的
byteLength
属性值来衡量其大小。
new ByteLengthQueuingStrategy(init)
构造步骤如下:
-
将 this.[[highWaterMark]] 设为 init["
highWaterMark
"]。
highWaterMark
getter 步骤如下:
size
getter 步骤如下:
-
返回 this 的 相关全局对象的 字节长度队列策略 size 函数。
7.3. CountQueuingStrategy
类
处理通用对象流时常见的队列策略是简单计数已累计的数据块数量,直到该数值达到指定高水位线。因此该策略也作为内建提供。
const stream= new ReadableStream( { ... }, new CountQueuingStrategy({ highWaterMark: 10 }) );
在此情况下,可读流的底层 source可入队 10 个(任意类型的)数据块,然后可读流实现才会向底层 source 发出背压信号。
const stream= new WritableStream( { ... }, new CountQueuingStrategy({ highWaterMark: 5 }) );
在此情况下,可写流的内部队列可累计 5 个(任意类型的)数据块,等待先前写入底层 sink 完成,然后可写流才会向生产者发出背压信号。
7.3.1. 接口定义
CountQueuingStrategy
类的 Web IDL 定义如下:
[Exposed=*]interface {
CountQueuingStrategy constructor (QueuingStrategyInit );
init readonly attribute unrestricted double highWaterMark ;readonly attribute Function size ; };
7.3.2. 内部槽
CountQueuingStrategy
实例有一个 [[highWaterMark]] 内部槽,存储构造函数中给定的值。
-
令 steps 如下:
-
返回 1。
-
-
令 F 为 ! CreateBuiltinFunction(steps, 0, "
size
", « », globalObject 的 相关 Realm)。 -
将 globalObject 的 计数队列策略 size 函数 设为一个
Function
,代表对 F 的引用,callback context 为 globalObject 的 相关设置对象。
此设计带有历史原因。目的是确保 size
是函数而不是方法,即不会检查 this
。相关背景见 whatwg/streams#1005 和 heycam/webidl#819。
7.3.3. 构造函数与属性
strategy = new
CountQueuingStrategy
({highWaterMark
})-
创建一个新的
CountQueuingStrategy
,使用提供的 高水位线。注意,所提供的高水位线不会提前验证。如果为负数、NaN 或非数字,则生成的
CountQueuingStrategy
会导致对应流构造函数抛出异常。 highWaterMark = strategy.
highWaterMark
-
返回构造函数提供的高水位线。
strategy.
size
(chunk)-
通过始终返回 1 来衡量 chunk 的大小。这样保证队列总大小等于队列中的数据块数量。
new CountQueuingStrategy(init)
构造步骤如下:
-
将 this.[[highWaterMark]] 设为 init["
highWaterMark
"]。
highWaterMark
getter 步骤如下:
size
getter 步骤如下:
-
返回 this 的 相关全局对象的 计数队列策略 size 函数。
7.4. 抽象操作
以下算法由流构造函数用于从 QueuingStrategy
字典中提取相关内容。
-
如果 strategy["
highWaterMark
"] 不存在,返回 defaultHWM。 -
令 highWaterMark 为 strategy["
highWaterMark
"]。 -
如果 highWaterMark 是 NaN 或 highWaterMark < 0,抛出
RangeError
异常。 -
返回 highWaterMark。
8. 支持性抽象操作
下列抽象操作用于支持多种类型流的实现,因此未在上面主要章节中分组。
8.1. 带大小的队列
本规范中的流使用“带大小的队列”数据结构来存储排队的值及其确定的大小。多个规范对象都包含一个带大小的队列,通过对象拥有一对内部槽表示,总命名为 [[queue]] 和
[[queueTotalSize]]。[[queue]] 是 列表,其中每项为 带大小的值,而 [[queueTotalSize]] 是 JavaScript Number
(即双精度浮点数)。
操作包含带大小队列的对象时,下列抽象操作用于确保两个内部槽保持同步。
由于浮点运算精度有限,这里规定的通过 [[queueTotalSize]] 槽保持累积总数的框架,与累加 [[queue]] 中所有数据块的大小并不完全等价。(不过,只有当数据块间大小差异极大(约 1015),或排队数据块数达数万亿级别时才有差异。)
以下定义中,带大小的值是一个 结构体,含两个 成员:value 和 size。
-
断言:container 有 [[queue]] 和 [[queueTotalSize]] 内部槽。
-
断言:container.[[queue]] 不是 空。
-
令 valueWithSize 为 container.[[queue]][0]。
-
从 container.[[queue]] 移除 valueWithSize。
-
令 container.[[queueTotalSize]] 为 container.[[queueTotalSize]] − valueWithSize 的 size。
-
若 container.[[queueTotalSize]] < 0,则令其为 0。(可能因舍入误差出现。)
-
返回 valueWithSize 的 value。
-
断言:container 有 [[queue]] 和 [[queueTotalSize]] 内部槽。
-
如果 ! IsNonNegativeNumber(size) 为 false,则抛出
RangeError
异常。 -
如果 size 为 +∞,则抛出
RangeError
异常。 -
追加一个新的带大小的值,其 value 为 value,size 为 size 到 container.[[queue]]。
-
令 container.[[queueTotalSize]] 为 container.[[queueTotalSize]] + size。
-
断言:container 有 [[queue]] 和 [[queueTotalSize]] 内部槽。
-
令 container.[[queue]] 为新的空 列表。
-
令 container.[[queueTotalSize]] 为 0。
8.2. 可转移流
可转移流通过一种特殊的恒等变换实现,其可写端位于一个realm,而可读端位于另一个 realm。以下抽象操作用于实现这些“跨 realm 变换”。
-
执行 PackAndPostMessage(port, "
error
", error),并丢弃结果。
当执行此抽象操作时已处于错误状态,无法处理更多错误,故直接丢弃。
-
令 message 为 OrdinaryObjectCreate(null)。
-
执行 ! CreateDataProperty(message, "
type
", type)。 -
执行 ! CreateDataProperty(message, "
value
", value)。 -
令 targetPort 为与 port 关联的 port(如果有);否则设为 null。
-
令 options 为 «[ "
transfer
" → « » ]»。 -
按消息端口 post message 步骤,传入 targetPort、message 和 options。
使用 JavaScript 对象进行传输,避免重复消息端口 post message 步骤。对象原型设为 null,避免受 %Object.prototype%
干扰。
-
令 result 为 PackAndPostMessage(port, type, value)。
-
如果 result 是 abrupt completion,
-
执行 ! CrossRealmTransformSendError(port, result.[[Value]])。
-
-
返回 result 作为完成记录。
-
执行 ! InitializeReadableStream(stream)。
-
令 controller 为新建 ReadableStreamDefaultController。
-
为 port 的
message
事件添加处理器,步骤如下:-
令 data 为消息数据。
-
断言:data 是对象。
-
令 type 为 ! Get(data, "
type
")。 -
令 value 为 ! Get(data, "
value
")。 -
断言:type 是字符串。
-
如果 type 是 "
chunk
",-
执行 ! ReadableStreamDefaultControllerEnqueue(controller, value)。
-
-
否则如果 type 是 "
close
",-
执行 ! ReadableStreamDefaultControllerClose(controller)。
-
断开 port 的关联。
-
-
否则如果 type 是 "
error
",-
执行 ! ReadableStreamDefaultControllerError(controller, value)。
-
断开 port 的关联。
-
-
-
为 port 的
messageerror
事件添加处理器,步骤如下:-
令 error 为新建 "
DataCloneError
"DOMException
。 -
执行 ! CrossRealmTransformSendError(port, error)。
-
执行 ! ReadableStreamDefaultControllerError(controller, error)。
-
断开 port 的关联。
-
-
启用 port 的 端口消息队列。
-
令 startAlgorithm 为返回 undefined 的算法。
-
令 pullAlgorithm 为如下步骤:
-
执行 ! PackAndPostMessage(port, "
pull
", undefined)。 -
返回已解决的 promise,值为 undefined。
-
-
令 cancelAlgorithm 为如下步骤,参数为 reason:
-
令 result 为 PackAndPostMessageHandlingError(port, "
error
", reason)。 -
断开 port 的关联。
-
如果 result 是 abrupt completion,返回拒绝的 promise,值为 result.[[Value]]。
-
否则,返回已解决的 promise,值为 undefined。
-
-
令 sizeAlgorithm 为总是返回 1 的算法。
-
执行 ! SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithm, cancelAlgorithm, 0, sizeAlgorithm)。
建议实现时显式处理此算法中断言的失败情况,因为输入可能来自不可信环境,否则可能导致安全问题。
-
执行 ! InitializeWritableStream(stream)。
-
令 controller 为新建 WritableStreamDefaultController。
-
令 backpressurePromise 为新的 promise。
-
为 port 的
message
事件添加处理器,步骤如下:-
令 data 为消息数据。
-
断言:data 是对象。
-
令 type 为 ! Get(data, "
type
")。 -
令 value 为 ! Get(data, "
value
")。 -
断言:type 是字符串。
-
如果 type 是 "
pull
",-
如果 backpressurePromise 不为 undefined,
-
解决 backpressurePromise,值为 undefined。
-
令 backpressurePromise 为 undefined。
-
-
-
否则如果 type 是 "
error
",-
执行 ! WritableStreamDefaultControllerErrorIfNeeded(controller, value)。
-
如果 backpressurePromise 不为 undefined,
-
解决 backpressurePromise,值为 undefined。
-
令 backpressurePromise 为 undefined。
-
-
-
-
为 port 的
messageerror
事件添加处理器,步骤如下:-
令 error 为新建 "
DataCloneError
"DOMException
。 -
执行 ! CrossRealmTransformSendError(port, error)。
-
执行 ! WritableStreamDefaultControllerErrorIfNeeded(controller, error)。
-
断开 port 的关联。
-
-
启用 port 的 端口消息队列。
-
令 startAlgorithm 为返回 undefined 的算法。
-
令 writeAlgorithm 为如下步骤,参数为 chunk:
-
如果 backpressurePromise 为 undefined,令 backpressurePromise 为已解决的 promise,值为 undefined。
-
返回 响应 backpressurePromise 的如下 fulfill 步骤:
-
令 backpressurePromise 为新的 promise。
-
令 result 为 PackAndPostMessageHandlingError(port, "
chunk
", chunk)。 -
如果 result 是 abrupt completion,
-
断开 port 的关联。
-
返回拒绝的 promise,值为 result.[[Value]]。
-
-
否则,返回已解决的 promise,值为 undefined。
-
-
-
令 closeAlgorithm 为如下步骤:
-
执行 ! PackAndPostMessage(port, "
close
", undefined)。 -
断开 port 的关联。
-
返回已解决的 promise,值为 undefined。
-
-
令 abortAlgorithm 为如下步骤,参数为 reason:
-
令 result 为 PackAndPostMessageHandlingError(port, "
error
", reason)。 -
断开 port 的关联。
-
如果 result 是 abrupt completion,返回拒绝的 promise,值为 result.[[Value]]。
-
否则,返回已解决的 promise,值为 undefined。
-
-
令 sizeAlgorithm 为总是返回 1 的算法。
-
执行 ! SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, abortAlgorithm, 1, sizeAlgorithm)。
建议实现时显式处理此算法中断言的失败情况,因为输入可能来自不可信环境,否则可能导致安全问题。
8.3. 杂项
以下抽象操作属于各种实用工具。
-
断言:O 是对象。
-
断言:O 有 [[ArrayBufferData]] 内部槽。
-
如果 ! IsDetachedBuffer(O) 为真,返回 false。
-
如果 SameValue(O.[[ArrayBufferDetachKey]], undefined) 为假,返回 false。
-
返回 true。
-
如果 v 不是数字,返回 false。
-
如果 v 是 NaN,返回 false。
-
如果 v < 0,返回 false。
-
返回 true。
-
断言:! IsDetachedBuffer(O) 为假。
-
令 arrayBufferData 为 O.[[ArrayBufferData]]。
-
令 arrayBufferByteLength 为 O.[[ArrayBufferByteLength]]。
-
执行 ? DetachArrayBuffer(O)。
若 O 的 [[ArrayBufferDetachKey]] 非 undefined(如
WebAssembly.Memory
的buffer
)则此步骤会抛异常。 [WASM-JS-API-1] -
返回新建的
ArrayBuffer
对象,创建于当前 Realm,其 [[ArrayBufferData]] 内部槽值为 arrayBufferData,[[ArrayBufferByteLength]] 内部槽值为 arrayBufferByteLength。
-
断言:O 是对象。
-
断言:O 有 [[ViewedArrayBuffer]] 内部槽。
-
断言:! IsDetachedBuffer(O.[[ViewedArrayBuffer]]) 为假。
-
令 buffer 为 ? CloneArrayBuffer(O.[[ViewedArrayBuffer]], O.[[ByteOffset]], O.[[ByteLength]],
%ArrayBuffer%
)。 -
令 array 为 ! Construct(
%Uint8Array%
, « buffer »)。 -
返回 array。
-
令 serialized 为 ? StructuredSerialize(v)。
-
返回 ? StructuredDeserialize(serialized, 当前 Realm)。
-
断言:toBuffer 是对象。
-
断言:toBuffer 有 [[ArrayBufferData]] 内部槽。
-
断言:fromBuffer 是对象。
-
断言:fromBuffer 有 [[ArrayBufferData]] 内部槽。
-
如果 toBuffer 为 fromBuffer,返回 false。
-
如果 ! IsDetachedBuffer(toBuffer) 为真,返回 false。
-
如果 ! IsDetachedBuffer(fromBuffer) 为真,返回 false。
-
如果 toIndex + count > toBuffer.[[ArrayBufferByteLength]],返回 false。
-
如果 fromIndex + count > fromBuffer.[[ArrayBufferByteLength]],返回 false。
-
返回 true。
9. 在其他规范中使用流
本标准的大部分内容关注于流的内部机制。其他规范通常无需关心这些细节,而应通过本标准定义的各种 IDL 类型及以下定义进行接口对接。
规范不应直接检查或操作本标准定义的各种内部槽。同样,也不应使用此处定义的抽象操作。直接使用可能破坏本标准维护的不变式。
如果你的规范需要以本节未支持的方式与流接口,请提交 issue。本节将根据需要有机扩展。
9.1. 可读流
9.1.1. 创建与操作
ReadableStream
对象 stream,给定可选算法 pullAlgorithm、可选算法 cancelAlgorithm、可选数字
highWaterMark(默认
1)、以及可选算法 sizeAlgorithm,请执行以下步骤。如果给定,pullAlgorithm
和 cancelAlgorithm 可返回 promise。若给定,sizeAlgorithm 必须接受数据块对象并返回数字;若给定,highWaterMark 必须为非负且非 NaN 数字。
-
令 startAlgorithm 为返回 undefined 的算法。
-
令 pullAlgorithmWrapper 为如下算法:
-
令 result 为执行 pullAlgorithm 的结果(如提供,否则为 null)。如抛出异常 e,返回拒绝的 promise,值为 e。
-
如果 result 是
Promise
,返回 result。 -
返回已解决的 promise,值为 undefined。
-
-
令 cancelAlgorithmWrapper 为如下算法(参数 reason):
-
令 result 为执行 cancelAlgorithm(值为 reason)的结果(如提供,否则为 null)。如抛出异常 e,返回拒绝的 promise,值为 e。
-
如果 result 是
Promise
,返回 result。 -
返回已解决的 promise,值为 undefined。
-
-
若未给定 sizeAlgorithm,则设为总是返回 1 的算法。
-
执行 ! InitializeReadableStream(stream)。
-
令 controller 为新建 ReadableStreamDefaultController。
-
执行 ! SetUpReadableStreamDefaultController(stream, controller, startAlgorithm, pullAlgorithmWrapper, cancelAlgorithmWrapper, highWaterMark, sizeAlgorithm)。
ReadableStream
对象 stream,给定可选算法 pullAlgorithm、可选算法
cancelAlgorithm、可选数字
highWaterMark(默认
0),执行以下步骤。如果给定,pullAlgorithm 和 cancelAlgorithm 可返回
promise。若给定,highWaterMark 必须为非负且非 NaN 数字。
-
令 startAlgorithm 为返回 undefined 的算法。
-
令 pullAlgorithmWrapper 为如下算法:
-
令 result 为执行 pullAlgorithm 的结果(如提供,否则为 null)。如抛出异常 e,返回拒绝的 promise,值为 e。
-
如果 result 是
Promise
,返回 result。 -
返回已解决的 promise,值为 undefined。
-
-
令 cancelAlgorithmWrapper 为如下算法:
-
令 result 为执行 cancelAlgorithm 的结果(如提供,否则为 null)。如抛出异常 e,返回拒绝的 promise,值为 e。
-
如果 result 是
Promise
,返回 result。 -
返回已解决的 promise,值为 undefined。
-
-
执行 ! InitializeReadableStream(stream)。
-
令 controller 为新建 ReadableByteStreamController。
-
执行 ! SetUpReadableByteStreamController(stream, controller, startAlgorithm, pullAlgorithmWrapper, cancelAlgorithmWrapper, highWaterMark, undefined)。
ReadableStream
的子类会直接在构造步骤内对 设置或 设置支持字节读取操作使用 this 值。
以下算法仅适用于经上述设置或设置支持字节读取算法初始化的 ReadableStream
实例(不适用于例如 web 开发者创建的实例):
ReadableStream
stream 的 填满高水位线所需大小,按如下步骤:
-
如果 stream 不是 可读,则返回 0。
-
如果 stream.[[controller]] 实现
ReadableByteStreamController
, 返回 ! ReadableByteStreamControllerGetDesiredSize(stream.[[controller]])。 -
返回 ! ReadableStreamDefaultControllerGetDesiredSize(stream.[[controller]])。
ReadableStream
需要更多数据,若其填满高水位线所需大小大于零。
ReadableStream
stream:
-
如果 stream.[[controller]] 实现
ReadableByteStreamController
:-
执行 ! ReadableByteStreamControllerClose(stream.[[controller]])。
-
如果 stream.[[controller]].[[pendingPullIntos]] 不是 空,则执行 ! ReadableByteStreamControllerRespond(stream.[[controller]], 0)。
-
-
否则,执行 ! ReadableStreamDefaultControllerClose(stream.[[controller]])。
ReadableStream
stream,给定 JavaScript
值 e:
-
如果 stream.[[controller]] 实现
ReadableByteStreamController
, 则执行 ! ReadableByteStreamControllerError(stream.[[controller]], e)。 -
否则,执行 ! ReadableStreamDefaultControllerError(stream.[[controller]], e)。
ReadableStream
stream:
-
如果 stream.[[controller]] 实现
ReadableStreamDefaultController
,-
执行 ! ReadableStreamDefaultControllerEnqueue(stream.[[controller]], chunk)。
-
-
否则,
-
断言:stream.[[controller]] 实现
ReadableByteStreamController
。 -
断言:chunk 是
ArrayBufferView
。 -
令 byobView 为 stream 的当前 BYOB 请求视图。
-
如果 byobView 非 null 且 chunk.[[ViewedArrayBuffer]] 为 byobView.[[ViewedArrayBuffer]],则:
-
断言:chunk.[[ByteOffset]] 为 byobView.[[ByteOffset]]。
-
断言:chunk.[[ByteLength]] ≤ byobView.[[ByteLength]]。
这些断言确保调用者不会在当前 BYOB 请求视图写入请求范围之外。
-
执行 ? ReadableByteStreamControllerRespond(stream.[[controller]], chunk.[[ByteLength]])。
-
-
否则,执行 ? ReadableByteStreamControllerEnqueue(stream.[[controller]], chunk)。
-
以下算法仅适用于经上述设置支持字节读取算法初始化的 ReadableStream
实例:
ReadableStream
stream 的 当前 BYOB 请求视图为 ArrayBufferView
或 null,按如下步骤判断:
-
断言:stream.[[controller]] 实现
ReadableByteStreamController
。 -
令 byobRequest 为 ! ReadableByteStreamControllerGetBYOBRequest(stream.[[controller]])。
-
若 byobRequest 为 null,则返回 null。
-
返回 byobRequest.[[view]]。
规范不得转移或分离 当前 BYOB 请求视图的底层 buffer。
实现可以做类似转移的操作,比如希望从其他线程写入内存,但需对 入队 和 关闭算法做出调整以保持可观察一致性。在规范层面,转移和分离是禁止的。
规范应尽量写入非 null 的 当前 BYOB 请求视图,然后用该视图调用 入队。只有在 创建新 ArrayBufferView
用于 入队时,当前
BYOB 请求视图为 null 或手头字节多于视图字节长度。这样可避免不必要的复制,更好地尊重流的消费者需求。
如下从字节源 pull算法实现了这些要求,适用于字节来自作为规范级字节序列的常见情况,该序列代表底层字节源。注意其保守地保留字节在字节序列中,而不是积极入队,因此调用者可用剩余字节数作为背压信号。
ReadableStream
stream:
-
断言:stream.[[controller]] 实现
ReadableByteStreamController
。 -
令 available 为 bytes 的长度。
-
令 desiredSize 为 available。
-
若 stream 的当前 BYOB 请求视图非 null,则 desiredSize 设为 stream 的当前 BYOB 请求视图的字节长度。
-
令 pullSize 为 available 和 desiredSize 较小者。
-
令 pulled 为 bytes 的前 pullSize 个字节。
-
从 bytes 删除前 pullSize 个字节。
-
若 stream 的当前 BYOB 请求视图非 null,则:
-
写入 pulled 到 stream 的当前 BYOB 请求视图。
-
执行 ? ReadableByteStreamControllerRespond(stream.[[controller]], pullSize)。
-
-
否则,
-
令 view 为 创建自 pulled 的
Uint8Array
,位于 stream 的相关 Realm。 -
执行 ? ReadableByteStreamControllerEnqueue(stream.[[controller]], view)。
-
规范不得在写入当前 BYOB 请求视图或从字节序列 pull后再关闭对应的 ReadableStream
。
9.1.2. 读取
以下算法可用于任意 ReadableStream
实例,包括 Web 开发者创建的实例。所有算法在具体操作上都可能失败,调用规范应处理这些失败。
要为 ReadableStream
stream获取 reader,返回 ? AcquireReadableStreamDefaultReader(stream)。
结果为 ReadableStreamDefaultReader
。
如果 stream 已锁定,此操作会抛异常。
要从 ReadableStreamDefaultReader
reader读取一个数据块,给定 读取请求 readRequest,执行 ! ReadableStreamDefaultReaderRead(reader,
readRequest)。
要从 ReadableStreamDefaultReader
reader读取所有字节,给定 successSteps(接受 字节序列的算法)和 failureSteps(接受 JavaScript 值的算法):使用 读取循环,参数为 reader、新建 字节序列、successSteps 和 failureSteps。
-
- chunk steps,参数 chunk
-
-
如果 chunk 不是
Uint8Array
对象,调用 failureSteps,参数为TypeError
,并终止这些步骤。 -
将 chunk 代表的字节追加到 bytes。
-
使用 读取循环,参数为 reader、bytes、successSteps 和 failureSteps。
直接递归实现可能导致栈溢出,实际应使用非递归变体、微任务队列或更直接的字节读取方法(如后述)。
-
- close steps
-
-
调用 successSteps,参数为 bytes。
-
- error steps,参数 e
-
-
调用 failureSteps,参数为 e。
-
-
执行 ! ReadableStreamDefaultReaderRead(reader, readRequest)。
由于 reader 对应的 ReadableStream
具有独占访问权,实际读取机制不可观察。实现可采用更直接机制,如获取并使用 ReadableStreamBYOBReader
或直接访问数据块。
要释放 ReadableStreamDefaultReader
reader,执行 ! ReadableStreamDefaultReaderRelease(reader)。
要取消 ReadableStreamDefaultReader
reader,参数 reason,执行 ! ReadableStreamReaderGenericCancel(reader,
reason)。返回值为 promise,值为 undefined 或拒绝原因。
要取消 ReadableStream
stream,参数 reason,返回 ! ReadableStreamCancel(stream,
reason)。返回值为 promise,值为 undefined 或拒绝原因。
要分流 ReadableStream
stream,
返回 ? ReadableStreamTee(stream, true)。
因第二参数为 true,返回的第二分支中的数据块会从第一分支克隆(用 HTML 的可序列化对象机制),防止其中一个分支的消费影响另一个分支。
9.1.3. 自省
以下谓词可用于任意 ReadableStream
对象。但除了判断流是否锁定外,其他自省不能通过公开 JS API 实现,规范应使用 § 9.1.2 读取中的算法(例如不要测试流是否可读,而应尝试获取 reader并处理异常)。
ReadableStream
stream 可读,当 stream.[[state]] 为
"readable
" 时。
ReadableStream
stream 已关闭,当 stream.[[state]] 为 "closed
"
时。
ReadableStream
stream 已出错,当 stream.[[state]] 为
"errored
" 时。
ReadableStream
stream 锁定,当 ! IsReadableStreamLocked(stream) 返回 true 时。
ReadableStream
stream 已扰动,当 stream.[[disturbed]] 为
true 时。
表示流是否被读取或取消过。比本节其他谓词更不建议使用,因为 Web 开发者甚至无法间接访问此信息,不应以此为分支平台行为。
9.2. 可写流
9.2.1. 创建与操作
WritableStream
对象 stream,给定算法 writeAlgorithm、可选算法
closeAlgorithm、可选算法
abortAlgorithm、可选数字
highWaterMark(默认 1)、可选算法
sizeAlgorithm,执行以下步骤。writeAlgorithm
必须是接受数据块对象并返回 promise
的算法。若给定,closeAlgorithm 和 abortAlgorithm 可返回 promise。若给定,sizeAlgorithm
必须接受数据块对象并返回数字;若给定,highWaterMark
必须为非负且非 NaN 数字。
-
令 startAlgorithm 为返回 undefined 的算法。
-
令 closeAlgorithmWrapper 为如下算法:
-
令 result 为执行 closeAlgorithm 的结果(如提供,否则为 null)。如抛出异常 e,返回拒绝的 promise,值为 e。
-
如果 result 是
Promise
,返回 result。 -
返回已解决的 promise,值为 undefined。
-
-
令 abortAlgorithmWrapper 为如下算法(参数 reason):
-
令 result 为执行 abortAlgorithm(值为 reason)的结果(如提供,否则为 null)。如抛出异常 e,返回拒绝的 promise,值为 e。
-
如果 result 是
Promise
,返回 result。 -
返回已解决的 promise,值为 undefined。
-
-
若未给定 sizeAlgorithm,则设为总是返回 1 的算法。
-
执行 ! InitializeWritableStream(stream)。
-
令 controller 为新建 WritableStreamDefaultController。
-
执行 ! SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithmWrapper, abortAlgorithmWrapper, highWaterMark, sizeAlgorithm)。
其他规范在构造 writeAlgorithm 时应避免对给定数据块进行并行读取,否则会破坏 JS
的运行至完成语义。可通过同步复制或转移给定值避免,例如 StructuredSerializeWithTransfer、获取 buffer source 字节副本或转移
ArrayBuffer。例外情况:数据块为 SharedArrayBuffer
时,允许并行操作。
WritableStream
的子类会直接在构造步骤内对 设置操作使用 this 值。
以下定义仅适用于经上述设置算法初始化的 WritableStream
实例:
要错误 WritableStream
stream,给定 JavaScript 值 e,执行 ! WritableStreamDefaultControllerErrorIfNeeded(stream.[[controller]], e)。
WritableStream
stream 的 signal 为 stream.[[controller]].[[abortController]] 的 signal。规范可添加或移除算法到该 AbortSignal
,或查询其是否已中止及其中止原因abort reason。
通常用法是,在设置 WritableStream
后,
添加算法到其 signal,以中止对底层 sink的任何写入操作。然后,在 writeAlgorithm 内,一旦底层 sink响应,检查 signal 是否已中止,如是则用 signal 的中止原因拒绝返回的 promise。
9.2.2. 写入
以下算法可用于任意 WritableStream
实例,包括 Web 开发者创建的实例。所有算法在具体操作上都可能失败,调用规范应处理这些失败。
要为 WritableStream
stream获取 writer,返回 ? AcquireWritableStreamDefaultWriter(stream)。
结果为 WritableStreamDefaultWriter
。
如果 stream 已锁定,此操作会抛出异常。
要对 WritableStreamDefaultWriter
writer写入数据块,给定值 chunk,
返回 ! WritableStreamDefaultWriterWrite(writer,
chunk)。
要释放 WritableStreamDefaultWriter
writer,执行 ! WritableStreamDefaultWriterRelease(writer)。
要关闭 WritableStream
stream,返回 ! WritableStreamClose(stream)。返回值为 promise,值为 undefined
或拒绝原因。
要中止 WritableStream
stream,参数 reason,返回 ! WritableStreamAbort(stream, reason)。返回值为
promise,值为 undefined 或拒绝原因。
9.3. 转换流
9.3.1. 创建与操作
TransformStream
对象 stream,给定算法 transformAlgorithm、可选算法 flushAlgorithm、可选算法
cancelAlgorithm,请执行以下步骤。transformAlgorithm
以及(如给定)flushAlgorithm和cancelAlgorithm可返回 promise。
-
令 writableHighWaterMark 为 1。
-
令 writableSizeAlgorithm 为总是返回 1 的算法。
-
令 readableHighWaterMark 为 0。
-
令 readableSizeAlgorithm 为总是返回 1 的算法。
-
令 transformAlgorithmWrapper 为如下算法,参数 chunk:
-
令 result 为执行 transformAlgorithm(参数 chunk)的结果。如抛出异常 e,返回拒绝的 promise,值为 e。
-
如果 result 是
Promise
,返回 result。 -
返回已解决的 promise,值为 undefined。
-
-
令 flushAlgorithmWrapper 为如下算法:
-
令 result 为执行 flushAlgorithm(如提供,否则为 null)的结果。如抛出异常 e,返回拒绝的 promise,值为 e。
-
如果 result 是
Promise
,返回 result。 -
返回已解决的 promise,值为 undefined。
-
-
令 cancelAlgorithmWrapper 为如下算法,参数 reason:
-
令 result 为执行 cancelAlgorithm(参数 reason)的结果(如提供,否则为 null)。如抛出异常 e,返回拒绝的 promise,值为 e。
-
如果 result 是
Promise
,返回 result。 -
返回已解决的 promise,值为 undefined。
-
-
令 startPromise 为已解决的 promise,值为 undefined。
-
执行 ! InitializeTransformStream(stream, startPromise, writableHighWaterMark, writableSizeAlgorithm, readableHighWaterMark, readableSizeAlgorithm)。
-
令 controller 为新建 TransformStreamDefaultController。
-
执行 ! SetUpTransformStreamDefaultController(stream, controller, transformAlgorithmWrapper, flushAlgorithmWrapper, cancelAlgorithmWrapper)。
其他规范构造 transformAlgorithm 时应避免对给定数据块进行并行读取,否则会破坏 JS
的运行至完成语义。可通过同步复制或转移给定值避免,例如 StructuredSerializeWithTransfer、获取 buffer source 字节副本或转移
ArrayBuffer。例外情况:数据块为 SharedArrayBuffer
时,允许并行操作。
TransformStream
的过程分两步:
-
令 transformStream 为新建 TransformStream。
-
对 transformStream 执行 设置,参数为 ...。
TransformStream
的子类会直接在构造步骤内对 设置操作使用 this 值。
TransformStream
:
-
令 transformStream 为新建 TransformStream。
-
对 transformStream 执行 设置,其中 transformAlgorithm 设为如下算法:给定 chunk,入队 chunk 到 transformStream。
-
返回 transformStream。
以下算法仅适用于经上述设置算法初始化的 TransformStream
实例。通常作为 transformAlgorithm 或 flushAlgorithm 的一部分调用。
要入队
JavaScript 值 chunk 到 TransformStream
stream,执行 ! TransformStreamDefaultControllerEnqueue(stream.[[controller]], chunk)。
要终止
TransformStream
stream,
执行 ! TransformStreamDefaultControllerTerminate(stream.[[controller]])。
要错误 TransformStream
stream,给定 JavaScript 值 e,执行 ! TransformStreamDefaultControllerError(stream.[[controller]], e)。
9.3.2. 包装为自定义类
其他规范如果希望定义自定义转换流,可能不希望直接从
TransformStream
接口继承。相反,如果需要新类,可以自行创建独立的 Web IDL 接口,并采用如下 mixin:
interface mixin {
GenericTransformStream readonly attribute ReadableStream readable ;readonly attribute WritableStream writable ; };
任何平台对象只要包含该 GenericTransformStream
mixin,就有一个相关联的 transform,它是一个实际的 TransformStream
。
readable
getter 步骤:返回 this 的 transform.[[readable]]。
writable
getter 步骤:返回 this 的 transform.[[writable]]。
包含 GenericTransformStream
mixin 的 IDL 接口会自动拥有 readable
和 writable
属性。要定制该接口的行为,应在构造器(或其他初始化代码)中,将每个实例的 transform 设为新建的 TransformStream
,并通过
transformAlgorithm 和(可选)flushAlgorithm参数进行适当自定义设置。
注意:此模式在 Web 平台已有实例,如 CompressionStream
和 TextDecoderStream
。
[COMPRESSION] [ENCODING]
若无需超出 TransformStream
基类提供的 API,无需创建包装类。最常见的驱动因素是需要自定义构造器步骤,但如果你的流不是为构造而设计,直接使用 TransformStream
即可。
9.4. 其他流对
除了上面讨论的转换流,规范通常会创建可读流和可写流的配对。本节为此类情况提供一些指导。
在所有此类情况中,规范应使用 readable
和 writable
作为暴露流的属性名。不要使用其他名称(比如
input
/output
或
readableStream
/writableStream
),也不要使用方法或其他非属性的访问方式。
9.4.1. 双工流
最常见的可读/可写流配对是双工流,其中可读和可写流代表同一共享资源的两端,比如 socket、连接或设备。
规范双工流时最棘手的是如何处理诸如取消可读端、关闭或中止可写端等操作。可以选择让双工流“半开”,即一端的操作不影响另一端;也可以让操作影响另一端,例如指定可读端的 cancelAlgorithm 会关闭可写端。
一个基本双工流的例子(通过 JS 创建而非规范文本)见 § 10.8 使用同一底层资源包装的 { readable, writable } stream 配对。该例展示了影响传递行为。
另一个需要考虑的是异步获取双工流的创建方式,比如建立连接。推荐模式是有一个可构造类,其属性返回 promise,promise 完成时得到实际双工流对象。该对象还能暴露异步获取的信息,比如连接数据。容器类可以提供便捷 API,比如一次关闭整个连接而非只关闭某一端。
更复杂的双工流例子是正在规范化的 WebSocketStream
。见其说明文档和设计笔记。
由于双工流遵循 readable
/writable
属性约定,它们可以用于 pipeThrough()
。这不总是有意义,但底层资源确实做某种变换时是方便的。
对于任意 WebSocket,pipeThrough 派生的双工流通常没有意义。但如果服务器专门实现为对收到的数据进行变换后回传,则这种用法既有用又便捷。
9.4.2. 端点配对
另一种可读/可写流配对是端点配对。此时可读和可写流代表一个较长管道的两端,目的是让 Web 开发代码在中间插入转换流。
createEndpointPair()
,Web
开发者可这样写代码:
const { readable, writable} = createEndpointPair(); await readable. pipeThrough( new TransformStream(...)). pipeTo( writable);
WebRTC Encoded Transform 就是这种技术的例子,其 RTCRtpScriptTransformer
接口同时有 readable
和 writable
属性。
虽然端点配对也遵循 readable
/writable
属性约定,但把它们传给 pipeThrough()
没有意义。
9.5. 管道连接
ReadableStream
readable 管道连接到 WritableStream
writable,可选 boolean preventClose(默认 false)、可选 boolean preventAbort(默认
false)、可选 boolean preventCancel(默认 false)、可选 AbortSignal
signal,按如下步骤。返回一个 Promise
,管道完成时 fulfilled,失败时 rejected。
-
断言:! IsReadableStreamLocked(readable) 为 false。
-
断言:! IsWritableStreamLocked(writable) 为 false。
-
令 signalArg 为 signal(如给定),否则为 undefined。
-
返回 ! ReadableStreamPipeTo(readable, writable, preventClose, preventAbort, preventCancel, signalArg)。
若不关心返回的 promise,引用该概念可能有点别扭。最佳建议是“pipe readable 到 writable”。
ReadableStream
readable 管道穿过 TransformStream
transform,可选 boolean preventClose
(默认 false)、可选 boolean preventAbort(默认 false)、可选 boolean preventCancel(默认 false)、可选 AbortSignal
signal,按如下步骤。结果为
transform 的可读端。
-
断言:! IsReadableStreamLocked(readable) 为 false。
-
断言:! IsWritableStreamLocked(transform.[[writable]]) 为 false。
-
令 signalArg 为 signal(如给定),否则为 undefined。
-
令 promise 为 ! ReadableStreamPipeTo(readable, transform.[[writable]], preventClose, preventAbort, preventCancel, signalArg)。
-
将 promise.[[PromiseIsHandled]] 设为 true。
-
返回 transform.[[readable]]。
ReadableStream
stream创建代理,执行如下步骤。结果是新
ReadableStream
对象,其数据从 stream 拉取,同时 stream 立即变为锁定和扰动。
-
令 identityTransform 为创建恒等
TransformStream
的结果。 -
返回 stream管道穿过 identityTransform的结果。
10. 创建流的示例
本节及所有子节均为非规范内容。
前面的标准示例主要关注如何使用流。这里展示如何创建流,使用 ReadableStream
、
WritableStream
、
以及 TransformStream
构造器。
10.1. 具有底层推送源的可读流(无背压支持)
下述函数创建可读流,用于包装 WebSocket
实例 [WEBSOCKETS],
它们是推送源,不支持背压信号。示例说明了,当适配推送源时,通常大部分工作发生在 start()
方法中。
function makeReadableWebSocketStream( url, protocols) { const ws= new WebSocket( url, protocols); ws. binaryType= "arraybuffer" ; return new ReadableStream({ start( controller) { ws. onmessage= event=> controller. enqueue( event. data); ws. onclose= () => controller. close(); ws. onerror= () => controller. error( new Error ( "WebSocket 出错了!" )); }, cancel() { ws. close(); } }); }
我们可以用此函数为 websocket 创建可读流,并将该流 pipe 到任意可写流:
const webSocketStream= makeReadableWebSocketStream( "wss://example.com:443/" , "protocol" ); webSocketStream. pipeTo( writableStream) . then(() => console. log( "所有数据写入成功!" )) . catch ( e=> console. error( "发生错误!" , e));
不过,很多人提到“为 websocket 添加流支持”时,其实希望能以流方式发送单个 websocket 消息,比如可在一个消息内传输文件,而无需客户端全部加载进内存。要实现此目的,应该允许单个
websocket 消息本身为 ReadableStream
实例。上面示例并没有实现这一点。
更多背景见 此讨论。
10.2. 具有底层推送源并支持背压的可读流
下述函数返回可读流,用于包装“背压
socket”,假设对象 API 与 websocket 类似,但还可通过 readStop
和 readStart
方法暂停和恢复数据流。该示例展示了如何将背压应用到支持的底层源。
function makeReadableBackpressureSocketStream( host, port) { const socket= createBackpressureSocket( host, port); return new ReadableStream({ start( controller) { socket. ondata= event=> { controller. enqueue( event. data); if ( controller. desiredSize<= 0 ) { // 内部队列已满,将背压信号传递到底层源。 socket. readStop(); } }; socket. onend= () => controller. close(); socket. onerror= () => controller. error( new Error ( "Socket 出错了!" )); }, pull() { // 队列已清空但消费端还要数据,重启数据流(如之前已暂停)。 socket. readStart(); }, cancel() { socket. close(); } }); }
我们可以用此函数为“背压 socket”创建可读流,与 websocket 相同用法。但这次,当 pipe 到不能及时接受数据的目标,或长时间未读取时,会向 socket 发送背压信号。
10.3. 具有底层推送源的可读字节流(无背压支持)
下述函数返回可读字节流,用于包装假设的 UDP socket API,
包括 promise 返回的 select2()
方法,意为模拟 POSIX select(2) 系统调用。
由于 UDP 协议没有内建背压支持,desiredSize
给出的背压信号会被忽略,流保证当 socket 有数据但开发者未请求时,数据会进流的内部队列,避免内核队列溢出和数据丢失。
这对消费端与流的交互有有趣影响。若消费端读取速度慢于 socket 生产速度,数据块会一直留在流的内部队列。这时,用BYOB reader会有多一次复制(从流队列到开发者 buffer)。但如果消费端足够快,BYOB reader 可以实现零拷贝,直接写入开发者提供的 buffer。
(你可以想象更复杂的版本,使用 desiredSize
通知带外背压机制,比如 socket 发送消息调整数据速率。留给读者自行实现。)
const DEFAULT_CHUNK_SIZE= 65536 ; function makeUDPSocketStream( host, port) { const socket= createUDPSocket( host, port); return new ReadableStream({ type: "bytes" , start( controller) { readRepeatedly(). catch ( e=> controller. error( e)); function readRepeatedly() { return socket. select2(). then(() => { // socket 可读时可能没有 BYOB 请求,需要两种情况都处理。 let bytesRead; if ( controller. byobRequest) { const v= controller. byobRequest. view; bytesRead= socket. readInto( v. buffer, v. byteOffset, v. byteLength); if ( bytesRead=== 0 ) { controller. close(); } controller. byobRequest. respond( bytesRead); } else { const buffer= new ArrayBuffer( DEFAULT_CHUNK_SIZE); bytesRead= socket. readInto( buffer, 0 , DEFAULT_CHUNK_SIZE); if ( bytesRead=== 0 ) { controller. close(); } else { controller. enqueue( new Uint8Array( buffer, 0 , bytesRead)); } } if ( bytesRead=== 0 ) { return ; } return readRepeatedly(); }); } }, cancel() { socket. close(); } }); }
ReadableStream
实例可用 BYOB reader,享受上述优点和注意事项。
10.4. 具有底层拉取源的可读流
下面的函数返回可读流,用于包装 Node.js 文件系统 API(基本等价于 C 的
fopen
、fread
、fclose
三件套)。文件是典型的拉取源。注意,与推送源示例不同,这里大部分操作发生在按需触发的 pull()
方法中,而不是在 start()
启动时。
const fs= require( "fs" ). promises; const CHUNK_SIZE= 1024 ; function makeReadableFileStream( filename) { let fileHandle; let position= 0 ; return new ReadableStream({ async start() { fileHandle= await fs. open( filename, "r" ); }, async pull( controller) { const buffer= new Uint8Array( CHUNK_SIZE); const { bytesRead} = await fileHandle. read( buffer, 0 , CHUNK_SIZE, position); if ( bytesRead=== 0 ) { await fileHandle. close(); controller. close(); } else { position+= bytesRead; controller. enqueue( buffer. subarray( 0 , bytesRead)); } }, cancel() { return fileHandle. close(); } }); }
我们可以像处理 socket 一样,创建和使用文件的可读流。
10.5. 具有底层拉取源的可读字节流
下面的函数返回可读字节流,可以高效零拷贝读取文件,同样用 Node.js 文件系统 API。不同于固定块大小 1024,它会尝试填满开发者提供的 buffer,实现完全控制。
const fs= require( "fs" ). promises; const DEFAULT_CHUNK_SIZE= 1024 ; function makeReadableByteFileStream( filename) { let fileHandle; let position= 0 ; return new ReadableStream({ type: "bytes" , async start() { fileHandle= await fs. open( filename, "r" ); }, async pull( controller) { // 即使消费端用默认 reader,auto-allocation 也会分配 buffer 并通过 byobRequest 传递给我们。 const v= controller. byobRequest. view; const { bytesRead} = await fileHandle. read( v, 0 , v. byteLength, position); if ( bytesRead=== 0 ) { await fileHandle. close(); controller. close(); controller. byobRequest. respond( 0 ); } else { position+= bytesRead; controller. byobRequest. respond( bytesRead); } }, cancel() { return fileHandle. close(); }, autoAllocateChunkSize: DEFAULT_CHUNK_SIZE}); }
有了这个,我们可以为返回的 ReadableStream
创建和使用BYOB reader,也可以像往常一样创建默认
reader。底层字节源的低级字节追踪与默认 reader 的高层块式消费会由流实现自动适配。auto-allocation 功能通过 autoAllocateChunkSize
选项让代码更简单,无需像 § 10.3 具有底层推送源的可读字节流(无背压支持)那样手动分支。
10.6. 无背压或成功信号的可写流
下面的函数返回一个可写流,用于包装
WebSocket
[WEBSOCKETS]。WebSocket 并没有办法通知某个数据块已成功发送(除非轮询 bufferedAmount
,留给读者实现)。因此,这个可写流无法向生产者准确传递背压信号或写入成功/失败。即 writer 的 write()
方法和 ready
getter 返回的 promise 总是立即完成。
function makeWritableWebSocketStream( url, protocols) { const ws= new WebSocket( url, protocols); return new WritableStream({ start( controller) { ws. onerror= () => { controller. error( new Error ( "WebSocket 出错了!" )); ws. onclose= null ; }; ws. onclose= () => controller. error( new Error ( "服务器意外关闭连接!" )); return new Promise( resolve=> ws. onopen= resolve); }, write( chunk) { ws. send( chunk); // 立即返回,因为 websocket 没有简单办法判断写入是否完成。 }, close() { return closeWS( 1000 ); }, abort( reason) { return closeWS( 4000 , reason&& reason. message); }, }); function closeWS( code, reasonString) { return new Promise(( resolve, reject) => { ws. onclose= e=> { if ( e. wasClean) { resolve(); } else { reject( new Error ( "连接未正常关闭" )); } }; ws. close( code, reasonString); }); } }
我们可以用此函数为 websocket 创建可写流,并将任意可读流 pipe 到它:
const webSocketStream= makeWritableWebSocketStream( "wss://example.com:443/" , "protocol" ); readableStream. pipeTo( webSocketStream) . then(() => console. log( "所有数据写入成功!" )) . catch ( e=> console. error( "发生错误!" , e));
关于将 websocket 包装为流的方式,参见前文说明。
10.7. 具有背压和成功信号的可写流
下面的函数返回可写流,用于包装 Node.js 文件系统 API(基本映射到 C 的
fopen
、fwrite
、fclose
三件套)。由于我们包装的 API 能告知写入何时成功,这个流能传递背压信号以及单次写入是否成功或失败。
const fs= require( "fs" ). promises; function makeWritableFileStream( filename) { let fileHandle; return new WritableStream({ async start() { fileHandle= await fs. open( filename, "w" ); }, write( chunk) { return fileHandle. write( chunk, 0 , chunk. length); }, close() { return fileHandle. close(); }, abort() { return fileHandle. close(); } }); }
我们可以用此函数为文件创建可写流,并向其写入单个数据块:
const fileStream= makeWritableFileStream( "/example/path/on/fs.txt" ); const writer= fileStream. getWriter(); writer. write( "To stream, or not to stream\n" ); writer. write( "That is the question\n" ); writer. close() . then(() => console. log( "数据块写入并流关闭成功!" )) . catch ( e=> console. error( e));
注意,如果某次 fileHandle.write
调用用时较长,返回的 promise 会晚些完成。此时后续写入会排队,存储在流的内部队列中。队列中数据块累计,会使 ready
getter 返回 pending promise,提示生产者应停止写入(如有可能)。
可写流的写入队列机制尤其重要,因为如 Node.js
文档所述,“在同一文件多次调用 filehandle.write
且不等待 promise 是不安全的。”但写
makeWritableFileStream
时无需担心流实现保证了 底层 sink 的 write()
方法在之前 promise 完成前不会再次调用!
10.8. 包装同一底层资源的 { readable, writable } 流对
下面的函数返回 { readable, writable }
对象,readable
属性是可读流,writable
属性是可写流,两者包装同一个 websocket 底层资源。实质上,这结合了 § 10.1
具有底层推送源的可读流(无背压支持) 和 § 10.6 无背压或成功信号的可写流。
同时展示了如何用 JavaScript 类创建可复用的底层 sink/source 抽象。
function streamifyWebSocket( url, protocol) { const ws= new WebSocket( url, protocols); ws. binaryType= "arraybuffer" ; return { readable: new ReadableStream( new WebSocketSource( ws)), writable: new WritableStream( new WebSocketSink( ws)) }; } class WebSocketSource{ constructor ( ws) { this . _ws= ws; } start( controller) { this . _ws. onmessage= event=> controller. enqueue( event. data); this . _ws. onclose= () => controller. close(); this . _ws. addEventListener( "error" , () => { controller. error( new Error ( "WebSocket 出错了!" )); }); } cancel() { this . _ws. close(); } } class WebSocketSink{ constructor ( ws) { this . _ws= ws; } start( controller) { this . _ws. onclose= () => controller. error( new Error ( "服务器意外关闭连接!" )); this . _ws. addEventListener( "error" , () => { controller. error( new Error ( "WebSocket 出错了!" )); this . _ws. onclose= null ; }); return new Promise( resolve=> this . _ws. onopen= resolve); } write( chunk) { this . _ws. send( chunk); } close() { return this . _closeWS( 1000 ); } abort( reason) { return this . _closeWS( 4000 , reason&& reason. message); } _closeWS( code, reasonString) { return new Promise(( resolve, reject) => { this . _ws. onclose= e=> { if ( e. wasClean) { resolve(); } else { reject( new Error ( "连接未正常关闭" )); } }; this . _ws. close( code, reasonString); }); } }
我们可以用该函数创建的对象,通过标准流 API 与远程 websocket 交互:
const streamyWS= streamifyWebSocket( "wss://example.com:443/" , "protocol" ); const writer= streamyWS. writable. getWriter(); const reader= streamyWS. readable. getReader(); writer. write( "Hello" ); writer. write( "web socket!" ); reader. read(). then(({ value, done}) => { console. log( "WebSocket 返回: " , value); });
注意,这种实现下,取消 readable
端会隐式关闭 writable
端,关闭或中止 writable
端也会隐式关闭
readable
端。
关于将 websocket 包装为流的方式,参见前文说明。
10.9. 替换模板标签的转换流
在数据流上用变量替换标签通常很有用,尤其当需要替换的部分相较整体数据很小。这个示例给出了简单做法。它将字符串映射为字符串,比如将模板 "Time: {{time}} Message:
{{message}}"
转换为 "Time: 15:36 Message: hello"
,假定 { time:
"15:36", message: "hello" }
被传给了 LipFuzzTransformer
的 substitutions
参数。
本例还展示了如何处理 chunk 包含部分数据、无法立即转换、需等待更多数据的情形。此时,部分模板标签会累积在 partialChunk
属性中,直到找到标签结束或流结束。
class LipFuzzTransformer{ constructor ( substitutions) { this . substitutions= substitutions; this . partialChunk= "" ; this . lastIndex= undefined ; } transform( chunk, controller) { chunk= this . partialChunk+ chunk; this . partialChunk= "" ; // lastIndex 是最后一次替换后的第一个字符索引。 this . lastIndex= 0 ; chunk= chunk. replace( /\{\{([a-zA-Z0-9_-]+)\}\}/g , this . replaceTag. bind( this )); // 用于判断字符串末尾是否有不完整模板标签的正则。 const partialAtEndRegexp= /\{(\{([a-zA-Z0-9_-]+(\})?)?)?$/g ; // 只处理未被替换的新字符。 partialAtEndRegexp. lastIndex= this . lastIndex; this . lastIndex= undefined ; const match= partialAtEndRegexp. exec( chunk); if ( match) { this . partialChunk= chunk. substring( match. index); chunk= chunk. substring( 0 , match. index); } controller. enqueue( chunk); } flush( controller) { if ( this . partialChunk. length> 0 ) { controller. enqueue( this . partialChunk); } } replaceTag( match, p1, offset) { let replacement= this . substitutions[ p1]; if ( replacement=== undefined ) { replacement= "" ; } this . lastIndex= offset+ replacement. length; return replacement; } }
这里我们将需要传给 TransformStream
构造函数的 transformer
定义为类,这样便于跟踪实例数据。
可这样使用该类:
const data= { userName, displayName, icon, date}; const ts= new TransformStream( new LipFuzzTransformer( data)); fetchEvent. respondWith( fetch( fetchEvent. request. url). then( response=> { const transformedBody= response. body// 解码二进制响应为字符串 . pipeThrough( new TextDecoderStream()) // 应用 LipFuzzTransformer . pipeThrough( ts) // 编码转换后的字符串 . pipeThrough( new TextEncoderStream()); return new Response( transformedBody); }) );
为简洁起见,LipFuzzTransformer
未进行上下文转义。在实际应用中,模板系统需做上下文转义以保证安全和健壮性。
10.10. 由同步 mapper 函数创建的转换流
下面的函数允许通过同步 "mapper" 函数创建新的 TransformStream
实例,类似于 Array.prototype.map
传递的函数。它说明了 API 对于简单转换也很简洁。
function mapperTransformStream( mapperFunction) { return new TransformStream({ transform( chunk, controller) { controller. enqueue( mapperFunction( chunk)); } }); }
此函数可用于创建将所有输入变为大写的 TransformStream
:
const ts= mapperTransformStream( chunk=> chunk. toUpperCase()); const writer= ts. writable. getWriter(); const reader= ts. readable. getReader(); writer. write( "No need to shout" ); // 输出 "NO NEED TO SHOUT": reader. read(). then(({ value}) => console. log( value));
虽然同步转换自身不会产生背压,但只有在没有背压时才会转换 chunk,因此不会浪费资源。
异常会自然地使流出错:
const ts= mapperTransformStream( chunk=> JSON. parse( chunk)); const writer= ts. writable. getWriter(); const reader= ts. readable. getReader(); writer. write( "[1, " ); // 会两次输出 SyntaxError: reader. read(). catch ( e=> console. error( e)); writer. write( "{}" ). catch ( e=> console. error( e));
10.11. 用恒等转换流作为原语创建新可读流
将恒等转换流与
pipeTo()
结合,是操作流的强大方法。本节包含两个此技术的例子。
有时自然地把可读流的 promise 当作可读流,只需一个简单适配器:
function promiseToReadable( promiseForReadable) { const ts= new TransformStream(); promiseForReadable. then( readable=> readable. pipeTo( ts. writable)) . catch ( reason=> ts. writable. abort( reason)) . catch (() => {}); return ts. readable; }
这里将数据 pipe 到writable 端后返回readable 端。如果 pipe 出错,就中止writable 端,错误会自动传播到返回的 readable 端。如果 writable 端被 pipeTo()
标记为
errored,则 abort()
会返回
rejection,可安全忽略。
更复杂的扩展是将多个可读流拼接为一个:
function concatenateReadables( readables) { const ts= new TransformStream(); let promise= Promise. resolve(); for ( const readableof readables) { promise= promise. then( () => readable. pipeTo( ts. writable, { preventClose: true }), reason=> { return Promise. all([ ts. writable. abort( reason), readable. cancel( reason) ]); } ); } promise. then(() => ts. writable. close(), reason=> ts. writable. abort( reason)) . catch (() => {}); return ts. readable; }
这里的错误处理很微妙,因为取消拼接流要取消所有输入流。但成功时很简单,只需依次将 readables
可迭代对象中的每个流 pipe 到恒等转换流的writable 端,最后关闭它。readable
端即为所有流的块拼接结果并返回。背压处理方式不变。
致谢
编辑感谢以下人员对本规范的贡献: Anne van Kesteren、 AnthumChris、 Arthur Langereis、 Ben Kelly、 Bert Belder、 Brian di Palma、 Calvin Metcalf、 Dominic Tarr、 Ed Hager、 Eric Skoglund、 Forbes Lindesay、 Forrest Norvell、 Gary Blackwood、 Gorgi Kosev、 Gus Caplan、 贺师俊 (hax)、 Isaac Schlueter、 isonmad、 Jake Archibald、 Jake Verbaten、 James Pryor、 Janessa Det、 Jason Orendorff、 Jeffrey Yasskin、 Jeremy Roman、 Jens Nockert、 Lennart Grahl、 Luca Casonato、 Mangala Sadhu Sangeet Singh Khalsa、 Marcos Caceres、 Marvin Hagemeister、 Mattias Buelens、 Michael Mior、 Mihai Potra、 Nidhi Jaju、 Romain Bellessort、 Simon Menke、 Stephen Sugden、 Surma、 Tab Atkins、 Tanguy Krotoff、 Thorsten Lorenz、 Till Schneidereit、 Tim Caswell、 Trevor Norris、 tzik、 Will Chan、 Youenn Fablet、 平野裕 (Yutaka Hirano)、 以及 Xabier Rodríguez。 社区对本规范的参与远超预期;没有你们,我们无法完成这项工作。
本标准由 Adam Rice (Google, ricea@chromium.org)、Domenic Denicola (Google, d@domenic.me)、 Mattias Buelens,以及吉野剛史 (Takeshi Yoshino, tyoshino@chromium.org) 撰写。
知识产权
版权所有 © WHATWG (Apple, Google, Mozilla, Microsoft)。本作品采用知识共享署名 4.0 国际许可协议授权。若部分内容被纳入源代码,则该部分以BSD 三条款许可证授权。
这是 Living Standard。如需专利审核版本,请参阅 Living Standard Review Draft。