1. 介绍
本节为非规范性内容。
2. 核心架构
2.1. Subscriber
接口
[Exposed=*]interface {
Subscriber undefined next (any );
value undefined error (any );
error undefined complete ();undefined addTeardown (VoidFunction ); // 创建后为 true,直到 complete()/error() 被调用,或订阅者取消订阅。在 complete()/error() 内该属性为 true。
teardown readonly attribute boolean active ;readonly attribute AbortSignal signal ; };
每个 Subscriber
都有一个 有序集合的 内部观察者,初始为空。
每个 Subscriber
都有一个 清理回调,它是一个 列表,列表元素为
VoidFunction
,
初始为空。
每个 Subscriber
都有一个 订阅控制器,它是一个
AbortController
。
每个 Subscriber
都有一个 active 布尔值,初始为 true。
注意:这是一个用于记录的变量,用于确保
Subscriber
在关闭后不再调用其拥有的任意回调。
active
的 getter 步骤为返回 this 的
active 布尔值。
signal
的 getter 步骤为返回 this 的
订阅控制器 的 signal。
next(value)
方法的步骤如下:
error(error)
方法的步骤如下:
complete()
方法的步骤如下:
addTeardown(teardown)
方法的步骤如下:
Subscriber
subscriber,以及可选的 any
reason,执行以下步骤:
-
如果 subscriber 的 active 为 false,则返回。
此操作用于防止可重入调用,可能出现在“生产者主动取消订阅”场景。如下例:
const outerController= new AbortController(); const observable= new Observable( subscriber=> { subscriber. addTeardown(() => { // 2.) 此清理操作在“关闭”算法执行期间运行。中止下游 signal 会运行其 abort 算法。其中之一就是当前正在运行的“关闭”算法。 outerController. abort(); }); // 1.) 这会立即调用“关闭”算法,将 subscriber.active 设为 false。 subscriber. complete(); }); observable. subscribe({}, { signal: outerController. signal}); -
将 subscriber 的 active 设为 false。
2.2. Observable
接口
// SubscribeCallback 是 Observable "创建者" 的代码所在。它在调用 subscribe() 时执行,用于建立新订阅。callback =
SubscribeCallback undefined (Subscriber );
subscriber callback =
ObservableSubscriptionCallback undefined (any );
value dictionary {
SubscriptionObserver ObservableSubscriptionCallback ;
next ObservableSubscriptionCallback ;
error VoidFunction ; };
complete callback =
ObservableInspectorAbortHandler undefined (any );
value dictionary {
ObservableInspector ObservableSubscriptionCallback ;
next ObservableSubscriptionCallback ;
error VoidFunction ;
complete VoidFunction ;
subscribe ObservableInspectorAbortHandler ; };
abort typedef (ObservableSubscriptionCallback or SubscriptionObserver );
ObserverUnion typedef (ObservableSubscriptionCallback or ObservableInspector );
ObservableInspectorUnion dictionary {
SubscribeOptions AbortSignal ; };
signal callback =
Predicate boolean (any ,
value unsigned long long );
index callback =
Reducer any (any ,
accumulator any ,
currentValue unsigned long long );
index callback =
Mapper any (any ,
value unsigned long long ); // 与 Mapper 的区别仅在于返回类型,此回调仅用于访问序列中的每个元素,不进行变换。
index callback =
Visitor undefined (any ,
value unsigned long long ); // 此回调返回一个 `any`,必须通过 `Observable` 的转换语义转为 `Observable`。
index callback =
CatchCallback any (any ); [Exposed=*]
value interface {
Observable constructor (SubscribeCallback );
callback undefined subscribe (optional ObserverUnion = {},
observer optional SubscribeOptions = {}); // 如果 value 是以下任意类型,则构造一个原生 Observable: // - Observable // - AsyncIterable // - Iterable // - Promise
options static Observable from (any ); // 返回 Observable 的操作符。详见规范的 Operators 部分。 // // takeUntil() 可消费 promise、iterable、async iterable 以及其他 observable。
value Observable takeUntil (any );
value Observable map (Mapper );
mapper Observable filter (Predicate );
predicate Observable take (unsigned long long );
amount Observable drop (unsigned long long );
amount Observable flatMap (Mapper );
mapper Observable switchMap (Mapper );
mapper Observable inspect (optional ObservableInspectorUnion = {});
inspectorUnion Observable catch (CatchCallback );
callback Observable finally (VoidFunction ); // 返回 Promise 的操作符。
callback Promise <sequence <any >>toArray (optional SubscribeOptions = {});
options Promise <undefined >forEach (Visitor ,
callback optional SubscribeOptions = {});
options Promise <boolean >every (Predicate ,
predicate optional SubscribeOptions = {});
options Promise <any >first (optional SubscribeOptions = {});
options Promise <any >last (optional SubscribeOptions = {});
options Promise <any >find (Predicate ,
predicate optional SubscribeOptions = {});
options Promise <boolean >some (Predicate ,
predicate optional SubscribeOptions = {});
options Promise <any >reduce (Reducer ,
reducer optional any ,
initialValue optional SubscribeOptions = {}); };
options
每个 Observable
都有一个 subscribe callback,它是一个
SubscribeCallback
或一组接收 Subscriber
的步骤。
每个 Observable
都有一个 weak subscriber,它是指向
Subscriber
的弱引用或 null,初始为 null。
注意:这些类型的“联合”用于支持两种
Observable
的构造方式:由 JavaScript 创建的
(总是用 SubscribeCallback
构造),
以及原生构造的 Observable
对象(其 subscribe callback 可以是任意原生步骤而不是 JS 回调)。
when()
的返回值就是后者的一个例子。
new
Observable(callback)
构造函数步骤如下:
-
将 this 的 subscribe callback 设为 callback。
注意:此回调将在后续调用
subscribe()
时被调用。
2.2.1. 支持性概念
any
error,执行以下步骤:
注意:单独抽出此默认算法,便于规范中所有原生订阅
Observable(即规范语句订阅,不经过 subscribe()
方法)时不必重复定义这些步骤。
内部观察者 是具有如下 结构的对象:
- next 步骤
-
接收一个
any
参数的算法。初始时,这些步骤不执行任何操作。 - error 步骤
- complete 步骤
-
无参数的算法。初始时,这些步骤不执行任何操作。
内部观察者结构用于映射 next
、
error
和 complete
回调函数。对于通过 subscribe()
方法
由 JavaScript 订阅的 Observable
,
这些算法步骤其实就是对脚本提供的
回调函数做包装,
分别对应 next
、
error
和 complete
回调函数。
但当规范内部(非脚本)订阅 Observable
时,
这些“步骤”是任意规范算法,不是包含 Web IDL 回调函数的 ObserverUnion
。如
§ 2.3.3 返回 Promise 的操作符 就用到了这种方式。
any
value,执行以下步骤:
注意:将此算法从 Web IDL 的 from()
方法中拆分出来,
以便规范正文可以直接 转换值,不经过 Web IDL 绑定。
-
如果 Type(value) 不是 Object,抛出
TypeError
。注意:此步骤防止原始类型被强制转换为 iterable(如 String)。讨论见 WICG/observable#125。
-
Observable 转换:如果 value 的 具体类型 是
Observable
,则直接返回 value。 -
异步 iterable 转换:令 asyncIteratorMethod 为 ? GetMethod(value,
%Symbol.asyncIterator%
)。注意:我们使用 GetMethod 而不是 GetIterator,因为我们仅检测异步迭代协议支持,不希望因未实现而抛错。 GetIterator 在以下两种情况都会抛错:(a) 没有实现迭代协议,(b) 实现了但不可调用或 getter 抛错。 GetMethod 只在后者情况下抛错。
-
如果 asyncIteratorMethod 是 undefined 或 null,则跳到 Iterable 转换 步骤。
-
令 nextAlgorithm 为如下步骤,给定
Subscriber
subscriber 和 Iterator Record iteratorRecord:-
令 nextPromise 为
Promise
或 undefined,初始为 undefined。 -
令 nextCompletion 为 IteratorNext(iteratorRecord)。
注意:此处用 IteratorNext,而不是 IteratorStepValue, 因为 IteratorStepValue 要求迭代器的
next()
方法返回可立即检查的对象, 而异步迭代时next()
返回 Promise/thenable(需包装为 Promise 再获取值)。 -
如果 nextCompletion 是 throw completion,则:
-
断言:iteratorRecord 的 [[Done]] 为 true。
-
令 nextPromise 为 被拒绝的 promise, 值为 nextRecord 的 [[Value]]。
-
-
否则,如果 nextRecord 是 normal completion, 令 nextPromise 为 已解决的 promise, 值为 nextRecord 的 [[Value]]。
注意:这样做是确保 nextRecord 的 [[Value]] 即使不是 Promise,也会被包装为 Promise。
-
响应 nextPromise:
-
如果 nextPromise 被 fulfilled,值为 iteratorResult,则:
-
如果 Type(iteratorResult) 不是 Object,则以
error()
和TypeError
调用 subscriber,并终止步骤。 -
令 done 为 IteratorComplete(iteratorResult)。
-
如果 done 是 throw completion, 则以 done 的 [[Value]] 调用 subscriber 的
error()
,并终止步骤。 -
如果 done 的 [[Value]] 为 true,则以 subscriber 的
complete()
调用,并终止步骤。 -
令 value 为 IteratorValue(iteratorResult)。
-
如果 value 是 throw completion, 则以 value 的 [[Value]] 调用 subscriber 的
error()
,并终止步骤。 -
以 value 的 [[Value]] 调用 subscriber 的
next()
。 -
再次以 subscriber 和 iteratorRecord 运行 nextAlgorithm。
-
-
如果 nextPromise 被 rejected,原因为 r,则以 r 调用 subscriber 的
error()
。
-
-
返回一个 新的
Observable
, 其 subscribe callback 是一个接收Subscriber
subscriber 并执行如下操作的算法:-
令 iteratorRecordCompletion 为 GetIterator(value, async)。
注意:此步骤会重新调用
%Symbol.asyncIterator%
的 getter, 并触发协议本身以获取 Iterator Record。极端情况见 issue#127。 -
如果 iteratorRecordCompletion 是 throw completion, 则以 iteratorRecordCompletion 的 [[Value]] 同步调用 subscriber 的
error()
,并终止步骤。注意:此处同步传播错误,仅在异步 iterable 转 Observable 时可能发生; 其它情况错误会异步经 microtask 传播,因为被包装为 rejected
Promise
。 此同步传播行为与语言结构一致,如 for-await of 循环会同步抛出异常到外部 catch 块,且未发生 Await 前即抛出。 -
令 iteratorRecord 为 ! iteratorRecordCompletion。
-
断言:iteratorRecord 是 Iterator Record。
-
添加如下中止算法到 subscriber 的 订阅控制器的 signal:
-
运行 AsyncIteratorClose(iteratorRecord, NormalCompletion(subscriber 的 订阅控制器 的 abort reason))。
-
-
以 subscriber 和 iteratorRecord 运行 nextAlgorithm。
-
Iterable 转换:令 iteratorMethod 为 ? GetMethod(value,
%Symbol.iterator%
)。 -
如果 iteratorMethod 为 undefined,则跳到 Promise 转换 步骤。
否则,返回一个 新的
Observable
, 其 subscribe callback 是一个接收Subscriber
subscriber 并执行如下操作的算法:-
令 iteratorRecordCompletion 为 GetIterator(value, sync)。
-
如果 iteratorRecordCompletion 是 throw completion, 则以 iteratorRecordCompletion 的 [[Value]] 调用 subscriber 的
error()
,并终止步骤。 -
令 iteratorRecord 为 ! iteratorRecordCompletion。
-
添加如下中止算法到 subscriber 的 订阅控制器的 signal:
-
运行 IteratorClose(iteratorRecord, NormalCompletion(UNUSED))。
-
-
当 true 时:
-
令 next 为 IteratorStepValue(iteratorRecord)。
-
如果 next 是 throw completion, 则以 next 的 [[Value]] 调用 subscriber 的
error()
,并 跳出。 -
令 next 为 ! next。
-
如果 next 已结束,则:
-
断言:iteratorRecord 的 [[Done]] 为 true。
-
调用 subscriber 的
complete()
。 -
返回。
-
-
以 next 调用 subscriber 的
next()
。
-
-
Promise 转换:如果 IsPromise(value) 为 true, 则:
-
返回一个 新的
Observable
, 其 subscribe callback 是一个接收Subscriber
subscriber 并执行如下操作的算法:-
-
如果 value 被 fulfilled,值为 v,则:
-
以 v 运行 subscriber 的
next()
方法。 -
运行 subscriber 的
complete()
方法。
-
-
如果 value 被 rejected,原因为 r,则以 r 运行 subscriber 的
error()
方法。
-
-
-
Observable
,给定一个
ObserverUnion
或 内部观察者
observer,以及 SubscribeOptions
options,执行以下步骤:
注意:我们将此算法与 Web IDL 的 subscribe()
方法分离开,使规范正文可以 订阅一个 Observable
而无需经过 Web IDL 绑定。参见 w3c/IntersectionObserver#464
相关背景,其中“内部”正文 必须 不经过 Web IDL 绑定处理属性可能被 JavaScript 修改的对象。用法参见 § 2.3.3 返回 Promise 的操作符。
-
如果 this 的 相关全局对象 是
Window
对象,且其 关联的 Document 非 完全激活,则返回。 -
令 internal observer 为新的 内部观察者。
-
按如下处理 observer:
-
- 如果 observer 是
ObservableSubscriptionCallback
-
设置 internal observer 的 next 步骤如下,接收
any
value:-
调用 observer,参数为 «value» 和 "
report
"。
-
- 如果 observer 是
SubscriptionObserver
- 如果 observer 是 内部观察者
- 将 internal observer 设为 observer。
- 如果 observer 是
-
-
断言:internal observer 的 error 步骤要么是 默认错误算法,要么是 调用提供的
error
回调函数的算法。 -
如果 this 的 weak subscriber 非 null 且 this 的 weak subscriber 的 active 为 true:
-
令 subscriber 为 this 的 weak subscriber。
-
返回。
-
-
令 subscriber 为 新建的
Subscriber
。 -
设置 this 的 weak subscriber 为 subscriber。
-
如果 this 的 subscribe callback 是
SubscribeCallback
, 则以「subscriber」和 "rethrow
" 调用它。 -
否则,以 subscriber 运行 this 的 subscribe callback 给出的步骤。
2.3. 操作符
目前请参见 https://github.com/wicg/observable#operators。
2.3.1. from()
from(value)
方法步骤如下:
-
返回 转换 value 为
Observable
的结果。 如有异常则抛出。
2.3.2.
Observable
返回型操作符
takeUntil(value)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 notifier 为 转换 value 为
Observable
的结果。 -
令 observable 为 新建的
Observable
, 其 subscribe callback 是一个接受Subscriber
subscriber 的算法,具体如下:注意:此方法涉及对两个Observable
进行 订阅: (1) notifier,(2) sourceObservable。在以下情况我们会从两者取消订阅:-
当 notifier 开始发出值(无论是 "next" 还是 "error")。此时, 我们会从 notifier 取消订阅,因为我们已获得所需,不再需要其继续产生值。同时也会从 sourceObservable 取消订阅, 因为它不再需要继续产生用于本方法返回的 observable 的值,因为我们已经手动结束了 observable 的订阅, 因为 notifier 最终产生了值。
-
当 sourceObservable 调用
error()
或complete()
。此时,从 notifier 取消订阅, 因为我们不再需要监听其产生的值以决定 observable 何时停止镜像 sourceObservable 的值 (因为 sourceObservable 已自行完成)。无需从 sourceObservable 取消订阅, 因为其订阅已自动结束。
-
令 notifierObserver 为新的 内部观察者,初始化如下:
- next 步骤
-
运行 subscriber 的
complete()
方法。注意:这会从 sourceObservable 取消订阅, 如果此时已订阅。原因是 sourceObservable 使用 "外部" subscriber 的 subscription controller 的 signal 作为输入信号, 当上述(或下述)调用
complete()
时, 该信号会被 中止。 - error 步骤
-
运行 subscriber 的
complete()
方法。
注意:没有指定 complete 步骤, 因为如果 notifier
Observable
自行完成, 我们无需对与该方法返回的 observable 相关的 subscriber 调用 complete。 此时 observable 会继续无中断地镜像 sourceObservable。 -
令 options 为新建的
SubscribeOptions
, 其signal
是 subscriber 的 subscription controller 的 signal。 -
订阅 notifier, 传入 notifierObserver 和 options。
-
如果 subscriber 的 active 为 false,则返回。
注意:这意味着如果 notifier 同步发出值, sourceObservable 的 subscribe callback 不会被调用。 但如果 notifier 仅同步 "complete"(没有发出 "next" 或 "error"),则 subscriber 的 active 仍为 true,我们会继续订阅 sourceObservable, observable 会无中断地镜像它。
-
令 sourceObserver 为新的 内部观察者,初始化如下:
- next 步骤
-
以传入的 value 调用 subscriber 的
next()
方法。 - error 步骤
-
以传入的 error 调用 subscriber 的
error()
方法。 - complete 步骤
-
运行 subscriber 的
complete()
方法。
注意: sourceObserver 主要是透传, 镜像 sourceObservable 发出的所有内容,唯一例外是当 sourceObservable 被耗尽, 在 notifier 尚未产生任何内容时,可以取消订阅 notifier
Observable
。 -
订阅 sourceObservable, 传入 sourceObserver 和 options。
-
-
返回 observable。
map(mapper)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 observable 为 新建的
Observable
, 其 subscribe callback 是一个接受Subscriber
subscriber 的算法,具体如下:-
令 idx 为
unsigned long long
,初始为 0。 -
令 sourceObserver 为新的 内部观察者,初始化如下:
- next 步骤
- error 步骤
-
以传入的 error 调用 subscriber 的
error()
方法。 - complete 步骤
-
调用 subscriber 的
complete()
方法。
-
令 options 为新建的
SubscribeOptions
, 其signal
为 subscriber 的 subscription controller 的 signal。 -
订阅 sourceObservable, 传入 sourceObserver 和 options。
-
-
返回 observable。
filter(predicate)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 observable 为 新建的
Observable
, 其 subscribe callback 是一个接受Subscriber
subscriber 的算法,具体如下:-
令 idx 为
unsigned long long
,初始为 0。 -
令 sourceObserver 为新的 内部观察者,初始化如下:
- next 步骤
- error 步骤
-
以传入的 error 调用 subscriber 的
error()
方法。 - complete 步骤
-
调用 subscriber 的
complete()
方法。
-
令 options 为新建的
SubscribeOptions
, 其signal
为 subscriber 的 subscription controller 的 signal。 -
订阅 sourceObservable, 传入 sourceObserver 和 options。
-
-
返回 observable。
take(amount)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 observable 为 新建的
Observable
, 其 subscribe callback 是一个接受Subscriber
subscriber 的算法,具体如下:-
令 remaining 为 amount。
-
如果 remaining 为 0,则运行 subscriber 的
complete()
方法并终止这些步骤。 -
令 sourceObserver 为新的 内部观察者,初始化如下:
- next 步骤
-
-
以传入的 value 调用 subscriber 的
next()
方法。 -
递减 remaining。
-
如果 remaining 为 0,则运行 subscriber 的
complete()
方法。
-
- error 步骤
-
以传入的 error 调用 subscriber 的
error()
方法。 - complete 步骤
-
调用 subscriber 的
complete()
方法。
-
令 options 为新建的
SubscribeOptions
, 其signal
为 subscriber 的 subscription controller 的 signal。 -
订阅 sourceObservable, 传入 sourceObserver 和 options。
-
-
返回 observable。
drop(amount)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 observable 为 新建的
Observable
, 其 subscribe callback 是一个接受Subscriber
subscriber 的算法,具体如下:-
令 remaining 为 amount。
-
令 sourceObserver 为新的 内部观察者,初始化如下:
- next 步骤
- error 步骤
-
以传入的 error 调用 subscriber 的
error()
方法。 - complete 步骤
-
调用 subscriber 的
complete()
方法。
-
令 options 为新建的
SubscribeOptions
, 其signal
为 subscriber 的 subscription controller 的 signal。 -
订阅 sourceObservable, 传入 sourceObserver 和 options。
-
-
返回 observable。
flatMap(mapper)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 observable 为 新建的
Observable
, 其 subscribe callback 是一个接受Subscriber
subscriber 的算法,具体如下:-
令 idx 为
unsigned long long
,初始为 0。 -
令 outerSubscriptionHasCompleted 为 boolean,初始为 false。
-
令 queue 为新的 列表,元素类型为
any
,初始为空。注意:该 queue 用于存储 sourceObservable 发出的所有
Observable
, 当 observable 当前正在订阅某个较早由 sourceObservable 发出的且尚未耗尽的Observable
时。 -
令 activeInnerSubscription 为 boolean,初始为 false。
-
令 sourceObserver 为新的 内部观察者,初始化如下:
- next 步骤
-
-
如果 activeInnerSubscription 为 true:
-
追加 value 到 queue。
注意:该 value 会在当前
Observable
订阅结束后处理。
-
-
否则:
-
将 activeInnerSubscription 设为 true。
-
以 value、subscriber、mapper, 以及对以下变量的引用:queue、activeInnerSubscription、outerSubscriptionHasCompleted、idx 运行 flatmap 处理下一个值步骤。
注意:该 flatmap 处理下一个值步骤 会订阅由 value 派生的
Observable
(如果能派生)并持续处理其值,直到订阅失效(错误或完成)。如果该“内部”Observable
完成,则处理步骤会递归调用自身,处理 queue 下一个值。如果不存在下一个值,则处理步骤终止,取消设置 activeInnerSubscription,以便后续 sourceObservable 产生的值能正确处理。
-
-
- error 步骤
-
以传入的 error 调用 subscriber 的
error()
方法。 - complete 步骤
-
-
将 outerSubscriptionHasCompleted 设为 true。
注意:如果 activeInnerSubscription 为 true,下面的步骤不会完成 subscriber。此时,flatmap 处理下一个值步骤 会在 queue 为空且“内部”订阅失效后负责完成 subscriber。
-
如果 activeInnerSubscription 为 false 且 queue 为空,则运行 subscriber 的
complete()
方法。
-
-
令 options 为新建的
SubscribeOptions
, 其signal
为 subscriber 的 subscription controller 的 signal。 -
订阅 sourceObservable, 传入 sourceObserver 和 options。
-
-
返回 observable。
any
value、Subscriber
subscriber、Mapper
mapper,以及对以下所有变量的引用:列表类型的
any
值 queue、boolean activeInnerSubscription、boolean
outerSubscriptionHasCompleted,以及 unsigned long long
idx:
-
令 mappedResult 为 调用 mapper,参数为 «value, idx» 和 "
rethrow
" 的结果。 -
将 idx 设为 idx + 1。
-
令 innerObservable 为调用
from()
,参数为 mappedResult 的结果。如果 抛出异常 E,则以 E 运行 subscriber 的
error()
方法,并终止这些步骤。不应该直接调用
from()
,应调用某个内部算法以便异常能正确传递到 subscriber。 -
令 innerObserver 为新建的 内部观察者,初始化如下:
- next 步骤
-
以传入的 value 调用 subscriber 的
next()
方法。 - error 步骤
-
以传入的 error 调用 subscriber 的
error()
方法。 - complete 步骤
-
-
如果 queue 非空:
-
令 nextValue 为 queue 的第一个元素;移除该元素。
-
以 nextValue、subscriber、mapper、以及对 queue 和 activeInnerSubscription 的引用, 运行 flatmap 处理下一个值步骤。
-
-
否则:
-
将 activeInnerSubscription 设为 false。
注意: 由于 activeInnerSubscription 是引用类型,这能确保后续“外部”
Observable
(即 sourceObservable)产生的所有值都能被正确处理。 -
如果 outerSubscriptionHasCompleted 为 true,则运行 subscriber 的
complete()
方法。注意: 这表示“外部”
Observable
已经完成,但因为还有至少一个待处理的“内部”Observable
(即 innerObservable)尚未完成,所以直到现在才真正完成 subscriber。
-
-
-
令 innerOptions 为新建的
SubscribeOptions
, 其signal
为 subscriber 的 subscription controller 的 signal。 -
订阅 innerObservable, 传入 innerObserver 和 innerOptions。
switchMap(mapper)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 observable 为 新建的
Observable
, 其 subscribe callback 是一个接受Subscriber
subscriber 的算法,具体如下:-
令 idx 为
unsigned long long
,初始为 0。 -
令 outerSubscriptionHasCompleted 为 boolean,初始为 false。
-
令 activeInnerAbortController 为
AbortController
或 null,初始为 null。注意:
AbortController
只会在本算法的 next 步骤(见下方)被赋值为新实例,并只会在 switchmap 处理下一个值步骤 被赋值为 null(当“内部”Observable
完成或错误时)。 此变量用作当前是否有活跃“内部”订阅的标志。下方 complete 步骤 判断此变量, 如果 sourceObservable 完成但有活跃“内部”订阅,则不会立即完成 subscriber,而是等待“内部”订阅完成后再完成。 -
令 sourceObserver 为新建的 内部观察者,初始化如下:
-
- next 步骤
-
-
如果 activeInnerAbortController 不为 null,则 中止信号 activeInnerAbortController。
注意: 这会“取消订阅”上一次由 sourceObservable 推送的值派生出的“内部”
Observable
, 然后我们会立即订阅即将由当前 value 推送产生的“新的”Observable
。 -
将 activeInnerAbortController 设为新建
AbortController
。 -
以 value、subscriber、mapper, 以及对 activeInnerAbortController、outerSubscriptionHasCompleted、idx 的引用, 运行 switchmap 处理下一个值步骤。
注意: switchmap 处理下一个值步骤 会订阅由 value 派生的
Observable
, 持续处理其值,直到 (1) 订阅失效(错误或完成);或 (2) activeInnerAbortController 被 中止(因为 sourceObservable 推送了更新的值,替换了当前“内部”订阅)。
-
- error 步骤
-
以传入的 error 调用 subscriber 的
error()
方法。 -
- complete 步骤
-
-
将 outerSubscriptionHasCompleted 设为 true。
注意: 如果 activeInnerAbortController 不为 null,则不会立即完成 subscriber。 此时 switchmap 处理下一个值步骤 会在“内部”订阅完成时完成 subscriber。
-
如果 activeInnerAbortController 为 null,则运行 subscriber 的
complete()
方法。
-
-
-
令 options 为新建的
SubscribeOptions
, 其signal
为 subscriber 的 subscription controller 的 signal。 -
订阅 sourceObservable, 传入 sourceObserver 和 options。
-
-
返回 observable。
any
value、Subscriber
subscriber、Mapper
mapper,以及对以下所有变量的引用:AbortController
activeInnerAbortController、boolean
outerSubscriptionHasCompleted,
以及 unsigned long long
idx:
-
令 mappedResult 为 调用 mapper,参数为 «value, idx» 和 "
rethrow
" 的结果。 -
将 idx 设为 idx + 1。
-
令 innerObservable 为调用
from()
,参数为 mappedResult 的结果。 -
令 innerObserver 为新建的 内部观察者,初始化如下:
- next 步骤
-
以传入的 value 调用 subscriber 的
next()
方法。 - error 步骤
-
以传入的 error 调用 subscriber 的
error()
方法。注意: 此处无需将 activeInnerAbortController 设为 null, 因为调用 subscriber 的
error()
方法会自动从“外部”源 Observable 取消订阅, 不会再收到更多值。 - complete 步骤
-
-
如果 outerSubscriptionHasCompleted 为 true,则运行 subscriber 的
complete()
方法。 -
否则,将 activeInnerAbortController 设为 null。
注意: 由于该变量是引用类型,能向 switchMap 完成步骤传递没有活跃内部订阅的信号。
-
-
令 innerOptions 为新建的
SubscribeOptions
, 其signal
为 创建依赖中止信号的结果,信号列表为「activeInnerAbortController 的 signal,subscriber 的 subscription controller 的 signal」,使用AbortSignal
, 以及 当前 realm。 -
订阅 innerObservable, 传入 innerObserver 和 innerOptions。
inspect(inspectorUnion)
方法步骤如下:
-
令 subscribe callback 为
VoidFunction
或 null,初始为 null。 -
令 next callback 为
ObservableSubscriptionCallback
或 null,初始为 null。 -
令 error callback 为
ObservableSubscriptionCallback
或 null,初始为 null。 -
令 complete callback 为
VoidFunction
或 null,初始为 null。 -
令 abort callback 为
ObservableInspectorAbortHandler
或 null,初始为 null。 -
处理 inspectorUnion,如下:
- 如果 inspectorUnion 是
ObservableSubscriptionCallback
-
-
将 next callback 设为 inspectorUnion。
-
- 如果 inspectorUnion 是
ObservableInspector
- 如果 inspectorUnion 是
-
令 sourceObservable 为 this。
-
令 observable 为 新建的
Observable
, 其 subscribe callback 是一个接受Subscriber
subscriber 的算法,具体如下:-
如果 subscribe callback 非 null,则 调用它,参数为 «» 和 "
rethrow
"。如果 抛出异常 E,则以 E 运行 subscriber 的
error()
方法,并终止这些步骤。注意: 这样会导致 sourceObservable 永不被订阅。
-
如果 abort callback 不为 null,则 向下添加的中止算法 到 subscriber 的 subscription controller 的 signal:
-
调用 abort callback,参数为 «subscriber 的 subscription controller 的 signal 的 abort reason» 和 "
report
"。
-
-
令 sourceObserver 为新建的 内部观察者,初始化如下:
- next 步骤
-
-
如果 next callback 非 null,则 调用 next callback,参数为 «传入的 value» 和 "
rethrow
"。如果 抛出异常 E:
-
移除 abort callback,从 subscriber 的 subscription controller 的 signal。
注意: 此步骤很重要,因为 abort callback 仅用于“消费者主动”取消订阅。当生产者终止订阅(通过 subscriber 的
error()
或complete()
)时,需确保不会运行 abort callback。这与 Chromium 实现一致,但建议持有最初传入的
SubscribeOptions
的signal
的引用,仅在其 abort 时调用 abort callback。结果应该一样,但需进一步验证。 -
以 E 运行 subscriber 的
error()
方法,并终止这些步骤。
-
-
以传入的 value 运行 subscriber 的
next()
方法。
-
- error 步骤
- complete 步骤
-
-
移除 abort callback,从 subscriber 的 subscription controller 的 signal。
-
如果 complete callback 非 null,则 调用 complete callback,参数为 «» 和 "
rethrow
"。 -
运行 subscriber 的
complete()
方法。
-
-
令 options 为新建的
SubscribeOptions
, 其signal
为 subscriber 的 subscription controller 的 signal。 -
订阅 sourceObservable,传入 sourceObserver 和 options。
-
-
返回 observable。
catch(callback)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 observable 为 新建的
Observable
, 其 subscribe callback 是一个接受Subscriber
subscriber 的算法,具体如下:-
令 sourceObserver 为新建的 内部观察者,初始化如下:
- next 步骤
-
以传入的 value 调用 subscriber 的
next()
方法。 - error 步骤
-
-
调用 callback,参数为 «传入的 error» 和 "
rethrow
"。令 result 为返回值。 -
令 innerObservable 为调用
from()
,参数为 result 的结果。如果 抛出异常 E,则以 E 运行 subscriber 的
error()
方法,并终止这些步骤。不应直接调用
from()
,应调用内部算法以便异常能正确传递到 subscriber。 -
令 innerObserver 为新建的 内部观察者,初始化如下:
- next 步骤
-
以传入的 value 调用 subscriber 的
next()
方法。 - error 步骤
-
以传入的 error 调用 subscriber 的
error()
方法。 - complete 步骤
-
运行 subscriber 的
complete()
方法。
-
令 innerOptions 为新建的
SubscribeOptions
, 其signal
为 subscriber 的 subscription controller 的 signal。 -
订阅 innerObservable,传入 innerObserver 和 innerOptions。
注意: 此处可直接订阅 innerObservable,不需先取消订阅 sourceObservable,也不用担心 sourceObservable 继续发出值,因为这些都发生在 error 步骤内,说明 sourceObservable 已结束,不会再产生值,可以安全切换到 innerObservable。
-
- complete 步骤
-
运行 subscriber 的
complete()
方法。
-
令 options 为新建的
SubscribeOptions
, 其signal
为 subscriber 的 subscription controller 的 signal。 -
订阅 sourceObservable,传入 sourceObserver 和 options。
-
-
返回 observable。
finally(callback)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 observable 为 新建的
Observable
, 其 subscribe callback 是一个接受Subscriber
subscriber 的算法,具体如下:-
以 callback 调用 subscriber 的
addTeardown()
方法。 -
令 sourceObserver 为新建的 内部观察者,初始化如下:
- next 步骤
-
以传入的 value 调用 subscriber 的
next()
方法。 - error 步骤
-
-
以传入的 error 调用 subscriber 的
error()
方法。
-
- complete 步骤
-
-
运行 subscriber 的
complete()
方法。
-
-
令 options 为新建的
SubscribeOptions
, 其signal
为 subscriber 的 subscription controller 的 signal。 -
订阅 sourceObservable,传入 sourceObserver 和 options。
-
-
返回 observable。
2.3.3.
Promise
返回型操作符
toArray(options)
方法步骤如下:
-
令 p 为新建的 promise。
-
如果 options 的
signal
不为 null: -
令 values 为新建的 列表。
-
令 observer 为新建的 内部观察者,初始化如下:
-
返回 p。
forEach(callback, options)
方法步骤如下:
-
令 p 为新建的 promise。
-
令 visitor callback controller 为 新建的
AbortController
。 -
令 internal options 为新建的
SubscribeOptions
, 其signal
是 创建依赖中止信号的结果,信号列表为「visitor callback controller 的 signal,options 的signal
(如不为 null)」, 使用AbortSignal
, 以及 当前 realm。许多琐碎的 内部观察者 起到传递的作用,并不控制它们所代表的
Observable
的订阅;也就是说,当订阅被终止时,会调用它们的 error steps 和 complete steps,而它们的 next steps 只是沿链传递给定值的某种形式。但是,对于这个运算符,下方 observer 的 next steps 在 callback 抛出异常的情况下,实际上负责中止对 this 的底层订阅。在这种情况下,我们传递给“Subscribe to an
Observable
” 的SubscribeOptions
的signal
需要是从 options 的signal
派生的 dependent signal,并且下方 next steps 可访问且在需要时可用来 signal abort 的AbortController
的AbortSignal
。 -
令 idx 为
unsigned long long
,初始为 0。 -
令 observer 为新建的 内部观察者,初始化如下:
-
返回 p。
every(predicate, options)
方法步骤如下:
-
令 p 为新建的 promise。
-
令 controller 为 新建的
AbortController
。 -
令 internal options 为新建的
SubscribeOptions
, 其signal
是 创建依赖中止信号的结果,信号列表为「controller 的 signal,options 的signal
(如不为 null)」, 使用AbortSignal
, 以及 当前 realm。 -
令 idx 为
unsigned long long
,初始为 0。 -
令 observer 为新建的 内部观察者,初始化如下:
-
返回 p。
first(options)
方法步骤如下:
-
令 p 为新建的 promise。
-
令 controller 为新建的
AbortController
。 -
令 internal options 为新建的
SubscribeOptions
, 其signal
是 创建依赖中止信号的结果,信号列表为「controller 的 signal,options 的signal
(如不为 null)」, 使用AbortSignal
, 以及 当前 realm。 -
令 internal observer 为新建的 内部观察者,初始化如下:
- next 步骤
- error 步骤
-
拒绝 p,原因为传入的 error。
- complete 步骤
-
拒绝 p,原因为新建
RangeError
。注意: 只有当源
Observable
在发出第一个值之前就完成时才会到达这里。
-
返回 p。
last(options)
方法步骤如下:
-
令 p 为新建的 promise。
-
如果 options 的
signal
不为 null: -
令 lastValue 为
any
或 null,初始为 null。 -
令 hasLastValue 为 boolean,初始为 false。
-
令 observer 为新建的 内部观察者,初始化如下:
- next 步骤
-
-
将 hasLastValue 设为 true。
-
将 lastValue 设为传入的 value。
-
- error 步骤
-
拒绝 p,原因为传入的 error。
- complete 步骤
-
-
如果 hasLastValue 为 true,解析 p,值为 lastValue。
-
否则,拒绝 p,原因为新建
RangeError
。注意: 参见
first()
的说明。
-
-
-
返回 p。
find(predicate, options)
方法步骤如下:
-
令 p 为新建的 promise。
-
令 controller 为新建的
AbortController
。 -
令 internal options 为新建的
SubscribeOptions
, 其signal
是 创建依赖中止信号的结果,信号列表为「controller 的 signal,options 的signal
(如不为 null)」, 使用AbortSignal
, 以及 当前 realm。 -
令 idx 为
unsigned long long
,初始为 0。 -
令 observer 为新建的 内部观察者,初始化如下:
-
返回 p。
some(predicate, options)
方法步骤如下:
-
令 p 为新建的 promise。
-
令 controller 为新建的
AbortController
。 -
令 internal options 为新建的
SubscribeOptions
, 其signal
是 创建依赖中止信号的结果,信号列表为「controller 的 signal,options 的signal
(如不为 null)」, 使用AbortSignal
, 以及 当前 realm。 -
令 idx 为
unsigned long long
,初始为 0。 -
令 observer 为新建的 内部观察者,初始化如下:
-
返回 p。
reduce(reducer, initialValue, options)
方法步骤如下:
-
令 p 为新建的 promise。
-
令 controller 为新建的
AbortController
。 -
令 internal options 为新建的
SubscribeOptions
, 其signal
是 创建依赖中止信号的结果,信号列表为「controller 的 signal,options 的signal
(如不为 null)」, 使用AbortSignal
, 以及 当前 realm。 -
将以下中止算法添加到 internal options 的
signal
: -
令 idx 为
unsigned long long
,初始为 0。 -
令 accumulator 为 initialValue(如果已给定),否则未初始化。
-
令 observer 为新建的 内部观察者,初始化如下:
- next steps
-
-
如果 accumulator 未初始化(即没有传入 initialValue),则将 accumulator 设为传入的 value,将 idx 设为 idx + 1,并终止这些步骤。
注意: 这意味着 reducer 不会用 this 产生的第一个 value 作为
currentValue
调用。实际在第二个值发出时才会用它作为currentValue
,第一个值(此处保存的)作为accumulator
。 -
调用 reducer,参数为 «accumulator 作为
accumulator
,传入的 value 作为currentValue
,idx 作为index
» 和 "rethrow
"。令 result 为返回值。 -
将 idx 设为 idx + 1。
-
将 accumulator 设为 result。
-
- error steps
-
拒绝 p,原因为传入的 error。
- complete steps
-
返回 p。
3. EventTarget
集成
dictionary {
ObservableEventListenerOptions boolean =
capture false ;boolean ; };
passive partial interface EventTarget {Observable when (DOMString ,
type optional ObservableEventListenerOptions = {}); };
options
when(type, options)
方法步骤如下:
-
如果 this 的 相关全局对象 是
Window
对象,且其 关联的 Document 非 完全激活,则返回。 -
令 event target 为 this。
-
令 observable 为 新建的
Observable
, 初始化如下:- subscribe callback
-
一个算法,接受
Subscriber
subscriber,按以下步骤运行:-
如果 event target 为 null,则终止这些步骤。
注意: 用于捕捉 event target 可能在订阅时已被垃圾回收的情况。
-
如果 subscriber 的 subscription controller 的 signal 已 中止,则终止这些步骤。
-
添加事件监听器,参数为 event target 及如下 事件监听器:
- type
-
type
- callback
-
创建一个新的 Web IDL
EventListener
实例,引用一个参数为Event
event 的函数。该函数会执行 observable 事件监听器调用算法,参数为 subscriber 和 event。 - capture
-
options 的
capture
- passive
- once
-
false
- signal
-
subscriber 的 subscription controller 的 signal
-
-
返回 observable。
Subscriber
subscriber 和一个 Event
event,按以下步骤运行:
-
以 event 调用 subscriber 的
next()
方法。
4. 安全与隐私注意事项
本内容正从我们的 explainer 上游到本规范,期间可以参阅以下资源:
5. 致谢
特别感谢 Ben Lesh 对 Observable
API 的大量设计输入,以及多年维护用户态 Observable 代码的工作,使这一贡献得以进入 Web 平台。