1. 引言
本节为非规范性内容。
2. 核心基础设施
2.1. Subscriber
接口
[Exposed=*]interface {
Subscriber undefined next (any );
value undefined error (any );
error undefined complete ();undefined addTeardown (VoidFunction ); // True after the Subscriber is created, up until either // complete()/error() are invoked, or the subscriber unsubscribes. Inside // complete()/error(), this attribute is true.
teardown readonly attribute boolean active ;readonly attribute AbortSignal signal ; };
每个 Subscriber
拥有一个有序集合的内部观察者,初始为空。
每个 Subscriber
有一个清理回调,它是一个列表,包含VoidFunction
,初始为空。
每个 Subscriber
有一个订阅控制器,它是一个AbortController
。
每个 Subscriber
有一个active布尔值,初始为 true。
注意:这是一个用于记录状态的变量,用于确保
Subscriber
在被关闭后不会调用其拥有的任何回调。
active
的 getter 步骤是返回 this 的 active 布尔值。
signal
的 getter 步骤是返回 this 的 订阅控制器的 signal。
next(value)
方法的步骤为:
error(error)
方法的步骤如下:
complete()
方法的步骤如下:
addTeardown(teardown)
方法的步骤如下:
Subscriber
subscriber,以及一个可选的 any
reason,请执行以下步骤:
-
如果 subscriber 的 active 为 false,则返回。
此步骤用于防止重入调用,这种情况可能发生在“生产者发起”的取消订阅场景。请看如下例子:
const outerController= new AbortController(); const observable= new Observable( subscriber=> { subscriber. addTeardown(() => { // 2.) This teardown executes inside the "Close" algorithm, while it’s // running. Aborting the downstream signal run its abort algorithms, // one of which is the currently-running "Close" algorithm. outerController. abort(); }); // 1.) This immediately invokes the "Close" algorithm, which // sets subscriber.active to false. subscriber. complete(); }); observable. subscribe({}, { signal: outerController. signal}); -
将 subscriber 的 active 布尔值设为 false。
-
对于每个teardown,在subscriber的teardown 回调中按反插入顺序排序:
2.2. Observable
接口
// SubscribeCallback is where the Observable "creator's" code lives. It's // called when subscribe() is called, to set up a new subscription.callback =
SubscribeCallback undefined (Subscriber );
subscriber callback =
ObservableSubscriptionCallback undefined (any );
value dictionary {
SubscriptionObserver ObservableSubscriptionCallback ;
next ObservableSubscriptionCallback ;
error VoidFunction ; };
complete callback =
ObservableInspectorAbortHandler undefined (any );
value dictionary {
ObservableInspector ObservableSubscriptionCallback ;
next ObservableSubscriptionCallback ;
error VoidFunction ;
complete VoidFunction ;
subscribe ObservableInspectorAbortHandler ; };
abort typedef (ObservableSubscriptionCallback or SubscriptionObserver );
ObserverUnion typedef (ObservableSubscriptionCallback or ObservableInspector );
ObservableInspectorUnion dictionary {
SubscribeOptions AbortSignal ; };
signal callback =
Predicate boolean (any ,
value unsigned long long );
index callback =
Reducer any (any ,
accumulator any ,
currentValue unsigned long long );
index callback =
Mapper any (any ,
value unsigned long long ); // Differs from Mapper only in return type, since this callback is exclusively // used to visit each element in a sequence, not transform it.
index callback =
Visitor undefined (any ,
value unsigned long long ); // This callback returns an `any` that must convert into an `Observable`, via // the `Observable` conversion semantics.
index callback =
CatchCallback any (any ); [Exposed=*]
value interface {
Observable constructor (SubscribeCallback );
callback undefined subscribe (optional ObserverUnion = {},
observer optional SubscribeOptions = {}); // Constructs a native Observable from value if it's any of the following: // - Observable // - AsyncIterable // - Iterable // - Promise
options static Observable from (any ); // Observable-returning operators. See "Operators" section in the spec. // // takeUntil() can consume promises, iterables, async iterables, and other // observables.
value Observable takeUntil (any );
value Observable map (Mapper );
mapper Observable filter (Predicate );
predicate Observable take (unsigned long long );
amount Observable drop (unsigned long long );
amount Observable flatMap (Mapper );
mapper Observable switchMap (Mapper );
mapper Observable inspect (optional ObservableInspectorUnion = {});
inspectorUnion Observable catch (CatchCallback );
callback Observable finally (VoidFunction ); // Promise-returning operators.
callback Promise <sequence <any >>toArray (optional SubscribeOptions = {});
options Promise <undefined >forEach (Visitor ,
callback optional SubscribeOptions = {});
options Promise <boolean >every (Predicate ,
predicate optional SubscribeOptions = {});
options Promise <any >first (optional SubscribeOptions = {});
options Promise <any >last (optional SubscribeOptions = {});
options Promise <any >find (Predicate ,
predicate optional SubscribeOptions = {});
options Promise <boolean >some (Predicate ,
predicate optional SubscribeOptions = {});
options Promise <any >reduce (Reducer ,
reducer optional any ,
initialValue optional SubscribeOptions = {}); };
options
每个 Observable
都有一个 subscribe 回调,它是一个 SubscribeCallback
或者是一组接收 Subscriber
的步骤。
每个 Observable
都有一个 弱订阅者,它是一个指向 Subscriber
的弱引用或者 null,初始为 null。
注意: 这些类型的“联合”是为了同时支持由 JavaScript 创建的 Observable
(它们总是用 SubscribeCallback
构造),
以及原生构造的 Observable
对象(它们的 subscribe 回调 可能是任意一组原生步骤,而不是 JavaScript 回调)。
when()
的返回值就是后者的一个例子。
new
Observable(callback)
构造函数步骤如下:
-
将 this 的 subscribe 回调 设为 callback。
注意: 这个回调会在后续调用
subscribe()
时被调用。
2.2.1. 支持性概念
any
类型 error 的算法,步骤如下:
注意: 我们将这个默认步骤单独列出,是为了让所有在规范中原生订阅 Observable
(即规范文本中的订阅,而不是通过 subscribe()
方法)的地方都不用重复定义这些步骤。
- next 步骤
-
一个接受
any
类型参数的算法。初始时,这些步骤为空操作。 - error 步骤
- complete 步骤
-
无参数的算法。初始时,这些步骤为空操作。
内部观察者 结构体 用于对应
next
、
error
、
complete
回调函数。对于通过 subscribe()
方法由 JavaScript 订阅的任何 Observable
,
这些算法“步骤”只是包装对脚本提供的对应 回调函数的调用。
但当规范文本(非用户脚本)订阅一个 Observable
时,
这些“步骤”可以是任意规范算法,而不是通过 Web IDL 回调函数打包的 ObserverUnion
。
例如,见 § 2.3.3 返回 Promise 的操作符。
any
类型的 value,请运行以下步骤:
注意: 我们将此算法与 Web IDL 的 from()
方法分离出来,是为了让规范文本能够直接转换值而无需通过 Web IDL 绑定。
-
如果 Type(value) 不是 Object,则 抛出
TypeError
。注意: 这样可以阻止将原始类型强制为可迭代对象(如字符串)。详见 WICG/observable#125 的讨论。
-
来自 Observable:如果 value 的 特定类型是
Observable
,则返回 value。 -
来自异步可迭代对象:令 asyncIteratorMethod 为 ? GetMethod(value,
%Symbol.asyncIterator%
)。注意: 这里我们使用 GetMethod 而不是 GetIterator,因为我们只需探查是否支持异步迭代协议,不希望在未实现时抛出异常。GetIterator 在以下两种情况下都会抛错:(a)未实现迭代协议;(b)实现了协议但不可调用或 getter 抛错。GetMethod 只会在后者抛错。
-
如果 asyncIteratorMethod 为 undefined 或 null,则跳到标记为 来自可迭代对象 的步骤。
-
令 nextAlgorithm 为以下步骤,给定
Subscriber
subscriber 和 迭代器记录 iteratorRecord:-
令 nextPromise 为
Promise
或 undefined,初始为 undefined。 -
令 nextCompletion 为 IteratorNext(iteratorRecord)。
注意: 这里我们用 IteratorNext 而不是 IteratorStepValue,因为后者要求
next()
立即返回可用的对象,而异步迭代器next()
返回 Promise/thenable(我们用 Promise 包裹并根据结果再处理)。 -
如果 nextCompletion 是 throw completion,则:
-
断言:iteratorRecord 的 [[Done]] 为 true。
-
将 nextPromise 设为 一个被拒绝的 Promise,错误为 nextRecord 的 [[Value]]。
-
-
否则,如果 nextRecord 是 正常完成,则将 nextPromise 设为 一个被解析的 Promise,值为 nextRecord 的 [[Value]]。
注意: 这样做是为了保证 nextRecord 的 [[Value]] 本身不是
Promise
时也能正常处理。 -
监听 nextPromise 的状态:
-
如果 nextPromise fulfilled,值为 iteratorResult,则:
-
如果 Type(iteratorResult) 不是 Object,则用
error()
方法和TypeError
运行 subscriber,并终止这些步骤。 -
令 done = IteratorComplete(iteratorResult)。
-
如果 done 是 throw completion,则用 done 的 [[Value]] 运行 subscriber 的
error()
方法并终止这些步骤。 -
如果 done 的 [[Value]] 为 true,则运行 subscriber 的
complete()
并终止这些步骤。 -
令 value = IteratorValue(iteratorResult)。
-
如果 value 是 throw completion,则用 value 的 [[Value]] 运行 subscriber 的
error()
方法并终止这些步骤。 -
用 value 的 [[Value]] 运行 subscriber 的
next()
方法。 -
再次用 subscriber 和 iteratorRecord 运行 nextAlgorithm。
-
-
如果 nextPromise 被拒绝,原因为 r,则用 r 运行 subscriber 的
error()
方法。
-
-
返回一个 新的
Observable
, 其 subscribe 回调 是一个接收Subscriber
subscriber 的算法,具体如下:-
如果 subscriber 的 subscription controller 的 signal 已经 aborted,则返回。
-
令 iteratorRecordCompletion 为 GetIterator(value, async)。
注:这会重新调用 value 上的任何
%Symbol.asyncIterator%
方法 getter——注意,这么做是否合理属于极端边界情况,但它符合测试预期;相关讨论见 issue#127——并调用协议本身以获取一个 Iterator Record。 -
如果 iteratorRecordCompletion 是一个 throw completion,则以 iteratorRecordCompletion 的 [[Value]] 作为参数,调用 subscriber 的
error()
方法,并终止本步骤。注:这意味着我们会在订阅的同步上下文中调用
error()
方法,这也是异步可迭代对象被转换为Observable
时,唯一会同步抛出错误的场景。在所有其他情况下,错误会通过被包裹在拒绝Promise
中,并被 nextAlgorithm reacts 到,从而以微任务时序异步传播给 observer。这种同步错误传播行为与语言结构一致,例如 for-await of 循环在调用%Symbol.asyncIterator%
并同步将异常重新抛给循环外部的 catch 块(在任何 Awaiting 发生之前)时就是如此。 -
令 iteratorRecord 为 ! iteratorRecordCompletion。
-
断言:iteratorRecord 是一个 Iterator Record。
-
如果 subscriber 的 subscription controller 的 signal 已经 aborted,则返回。
-
向 subscriber 的 subscription controller 的 signal 添加如下中止算法:
-
执行 AsyncIteratorClose(iteratorRecord, NormalCompletion(subscriber 的 subscription controller 的 abort reason))。
-
-
以 subscriber 和 iteratorRecord 为参数,执行 nextAlgorithm。
-
-
来自可迭代对象:令 iteratorMethod = ? GetMethod(value,
%Symbol.iterator%
)。 -
如果 iteratorMethod 为 undefined,则跳到标记为 来自 Promise 的步骤。
否则,返回一个 新的
Observable
, 其 subscribe 回调 是一个接收Subscriber
subscriber 的算法,具体如下:-
如果 subscriber 的 subscription controller 的 signal 已经 aborted,则返回。
-
令 iteratorRecordCompletion 为 GetIterator(value, sync)。
-
如果 iteratorRecordCompletion 是一个 throw completion,则以 iteratorRecordCompletion 的 [[Value]] 作为参数,调用 subscriber 的
error()
方法,并终止本步骤。 -
令 iteratorRecord 为 ! iteratorRecordCompletion。
-
如果 subscriber 的 subscription controller 的 signal 已经 aborted,则返回。
-
向 subscriber 的 subscription controller 的 signal 添加如下中止算法:
-
执行 IteratorClose(iteratorRecord, NormalCompletion(UNUSED))。
-
-
当 true 时,循环:
-
令 next 为 IteratorStepValue(iteratorRecord)。
-
如果 next 是一个 throw completion,则以 next 的 [[Value]] 作为参数,调用 subscriber 的
error()
方法,并 跳出 循环。 -
将 next 设为 ! next。
-
如果 next 为 done,则:
-
断言:iteratorRecord 的 [[Done]] 为 true。
-
调用 subscriber 的
complete()
。 -
返回。
-
-
以 next 作为参数,调用 subscriber 的
next()
。 -
如果 subscriber 的 subscription controller 的 signal 已经 aborted,则 跳出 循环。
-
-
-
来自 Promise:如果 IsPromise(value) 为 true,则:
-
返回一个 新的
Observable
, 其 subscribe 回调 是一个接收Subscriber
subscriber 的算法,具体如下:-
监听 value 的状态:
-
如果 value fulfilled,值为 v,则:
-
用 v 运行 subscriber 的
next()
方法。 -
运行 subscriber 的
complete()
方法。
-
-
如果 value 被拒绝,原因为 r,则用 r 运行 subscriber 的
error()
方法。
-
-
-
Observable
,给定一个
ObserverUnion
或 内部观察者
observer,以及一个 SubscribeOptions
options,请执行以下步骤:
注意: 我们将此算法从 Web IDL 的 subscribe()
方法中单独分离,以便规范文本可以直接 订阅 Observable
,而无需通过
Web IDL 绑定。相关背景可见 w3c/IntersectionObserver#464,其中“内部”规范文本必须避免通过 Web IDL 绑定对象(其属性可能会被 JavaScript 修改)。用法见 § 2.3.3 返回 Promise 的操作符。
-
令 internal observer 为一个新的内部观察者。
-
按如下方式处理 observer:
-
- 如果 observer 是一个
ObservableSubscriptionCallback
-
将 internal observer 的next 步骤设置为以下接收一个
any
类型 value 的步骤:-
调用 observer,参数为 «value» 和 "
report
"。
-
- 如果 observer 是一个
SubscriptionObserver
- 如果 observer 是一个内部观察者
- 将 internal observer 设置为 observer。
- 如果 observer 是一个
-
-
断言:internal observer 的error 步骤要么是默认错误算法,要么是一个调用所提供的
error
回调函数的算法。 -
如果this的弱订阅者不为 null 且this的弱订阅者的active为 true:
-
返回。
-
令 subscriber 为一个新的
Subscriber
。 -
如果this的subscribe 回调是一个
SubscribeCallback
,则调用它,参数为 «subscriber» 和 "rethrow
"。 -
否则,运行由this的subscribe 回调给定的步骤,并传入 subscriber。
2.3. 操作符
目前请参见 https://github.com/wicg/observable#operators。
2.3.1. from()
from(value)
方法步骤如下:
-
返回将 value 转换为
Observable
的结果。若有异常则重新抛出。
2.3.2.
Observable
返回型操作符
takeUntil(value)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 notifier 为 将 value 转换为
Observable
的结果。 -
令 observable 为一个 新的
Observable
,其 subscribe 回调 是一个接收Subscriber
subscriber 的算法,内容如下:注意:该方法会订阅两个Observable
: (1) notifier,(2) sourceObservable。我们会在如下情形同时“取消订阅”:-
notifier 开始发出值(“next” 或 “error”)。此时我们已获得所需信息,不再需要 notifier,同时也不再需要 sourceObservable,因为已经手动结束了 observable 的订阅。
-
如果 sourceObservable 自身
error()
或complete()
,此时我们只需取消对 notifier 的订阅。
-
令 notifierObserver 为一个新的 内部观察者,初始如下:
- next 步骤
-
运行 subscriber 的
complete()
方法。注意: 这样会“取消订阅” sourceObservable,因为 sourceObservable 是通过“外部”subscriber 的 订阅控制器的 signal 作为输入信号订阅的,一旦调用 subscriber 的
complete()
,该信号会被中止。 - error 步骤
-
运行 subscriber 的
complete()
方法。
注意: 未指定 complete 步骤,因为如果 notifier
Observable
自身完成,我们无需对 observable 的 subscriber 调 complete(),observable 会继续镜像 sourceObservable。 -
令 options 为一个新的
SubscribeOptions
,其signal
为 subscriber 的 订阅控制器 的 signal。 -
订阅 notifier,参数为 notifierObserver 和 options。
-
如果 subscriber 的 active 为 false,则返回。
注意: 如果 notifier 同步发出值,则 sourceObservable 的 subscribe 回调 甚至不会被调用;如果 notifier 仅同步 complete,则 subscriber 的 active 仍然为 true,会继续订阅 sourceObservable 并镜像其内容。
-
令 sourceObserver 为一个新的 内部观察者,初始如下:
- next 步骤
-
用传入的 value 运行 subscriber 的
next()
方法。 - error 步骤
-
用传入的 error 运行 subscriber 的
error()
方法。 - complete 步骤
-
运行 subscriber 的
complete()
方法。
注意: sourceObserver 基本是透传,将 sourceObservable 发出的内容全部镜像,唯一的例外是如果 sourceObservable 完成时 notifier 还未发出任何内容,会主动取消 notifier 的订阅。
-
订阅 sourceObservable,参数为 sourceObserver 和 options。
-
-
返回 observable。
map(mapper)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 observable 为一个 新的
Observable
,其 subscribe 回调 是一个接收Subscriber
subscriber 的算法,内容如下:-
令 idx 为
unsigned long long
,初始值为 0。 -
令 sourceObserver 为一个新的 内部观察者,初始如下:
- next 步骤
- error 步骤
-
用传入的 error 运行 subscriber 的
error()
方法。 - complete 步骤
-
运行 subscriber 的
complete()
方法。
-
令 options 为一个新的
SubscribeOptions
,其signal
为 subscriber 的 订阅控制器 的 signal。 -
订阅 sourceObservable,参数为 sourceObserver 和 options。
-
-
返回 observable。
filter(predicate)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 observable 为一个 新的
Observable
,其 subscribe 回调 是一个接收Subscriber
subscriber 的算法,内容如下:-
令 idx 为
unsigned long long
,初始值为 0。 -
令 sourceObserver 为一个新的 内部观察者,初始如下:
- next 步骤
- error 步骤
-
用传入的 error 运行 subscriber 的
error()
方法。 - complete 步骤
-
运行 subscriber 的
complete()
方法。
-
令 options 为一个新的
SubscribeOptions
,其signal
为 subscriber 的 订阅控制器 的 signal。 -
订阅 sourceObservable,参数为 sourceObserver 和 options。
-
-
返回 observable。
take(amount)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 observable 为一个 新的
Observable
,其 subscribe 回调 是一个接收Subscriber
subscriber 的算法,内容如下:-
令 remaining 为 amount。
-
如果 remaining 为 0,则运行 subscriber 的
complete()
方法,并中止这些步骤。 -
令 sourceObserver 为一个新的 内部观察者,初始如下:
- next 步骤
-
-
用传入的 value 运行 subscriber 的
next()
方法。 -
递减 remaining。
-
如果 remaining 为 0,则运行 subscriber 的
complete()
方法。
-
- error 步骤
-
用传入的 error 运行 subscriber 的
error()
方法。 - complete 步骤
-
运行 subscriber 的
complete()
方法。
-
令 options 为一个新的
SubscribeOptions
,其signal
为 subscriber 的 订阅控制器 的 signal。 -
订阅 sourceObservable,参数为 sourceObserver 和 options。
-
-
返回 observable。
drop(amount)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 observable 为一个 新的
Observable
,其 subscribe 回调 是一个接收Subscriber
subscriber 的算法,内容如下:-
令 remaining 为 amount。
-
令 sourceObserver 为一个新的 内部观察者,初始如下:
- next 步骤
- error 步骤
-
用传入的 error 运行 subscriber 的
error()
方法。 - complete 步骤
-
运行 subscriber 的
complete()
方法。
-
令 options 为一个新的
SubscribeOptions
,其signal
为 subscriber 的 订阅控制器 的 signal。 -
订阅 sourceObservable,参数为 sourceObserver 和 options。
-
-
返回 observable。
flatMap(mapper)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 observable 为一个 新的
Observable
,其 subscribe 回调 是一个接收Subscriber
subscriber 的算法,内容如下:-
令 idx 为
unsigned long long
,初始值为 0。 -
令 outerSubscriptionHasCompleted 为 布尔值,初始为 false。
-
令 queue 为一个新的 列表,元素为
any
,初始为空。注意: 该 queue 用于当 observable 正在订阅某个由 sourceObservable 产生但尚未耗尽的
Observable
时,存储由 sourceObservable 新发出的Observable
。 -
令 activeInnerSubscription 为 布尔值,初始为 false。
-
令 sourceObserver 为一个新的 内部观察者,初始如下:
- next 步骤
-
-
如果 activeInnerSubscription 为 true,则:
-
将 value 添加到 queue。
注意: 该 value 在当前订阅的
Observable
耗尽后会被处理。
-
-
否则:
-
设 activeInnerSubscription 为 true。
-
以 value、subscriber、mapper 及对 queue、activeInnerSubscription、outerSubscriptionHasCompleted 和 idx 的引用,运行 flatmap process next value steps。
注意:该 flatmap process next value steps 会订阅由 value 派生的
Observable
并处理其所有值,直到其订阅失效(错误或完成)。如该“内部”Observable
完成,则会递归处理 queue 中下一个值。如 queue 中无值,则流程终止并取消 activeInnerSubscription,以便后续 sourceObservable 新发出的值能被正确处理。
-
-
- error 步骤
-
用传入的 error 运行 subscriber 的
error()
方法。 - complete 步骤
-
-
设 outerSubscriptionHasCompleted 为 true。
注意: 若 activeInnerSubscription 为 true,则不会立即 complete subscriber,后续由 flatmap process next value steps 在“内部”订阅失效且 queue 为空时负责 complete。
-
若 activeInnerSubscription 为 false 且 queue 为空,则运行 subscriber 的
complete()
方法。
-
-
令 options 为一个新的
SubscribeOptions
,其signal
为 subscriber 的 订阅控制器 的 signal。 -
订阅 sourceObservable,参数为 sourceObserver 和 options。
-
-
返回 observable。
any
value,Subscriber
subscriber,Mapper
mapper,以及对
queue、activeInnerSubscription、outerSubscriptionHasCompleted 和
idx 的引用,步骤如下:
-
令 mappedResult 为调用 mapper 的结果,参数为 «value, idx» 和 "
rethrow
"。 -
将 idx 设置为 idx + 1。
-
令 innerObservable 为使用 mappedResult 调用
from()
的结果。如果抛出了异常 E,则运行 subscriber 的
error()
方法,传入 E,并中止这些步骤。我们不应该直接调用
from()
。 相反,我们应该调用某个内部算法,该算法将异常传回给我们在此处妥善处理,因为我们希望将它们通过管道传递给 subscriber。 -
令 innerObserver 为一个新的内部观察者,初始化如下:
- next 步骤
-
运行 subscriber 的
next()
方法,传入传递的 value。 - error 步骤
-
运行 subscriber 的
error()
方法,传入传递的 error。 - complete 步骤
-
-
如果 queue 不为空,则:
-
令 nextValue 为 queue 中的第一项;从 queue 中移除此项。
-
运行flatmap 处理下一个值的步骤,传入 nextValue、subscriber、 mapper 以及对 queue 和 activeInnerSubscription 的引用。
-
-
否则:
-
将 activeInnerSubscription 设置为 false。
注意:因为 activeInnerSubscription 是一个引用,这实际上确保了所有后续从“外部”
Observable
(称为 sourceObservable)发出的值。 -
如果 outerSubscriptionHasCompleted 为 true,则运行 subscriber 的
complete()
方法。注意:这意味着“外部”
Observable
已经完成,但尚未继续完成 subscriber,因为至少还有一个待处理的“内部”Observable
(即 innerObservable)已经被排队但尚未完成。直到现在!
-
-
-
令 innerOptions 为一个新的
SubscribeOptions
, 其signal
是 subscriber 的订阅控制器的signal。 -
订阅 innerObservable,传入 innerObserver 和 innerOptions。
switchMap(mapper)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 observable 为一个 新的
Observable
,其 subscribe 回调 是一个接收Subscriber
subscriber 的算法,内容如下:-
令 idx 为
unsigned long long
,初始值为 0。 -
令 outerSubscriptionHasCompleted 为 布尔值,初始为 false。
-
令 activeInnerAbortController 为
AbortController
或 null,初始为 null。注意: 该
AbortController
仅由本算法的 next steps 分配新值,仅由 switchmap process next value steps 置为 null(当“内部”Observable
完成或出错时)。此变量用作当前是否有活动“内部”订阅的标记。下方的 complete steps 会用到它——如果 sourceObservable 完成时有活动“内部”订阅,则不会立即 complete subscriber,而是等“内部”订阅完成后再 complete。 -
令 sourceObserver 为一个新的 内部观察者,初始如下:
-
- next 步骤
-
-
如果 activeInnerAbortController 非 null,则 signal abort activeInnerAbortController。
注意: 这样会“取消订阅”前一个由 sourceObservable 最近 push 的值派生的“内部”
Observable
,然后立即订阅由本次 push 的新 value 派生的Observable
。 -
令 activeInnerAbortController 为一个 新的
AbortController
。 -
用 value、subscriber、mapper 及 activeInnerAbortController、outerSubscriptionHasCompleted、idx 的引用,运行 switchmap process next value steps。
注意: switchmap process next value steps 会订阅由 value 派生的
Observable
,并持续处理其值,直到 (1) 其订阅失效,或 (2) activeInnerAbortController 被中止(即 sourceObservable 推送了更新的值)。
-
- error 步骤
-
用传入的 error 运行 subscriber 的
error()
方法。 -
- complete 步骤
-
-
设 outerSubscriptionHasCompleted 为 true。
注意: 如果 activeInnerAbortController 非 null,则不会立刻 complete subscriber,而是由 switchmap process next value steps 在内部订阅完成时负责 complete。
-
如果 activeInnerAbortController 为 null,则运行 subscriber 的
complete()
方法。
-
-
-
令 options 为一个新的
SubscribeOptions
,其signal
为 subscriber 的 订阅控制器 的 signal。 -
订阅 sourceObservable,参数为 sourceObserver 和 options。
-
-
返回 observable。
any
value,Subscriber
subscriber,Mapper
mapper,以及 activeInnerAbortController、outerSubscriptionHasCompleted 和
idx 的引用,执行以下步骤:
-
令 mappedResult 为调用 mapper 的结果,参数为 «value, idx» 和 "
rethrow
"。 -
将 idx 设置为 idx + 1。
-
令 innerObservable 为使用 mappedResult 调用
from()
的结果。 -
令 innerObserver 为一个新的内部观察者,初始化如下:
- next 步骤
-
运行 subscriber 的
next()
方法,传入传递的 value。 - error 步骤
-
运行 subscriber 的
error()
方法,传入传递的 error。注意:我们不必在此处将 activeInnerAbortController 设置为 null,以向上面的
switchMap()
方法步骤发出内部“订阅”已取消的信号。这是因为调用 subscriber 的error()
方法已经从“外部”源 Observable 取消订阅,因此它将无法再向switchMap()
内部观察者推送任何值。 - complete 步骤
-
-
如果 outerSubscriptionHasCompleted 为 true,则运行 subscriber 的
complete()
方法。 -
否则,将 activeInnerAbortController 设置为 null。
注意:因为此变量是一个引用,它向 switchMap complete 步骤发出信号,表明没有活动的内部订阅。
-
-
令 innerOptions 为一个新的
SubscribeOptions
, 其signal
是使用AbortSignal
和当前领域,从列表 «activeInnerAbortController 的signal, subscriber 的订阅控制器的signal» 创建依赖中止信号的结果。 -
订阅 innerObservable,传入 innerObserver 和 innerOptions。
inspect(inspectorUnion)
方法步骤如下:
-
令 subscribe callback 为
VoidFunction
或 null,初始为 null。 -
令 next callback 为
ObservableSubscriptionCallback
或 null,初始为 null。 -
令 error callback 为
ObservableSubscriptionCallback
或 null,初始为 null。 -
令 complete callback 为
VoidFunction
或 null,初始为 null。 -
令 abort callback 为
ObservableInspectorAbortHandler
或 null,初始为 null。 -
按如下方式处理 inspectorUnion:
- 如果 inspectorUnion 是
ObservableSubscriptionCallback
-
-
将 next callback 设为 inspectorUnion。
-
- 如果 inspectorUnion 是
ObservableInspector
- 如果 inspectorUnion 是
-
令 sourceObservable 为 this。
-
令 observable 为一个 新的
Observable
,其 subscribe 回调 是一个接收Subscriber
subscriber 的算法,内容如下:-
如果 subscribe callback 不为 null,则调用它,参数为 «» 和 "
rethrow
"。如果抛出了异常 E, 则运行 subscriber 的
error()
方法,传入 E,并中止这些步骤。注意:这样做的结果是 sourceObservable 永远不会被订阅。
-
如果 abort callback 不为 null,则将以下中止算法添加到 subscriber 的订阅控制器的signal 中:
-
令 sourceObserver 为一个新的内部观察者,初始化如下:
- next 步骤
-
-
如果 next callback 不为 null,则调用 next callback,参数为 «传入的 value» 和 "
rethrow
"。如果抛出了异常 E, 则:
-
从 subscriber 的订阅控制器的signal 中移除 abort callback。
注意:此步骤很重要,因为 abort callback 仅用于消费者发起的退订。当生产者通过 subscriber 的
error()
或complete()
方法终止订阅时(如下所示),我们必须确保不运行 abort callback。这与 Chromium 的实现相匹配,但可以考虑保留对最初传入的
SubscribeOptions
的signal
的引用,并在它中止时才调用 abort callback。结果可能相同,但需要调查。 -
运行 subscriber 的
error()
方法,传入 E,并中止这些步骤。
-
-
使用传入的 value 运行 subscriber 的
next()
方法。
-
- error 步骤
- complete 步骤
-
令 options 为一个新的
SubscribeOptions
, 其signal
是 subscriber 的订阅控制器的signal。 -
订阅 sourceObservable,传入 sourceObserver 和 options。
-
-
返回 observable。
catch(callback)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 observable 为一个 新的
Observable
,其 subscribe 回调 是一个接收Subscriber
subscriber 的算法,内容如下:-
令 sourceObserver 为一个新的 内部观察者,初始如下:
- next 步骤
-
用传入的 value 运行 subscriber 的
next()
方法。 - error 步骤
-
-
调用 callback,参数为 «传入的 error» 和 "
rethrow
"。 令 result 为返回值。 -
令 innerObservable 为使用 result 调用
from()
的结果。如果抛出了异常 E,则 运行 subscriber 的
error()
方法,传入 E,并中止这些步骤。我们不应该直接调用
from()
。 相反,我们应该调用某个内部算法,该算法将异常传回给我们在此处妥善处理,因为我们希望将它们通过管道传递给 subscriber。 -
令 innerObserver 为一个新的内部观察者,初始化如下:
- next 步骤
-
运行 subscriber 的
next()
方法,传入传递的 value。 - error 步骤
-
运行 subscriber 的
error()
方法,传入传递的 error。 - complete 步骤
-
运行 subscriber 的
complete()
方法。
-
令 innerOptions 为一个新的
SubscribeOptions
, 其signal
是 subscriber 的订阅控制器的signal。 -
订阅 innerObservable,传入 innerObserver 和 innerOptions。
注意:我们可以在此处自由地订阅 innerObservable,而无需先从 sourceObservable“退订”,也不用担心 sourceObservable 会继续发出值,因为所有这些都发生在与 sourceObservable 关联的错误步骤内部。这意味着 sourceObservable 已经完成了它的订阅,将不再产生任何值,我们可以安全地将我们的值源切换到 innerObservable。
-
- complete 步骤
-
运行 subscriber 的
complete()
方法。
-
令 options 为一个新的
SubscribeOptions
,其signal
为 subscriber 的 订阅控制器 的 signal。 -
订阅 sourceObservable,参数为 sourceObserver 和 options。
-
-
返回 observable。
finally(callback)
方法步骤如下:
-
令 sourceObservable 为 this。
-
令 observable 为一个 新的
Observable
,其 subscribe 回调 是一个接收Subscriber
subscriber 的算法,内容如下:-
用 callback 运行 subscriber 的
addTeardown()
方法。 -
令 sourceObserver 为一个新的 内部观察者,初始如下:
- next 步骤
-
用传入的 value 运行 subscriber 的
next()
方法。 - error 步骤
-
-
用传入的 error 运行 subscriber 的
error()
方法。
-
- complete 步骤
-
-
运行 subscriber 的
complete()
方法。
-
-
令 options 为一个新的
SubscribeOptions
,其signal
为 subscriber 的 订阅控制器 的 signal。 -
订阅 sourceObservable,参数为 sourceObserver 和 options。
-
-
返回 observable。
2.3.3.
Promise
返回型操作符
toArray(options)
方法步骤如下:
forEach(callback, options)
方法步骤如下:
-
令 p 为新 promise。
-
令 visitor callback controller 为新的
AbortController
。 -
令 internal options 为一个新的
SubscribeOptions
,其signal
是以下结果:从列表 «visitor callback controller 的 signal,options 的signal
(如非 null 则包含)» 创建一个依赖中止信号(create a dependent abort signal),使用AbortSignal
和 当前 realm。许多简单的 internal observers 仅作为传递节点,不控制它们所代表的
Observable
的订阅;也就是说,它们的 error steps 和 complete steps 会在订阅被终止时被调用,而它们的 next steps 仅仅是将某种形式的值沿链条向后传递。然而对于本操作符,下面的 observer 的 next steps 实际上负责在 callback 抛出异常时中止对 this 的底层订阅。此时,传递给“Subscribe to an
Observable
”的SubscribeOptions
的signal
需要是一个依赖于 options 的signal
和一个AbortSignal
(属于下面 next steps 可访问、可在必要时 signal abort 的AbortController
)的 dependent signal。 -
令 idx 为
unsigned long long
,初始为 0。 -
令 observer 为一个新的 内部观察者,初始如下:
-
返回 p。
every(predicate, options)
方法步骤如下:
-
令 p 为新 promise。
-
令 controller 为新的
AbortController
。 -
令 internal options 为一个新的
SubscribeOptions
, 其signal
是以下结果:从列表 «controller 的 signal,options 的signal
(如非 null 则包含)» 创建一个依赖中止信号(creating a dependent abort signal), 使用AbortSignal
, 以及 当前 realm。 -
令 idx 为
unsigned long long
,初始为 0。 -
令 observer 为一个新的 内部观察者,初始如下:
-
返回 p。
first(options)
方法步骤如下:
-
令 p 为新 promise。
-
令 controller 为新的
AbortController
。 -
令 internal options 为一个新的
SubscribeOptions
,其signal
是以下结果:从列表 «controller 的 signal,options 的signal
(如非 null 则包含)» 创建一个依赖中止信号(create a dependent abort signal),使用AbortSignal
和 当前 realm。 -
令 internal observer 为一个新的 内部观察者,初始如下:
- next 步骤
-
-
解决 p,值为传入的 value。
-
signal abort controller。
-
- error 步骤
-
拒绝 p,原因为传入的 error。
- complete 步骤
-
拒绝 p,原因为新的
RangeError
。注意: 仅当源
Observable
在未发出任何值时即完成才会执行到此步骤。
-
返回 p。
last(options)
方法步骤如下:
-
令 p 为新 promise。
-
如果 options 的
signal
不为 null:-
如果 options 的
signal
已经 aborted,则:-
拒绝 p,理由为 options 的
signal
的 abort reason。 -
返回 p。
-
-
-
拒绝 p,理由为 options 的
signal
的 abort reason。
-
-
-
令 lastValue 为
any
或 null,初始为 null。 -
令 hasLastValue 为 布尔值,初始为 false。
-
令 observer 为一个新的 内部观察者,初始如下:
- next 步骤
-
-
设 hasLastValue 为 true。
-
设 lastValue 为传入的 value。
-
- error 步骤
-
拒绝 p,原因为传入的 error。
- complete 步骤
-
-
如果 hasLastValue 为 true,解决 p,值为 lastValue。
-
否则,拒绝 p,原因为新的
RangeError
。注意: 参见
first()
的说明。
-
-
-
返回 p。
find(predicate, options)
方法步骤如下:
-
令 p 为新 promise。
-
令 controller 为新的
AbortController
。 -
令 internal options 为一个新的
SubscribeOptions
,其signal
是以下结果:从列表 «controller 的 signal,options 的signal
(如非 null 则包含)» 创建一个依赖中止信号(create a dependent abort signal),使用AbortSignal
和 当前 realm。 -
令 idx 为
unsigned long long
,初始为 0。 -
令 observer 为一个新的 内部观察者,初始如下:
-
返回 p。
some(predicate, options)
方法步骤如下:
-
令 p 为新 promise。
-
令 controller 为新的
AbortController
。 -
令 internal options 为一个新的
SubscribeOptions
,其signal
是以下结果:从列表 «controller 的 signal,options 的signal
(如非 null 则包含)» 创建一个依赖中止信号(create a dependent abort signal),使用AbortSignal
和 当前 realm。 -
如果 internal options 的
signal
已经 aborted,则:-
拒绝 p,理由为 internal options 的
signal
的 abort reason。 -
返回 p。
-
-
令 idx 为
unsigned long long
,初始为 0。 -
令 observer 为一个新的 内部观察者,初始如下:
-
返回 p。
reduce(reducer, initialValue, options)
方法步骤如下:
-
令 p 为新 promise。
-
令 controller 为新的
AbortController
。 -
令 internal options 为一个新的
SubscribeOptions
,其signal
是以下结果:从列表 «controller 的 signal,options 的signal
(如非 null 则包含)» 创建一个依赖中止信号(create a dependent abort signal),使用AbortSignal
和 当前 realm。 -
令 idx 为
unsigned long long
,初始为 0。 -
令 accumulator 为 initialValue(如果有的话),否则未初始化。
-
令 observer 为一个新的 内部观察者,初始如下:
- next 步骤
-
-
如果 accumulator 未初始化(意味着没有传入 initialValue),则将 accumulator 设置为传入的 value,将 idx 设置为 idx + 1,并中止这些步骤。
注意:这意味着 reducer 不会以此对象产生的第一个 value 作为
currentValue
来调用。 相反,当最终发出第二个值时,我们将以它作为currentValue
,并以第一个值(我们在此处保存的)作为accumulator
来调用 reducer。 -
调用 reducer,参数为 «accumulator 作为
accumulator
, 传入的 value 作为currentValue
, idx 作为index
» 和 "rethrow
"。令 result 为返回值。 -
将 idx 设置为 idx + 1。
-
将 accumulator 设置为 result。
-
- error 步骤
-
拒绝 p,原因为传入的 error。
- complete 步骤
-
返回 p。
3. EventTarget
集成
dictionary {
ObservableEventListenerOptions boolean =
capture false ;boolean ; };
passive partial interface EventTarget {Observable when (DOMString ,
type optional ObservableEventListenerOptions = {}); };
options
when(type, options)
方法步骤如下:
-
令 event target 为this。
-
令 observable 为一个 新的
Observable
,初始化如下:- subscribe 回调
-
一个接收
Subscriber
subscriber 的算法,步骤如下:-
如果 event target 为 null,则中止这些步骤。
注意: 这是为了捕捉 event target 在订阅时可能已被垃圾回收的情况。
-
添加事件监听器,参数为 event target 及如下定义的 事件监听器:
- type
-
type
- callback
-
新建一个 Web IDL
EventListener
实例,其函数类型为 (event:Event
),内容为调用 observable event listener invoke algorithm,参数为 subscriber 和 event。 - capture
-
options 的
capture
- passive
- once
-
false
- signal
-
-
返回 observable。
Subscriber
subscriber 和一个 Event
event,执行以下步骤:
-
用 event 运行 subscriber 的
next()
方法。
4. 安全与隐私注意事项
本内容正在从我们的说明文档上游合并至本规范,在此期间,你可以参考以下资源:
5. 致谢
特别感谢 Ben Lesh 对
Observable
API 的诸多设计建议,以及他多年维护用户空间 Observable 代码,为该功能贡献到 Web 平台做出的重要贡献。