可观察对象

社区组报告草案,

此版本:
https://wicg.github.io/observable/
编辑:
Dominic Farolino (Google)
参与:
GitHub WICG/observable (新建议题, 开放议题)
提交记录:
GitHub spec.bs 提交
测试套件:
https://wpt.fyi/results/dom/observable/

摘要

Observable API 提供了一种可组合、易用的方式来处理异步事件流。

本文档状态

本规范由 Web 平台孵化社区组 发布。 它不是 W3C 标准,也不在 W3C 标准进程中。 请注意,根据 W3C 社区贡献者许可协议 (CLA) 有有限的选择退出权,且适用其他条件。 了解更多关于 W3C 社区和业务组 的信息。

1. 介绍

本节为非规范性内容。

2. 核心架构

2.1. Subscriber 接口

[Exposed=*]
interface Subscriber {
  undefined next(any value);
  undefined error(any error);
  undefined complete();
  undefined addTeardown(VoidFunction teardown);

  // 创建后为 true,直到 complete()/error() 被调用,或订阅者取消订阅。在 complete()/error() 内该属性为 true。
  readonly attribute boolean active;

  readonly attribute AbortSignal signal;
};

每个 Subscriber 都有一个 有序集合内部观察者,初始为空。

每个 Subscriber 都有一个 清理回调,它是一个 列表,列表元素为 VoidFunction, 初始为空。

每个 Subscriber 都有一个 订阅控制器,它是一个 AbortController

每个 Subscriber 都有一个 active 布尔值,初始为 true。

注意:这是一个用于记录的变量,用于确保 Subscriber 在关闭后不再调用其拥有的任意回调。

active 的 getter 步骤为返回 thisactive 布尔值。

signal 的 getter 步骤为返回 this订阅控制器signal

next(value) 方法的步骤如下:
  1. 如果 thisactive 为 false,则返回。

  2. 如果 this相关全局对象Window 对象,且其 关联的 Document完全激活,则返回。

  3. internal observers copythis内部观察者 的副本。

    注意:我们对 内部观察者 列表进行复制并遍历, 这样如果其中某个 内部观察者next 步骤导致新的订阅, 则观察者列表在遍历过程中不会被改变。

  4. 对每个 observer 属于 internal observers copy

    1. value 运行 observernext 步骤

      断言:没有 异常抛出

      注意:这里不会抛出异常,因为当 内部观察者next 步骤是包装脚本提供的回调时, 处理观察者 步骤会在调用这些回调时使用逻辑包装,捕获所有异常并报告给全局。

      next 步骤是规范算法时, 这些步骤会确保不会向外抛出异常,以满足此断言。

error(error) 方法的步骤如下:
  1. 如果 thisactive 为 false,报告异常,携带 errorthis相关全局对象,然后返回。

  2. 如果 this相关全局对象Window 对象,且其 关联的 Document完全激活,则返回。

  3. 关闭 this

  4. internal observers copythis内部观察者 的副本。

  5. 对每个 observer 属于 internal observers copy

    1. error 运行 observererror 步骤

      断言:没有 异常抛出

      注意:详细原因可见 next() 文档。

complete() 方法的步骤如下:
  1. 如果 thisactive 为 false,则返回。

  2. 如果 this相关全局对象Window 对象,且其 关联的 Document完全激活,则返回。

  3. 关闭 this

  4. internal observers copythis内部观察者 的副本。

  5. 对每个 observer 属于 internal observers copy

    1. 运行 observercomplete 步骤

      断言:没有 异常抛出

      注意:详细原因可见 next() 文档。

addTeardown(teardown) 方法的步骤如下:
  1. 如果 this相关全局对象Window 对象,且其 关联的 Document完全激活,则返回。

  2. 如果 thisactive 为 true,则 追加 teardownthis清理回调 列表。

  3. 否则,调用 teardown,参数为 «» 和 "report"。

关闭订阅,给定 Subscriber subscriber,以及可选的 any reason,执行以下步骤:
  1. 如果 subscriberactive 为 false,则返回。

    此操作用于防止可重入调用,可能出现在“生产者主动取消订阅”场景。如下例:

    const outerController = new AbortController();
    const observable = new Observable(subscriber => {
      subscriber.addTeardown(() => {
        // 2.) 此清理操作在“关闭”算法执行期间运行。中止下游 signal 会运行其 abort 算法。其中之一就是当前正在运行的“关闭”算法。
        outerController.abort();
      });
    
      // 1.) 这会立即调用“关闭”算法,将 subscriber.active 设为 false。
      subscriber.complete();
    });
    
    observable.subscribe({}, {signal: outerController.signal});
    
  2. subscriberactive 设为 false。

  3. 中止信号 subscriber订阅控制器,如有 reason 则携带。

  4. 对每个 teardown 属于 subscriber清理回调,按逆序插入遍历:

    1. 如果 subscriber相关全局对象Window 对象,且其 关联的 Document完全激活,则中止这些步骤。

      注意:此步骤会反复运行,因为每个 teardown 都可能导致上述 Document 变为非激活状态。

    2. 调用 teardown,参数为 «» 和 "report"。

2.2. Observable 接口

// SubscribeCallback 是 Observable "创建者" 的代码所在。它在调用 subscribe() 时执行,用于建立新订阅。
callback SubscribeCallback = undefined (Subscriber subscriber);
callback ObservableSubscriptionCallback = undefined (any value);

dictionary SubscriptionObserver {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;
};

callback ObservableInspectorAbortHandler = undefined (any value);

dictionary ObservableInspector {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;

  VoidFunction subscribe;
  ObservableInspectorAbortHandler abort;
};

typedef (ObservableSubscriptionCallback or SubscriptionObserver) ObserverUnion;
typedef (ObservableSubscriptionCallback or ObservableInspector) ObservableInspectorUnion;

dictionary SubscribeOptions {
  AbortSignal signal;
};

callback Predicate = boolean (any value, unsigned long long index);
callback Reducer = any (any accumulator, any currentValue, unsigned long long index);
callback Mapper = any (any value, unsigned long long index);
// 与 Mapper 的区别仅在于返回类型,此回调仅用于访问序列中的每个元素,不进行变换。
callback Visitor = undefined (any value, unsigned long long index);

// 此回调返回一个 `any`,必须通过 `Observable` 的转换语义转为 `Observable`。
callback CatchCallback = any (any value);

[Exposed=*]
interface Observable {
  constructor(SubscribeCallback callback);
  undefined subscribe(optional ObserverUnion observer = {}, optional SubscribeOptions options = {});

  // 如果 value 是以下任意类型,则构造一个原生 Observable:
  //   - Observable
  //   - AsyncIterable
  //   - Iterable
  //   - Promise
  static Observable from(any value);

  // 返回 Observable 的操作符。详见规范的 Operators 部分。
  //
  // takeUntil() 可消费 promise、iterable、async iterable 以及其他 observable。
  Observable takeUntil(any value);
  Observable map(Mapper mapper);
  Observable filter(Predicate predicate);
  Observable take(unsigned long long amount);
  Observable drop(unsigned long long amount);
  Observable flatMap(Mapper mapper);
  Observable switchMap(Mapper mapper);
  Observable inspect(optional ObservableInspectorUnion inspectorUnion = {});
  Observable catch(CatchCallback callback);
  Observable finally(VoidFunction callback);

  // 返回 Promise 的操作符。
  Promise<sequence<any>> toArray(optional SubscribeOptions options = {});
  Promise<undefined> forEach(Visitor callback, optional SubscribeOptions options = {});
  Promise<boolean> every(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> first(optional SubscribeOptions options = {});
  Promise<any> last(optional SubscribeOptions options = {});
  Promise<any> find(Predicate predicate, optional SubscribeOptions options = {});
  Promise<boolean> some(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> reduce(Reducer reducer, optional any initialValue, optional SubscribeOptions options = {});
};

每个 Observable 都有一个 subscribe callback,它是一个 SubscribeCallback 或一组接收 Subscriber 的步骤。

每个 Observable 都有一个 weak subscriber,它是指向 Subscriber 的弱引用或 null,初始为 null。

注意:这些类型的“联合”用于支持两种 Observable 的构造方式:由 JavaScript 创建的 (总是用 SubscribeCallback 构造), 以及原生构造的 Observable 对象(其 subscribe callback 可以是任意原生步骤而不是 JS 回调)。 when() 的返回值就是后者的一个例子。

new Observable(callback) 构造函数步骤如下:
  1. thissubscribe callback 设为 callback

    注意:此回调将在后续调用 subscribe() 时被调用。

subscribe(observer, options) 方法步骤如下:
  1. 订阅 this,给定 observeroptions

2.2.1. 支持性概念

默认错误算法是一个算法,接收一个 any error,执行以下步骤:
  1. 报告异常,携带 error当前 realm全局对象

注意:单独抽出此默认算法,便于规范中所有原生订阅 Observable(即规范语句订阅,不经过 subscribe() 方法)时不必重复定义这些步骤。

内部观察者 是具有如下 结构的对象:

next 步骤

接收一个 any 参数的算法。初始时,这些步骤不执行任何操作。

error 步骤

接收一个 any 参数的算法。初始为 默认错误算法

complete 步骤

无参数的算法。初始时,这些步骤不执行任何操作。

内部观察者结构用于映射 nexterrorcomplete 回调函数。对于通过 subscribe() 方法 由 JavaScript 订阅的 Observable, 这些算法步骤其实就是对脚本提供的 回调函数做包装, 分别对应 nexterrorcomplete 回调函数

但当规范内部(非脚本)订阅 Observable 时, 这些“步骤”是任意规范算法,不是包含 Web IDL 回调函数ObserverUnion。如 § 2.3.3 返回 Promise 的操作符 就用到了这种方式。

转换为 Observable,给定 any value,执行以下步骤:

注意:将此算法从 Web IDL 的 from() 方法中拆分出来, 以便规范正文可以直接 转换值,不经过 Web IDL 绑定。

  1. 如果 Type(value) 不是 Object抛出 TypeError

    注意:此步骤防止原始类型被强制转换为 iterable(如 String)。讨论见 WICG/observable#125

  2. Observable 转换:如果 value具体类型Observable,则直接返回 value

  3. 异步 iterable 转换:令 asyncIteratorMethod? GetMethod(value, %Symbol.asyncIterator%)。

    注意:我们使用 GetMethod 而不是 GetIterator,因为我们仅检测异步迭代协议支持,不希望因未实现而抛错。 GetIterator 在以下两种情况都会抛错:(a) 没有实现迭代协议,(b) 实现了但不可调用或 getter 抛错。 GetMethod 只在后者情况下抛错。

  4. 如果 asyncIteratorMethod 是 undefined 或 null,则跳到 Iterable 转换 步骤。

  5. nextAlgorithm 为如下步骤,给定 Subscriber subscriberIterator Record iteratorRecord

    1. 如果 subscriber订阅控制器signal中止,则返回。

    2. nextPromisePromise 或 undefined,初始为 undefined。

    3. nextCompletionIteratorNext(iteratorRecord)。

      注意:此处用 IteratorNext,而不是 IteratorStepValue, 因为 IteratorStepValue 要求迭代器的 next() 方法返回可立即检查的对象, 而异步迭代时 next() 返回 Promise/thenable(需包装为 Promise 再获取值)。

    4. 如果 nextCompletionthrow completion,则:

      1. 断言iteratorRecord 的 [[Done]] 为 true。

      2. nextPromise被拒绝的 promise, 值为 nextRecord 的 [[Value]]。

    5. 否则,如果 nextRecordnormal completion, 令 nextPromise已解决的 promise, 值为 nextRecord 的 [[Value]]。

      注意:这样做是确保 nextRecord 的 [[Value]] 即使不是 Promise,也会被包装为 Promise。

    6. 响应 nextPromise

      • 如果 nextPromise 被 fulfilled,值为 iteratorResult,则:

        1. 如果 Type(iteratorResult) 不是 Object,则以 error()TypeError 调用 subscriber,并终止步骤。

        2. doneIteratorComplete(iteratorResult)。

        3. 如果 donethrow completion, 则以 done 的 [[Value]] 调用 subscribererror(),并终止步骤。

        4. 如果 done 的 [[Value]] 为 true,则以 subscribercomplete() 调用,并终止步骤。

        5. valueIteratorValue(iteratorResult)。

        6. 如果 valuethrow completion, 则以 value 的 [[Value]] 调用 subscribererror(),并终止步骤。

        7. value 的 [[Value]] 调用 subscribernext()

        8. 再次以 subscriberiteratorRecord 运行 nextAlgorithm

      • 如果 nextPromise 被 rejected,原因为 r,则以 r 调用 subscribererror()

  6. 返回一个 新的 Observable, 其 subscribe callback 是一个接收 Subscriber subscriber 并执行如下操作的算法:

    1. 如果 subscriber订阅控制器signal中止,则返回。

    2. iteratorRecordCompletionGetIterator(value, async)。

      注意:此步骤会重新调用 %Symbol.asyncIterator% 的 getter, 并触发协议本身以获取 Iterator Record。极端情况见 issue#127

    3. 如果 iteratorRecordCompletionthrow completion, 则以 iteratorRecordCompletion 的 [[Value]] 同步调用 subscribererror(),并终止步骤。

      注意:此处同步传播错误,仅在异步 iterable 转 Observable 时可能发生; 其它情况错误会异步经 microtask 传播,因为被包装为 rejected Promise。 此同步传播行为与语言结构一致,如 for-await of 循环会同步抛出异常到外部 catch 块,且未发生 Await 前即抛出。

    4. iteratorRecord! iteratorRecordCompletion

    5. 断言iteratorRecordIterator Record

    6. 如果 subscriber订阅控制器signal中止,则返回。

    7. 添加如下中止算法subscriber订阅控制器signal

      1. 运行 AsyncIteratorClose(iteratorRecord, NormalCompletion(subscriber订阅控制器abort reason))。

    8. subscriberiteratorRecord 运行 nextAlgorithm

  7. Iterable 转换:令 iteratorMethod? GetMethod(value, %Symbol.iterator%)。

  8. 如果 iteratorMethod 为 undefined,则跳到 Promise 转换 步骤。

    否则,返回一个 新的 Observable, 其 subscribe callback 是一个接收 Subscriber subscriber 并执行如下操作的算法:

    1. 如果 subscriber订阅控制器signal中止,则返回。

    2. iteratorRecordCompletionGetIterator(value, sync)。

    3. 如果 iteratorRecordCompletionthrow completion, 则以 iteratorRecordCompletion 的 [[Value]] 调用 subscribererror(),并终止步骤。

    4. iteratorRecord! iteratorRecordCompletion

    5. 如果 subscriber订阅控制器signal中止,则返回。

    6. 添加如下中止算法subscriber订阅控制器signal

      1. 运行 IteratorClose(iteratorRecord, NormalCompletion(UNUSED))。

    7. true 时:

      1. nextIteratorStepValue(iteratorRecord)。

      2. 如果 nextthrow completion, 则以 next 的 [[Value]] 调用 subscribererror(),并 跳出

      3. next! next

      4. 如果 next 已结束,则:

        1. 断言iteratorRecord 的 [[Done]] 为 true。

        2. 调用 subscribercomplete()

        3. 返回。

      5. next 调用 subscribernext()

      6. 如果 subscriber订阅控制器signal中止,则 跳出

  9. Promise 转换:如果 IsPromise(value) 为 true, 则:

    1. 返回一个 新的 Observable, 其 subscribe callback 是一个接收 Subscriber subscriber 并执行如下操作的算法:

      1. value 做出响应:

        1. 如果 value 被 fulfilled,值为 v,则:

          1. v 运行 subscribernext() 方法。

          2. 运行 subscribercomplete() 方法。

        2. 如果 value 被 rejected,原因为 r,则以 r 运行 subscribererror() 方法。

  10. 抛出一个 TypeError

测试
订阅一个 Observable ,给定一个 ObserverUnion内部观察者 observer,以及 SubscribeOptions options,执行以下步骤:

注意:我们将此算法与 Web IDL 的 subscribe() 方法分离开,使规范正文可以 订阅一个 Observable 而无需经过 Web IDL 绑定。参见 w3c/IntersectionObserver#464 相关背景,其中“内部”正文 必须 不经过 Web IDL 绑定处理属性可能被 JavaScript 修改的对象。用法参见 § 2.3.3 返回 Promise 的操作符

  1. 如果 this相关全局对象Window 对象,且其 关联的 Document完全激活,则返回。

  2. internal observer 为新的 内部观察者

  3. 按如下处理 observer

    1. 如果 observerObservableSubscriptionCallback
      设置 internal observernext 步骤如下,接收 any value
      1. 调用 observer,参数为 «value» 和 "report"。

      如果 observerSubscriptionObserver
      1. 如果 observernext 存在,则设置 internal observernext 步骤如下,接收 any value

        1. 调用 observernext ,参数为 «value» 和 "report"。

      2. 如果 observererror 存在,则设置 internal observererror 步骤如下,接收 any error

        1. 调用 observererror ,参数为 «error» 和 "report"。

      3. 如果 observercomplete 存在,则设置 internal observercomplete 步骤如下:

        1. 调用 observercomplete ,参数为 «» 和 "report"。

      如果 observer内部观察者
      internal observer 设为 observer
  4. 断言internal observererror 步骤要么是 默认错误算法,要么是 调用提供的 error 回调函数的算法。

  5. 如果 thisweak subscriber 非 null 且 thisweak subscriberactive 为 true:

    1. subscriberthisweak subscriber

    2. 追加 internal observersubscriber内部观察者

    3. 如果 optionssignal 存在,则:

      1. 如果 optionssignal中止,则 移除 internal observersubscriber内部观察者

      2. 否则,添加以下中止算法optionssignal

        1. 如果 subscriberactive 为 false,则中止这些步骤。

        2. 移除 internal observersubscriber内部观察者

        3. 如果 subscriber内部观察者 为空,则以 optionssignal中止原因 关闭 subscriber

    4. 返回。

  6. subscriber新建Subscriber

  7. 追加 internal observersubscriber内部观察者

  8. 设置 thisweak subscribersubscriber

  9. 如果 optionssignal 存在,则:

    1. 如果 optionssignal中止,则以 optionssignal中止原因 关闭 subscriber

    2. 否则,添加以下中止算法optionssignal

      1. 如果 subscriberactive 为 false,则中止这些步骤。

      2. 移除 internal observersubscriber内部观察者

      3. 如果 subscriber内部观察者 为空,则以 optionssignal中止原因 关闭 subscriber

  10. 如果 thissubscribe callbackSubscribeCallback, 则以「subscriber」和 "rethrow" 调用它。

    如果 抛出了异常 E,则以 E 调用 subscribererror() 方法。

  11. 否则,以 subscriber 运行 thissubscribe callback 给出的步骤。

测试

2.3. 操作符

目前请参见 https://github.com/wicg/observable#operators

2.3.1. from()

from(value) 方法步骤如下:
  1. 返回 转换 valueObservable 的结果。 如有异常则抛出。

2.3.2. Observable 返回型操作符

takeUntil(value) 方法步骤如下:
  1. sourceObservablethis

  2. notifier 转换 valueObservable 的结果。

  3. observable新建Observable, 其 subscribe callback 是一个接受 Subscriber subscriber 的算法,具体如下:

    注意:此方法涉及对两个 Observable 进行 订阅: (1) notifier,(2) sourceObservable。在以下情况我们会从两者取消订阅:
    1. notifier 开始发出值(无论是 "next" 还是 "error")。此时, 我们会从 notifier 取消订阅,因为我们已获得所需,不再需要其继续产生值。同时也会从 sourceObservable 取消订阅, 因为它不再需要继续产生用于本方法返回的 observable 的值,因为我们已经手动结束了 observable 的订阅, 因为 notifier 最终产生了值。

    2. sourceObservable 调用 error()complete()。此时,从 notifier 取消订阅, 因为我们不再需要监听其产生的值以决定 observable 何时停止镜像 sourceObservable 的值 (因为 sourceObservable 已自行完成)。无需从 sourceObservable 取消订阅, 因为其订阅已自动结束。

    1. notifierObserver 为新的 内部观察者,初始化如下:

      next 步骤

      运行 subscribercomplete() 方法。

      注意:这会从 sourceObservable 取消订阅, 如果此时已订阅。原因是 sourceObservable 使用 "外部" subscribersubscription controllersignal 作为输入信号, 当上述(或下述)调用 complete() 时, 该信号会被 中止

      error 步骤

      运行 subscribercomplete() 方法。

      注意:没有指定 complete 步骤, 因为如果 notifier Observable 自行完成, 我们无需对与该方法返回的 observable 相关的 subscriber 调用 complete。 此时 observable 会继续无中断地镜像 sourceObservable

    2. options 为新建的 SubscribeOptions, 其 signalsubscribersubscription controllersignal

    3. 订阅 notifier, 传入 notifierObserveroptions

    4. 如果 subscriberactive 为 false,则返回。

      注意:这意味着如果 notifier 同步发出值, sourceObservablesubscribe callback 不会被调用。 但如果 notifier 仅同步 "complete"(没有发出 "next" 或 "error"),则 subscriberactive 仍为 true,我们会继续订阅 sourceObservableobservable 会无中断地镜像它。

    5. sourceObserver 为新的 内部观察者,初始化如下:

      next 步骤

      以传入的 value 调用 subscribernext() 方法。

      error 步骤

      以传入的 error 调用 subscribererror() 方法。

      complete 步骤

      运行 subscribercomplete() 方法。

      注意: sourceObserver 主要是透传, 镜像 sourceObservable 发出的所有内容,唯一例外是当 sourceObservable 被耗尽, 在 notifier 尚未产生任何内容时,可以取消订阅 notifier Observable

    6. 订阅 sourceObservable, 传入 sourceObserveroptions

  4. 返回 observable

测试
map(mapper) 方法步骤如下:
  1. sourceObservablethis

  2. observable新建Observable, 其 subscribe callback 是一个接受 Subscriber subscriber 的算法,具体如下:

    1. idxunsigned long long,初始为 0。

    2. sourceObserver 为新的 内部观察者,初始化如下:

      next 步骤
      1. 调用 mapper,参数为「传入的 valueidx」和 "rethrow";令 mappedValue 为返回值。

        如果 抛出异常 E, 则以 E 调用 subscribererror() 方法,并终止这些步骤。

      2. 递增 idx

      3. mappedValue 调用 subscribernext() 方法。

      error 步骤

      以传入的 error 调用 subscribererror() 方法。

      complete 步骤

      调用 subscribercomplete() 方法。

    3. options 为新建的 SubscribeOptions, 其 signalsubscribersubscription controllersignal

    4. 订阅 sourceObservable, 传入 sourceObserveroptions

  3. 返回 observable

测试
filter(predicate) 方法步骤如下:
  1. sourceObservablethis

  2. observable新建Observable, 其 subscribe callback 是一个接受 Subscriber subscriber 的算法,具体如下:

    1. idxunsigned long long,初始为 0。

    2. sourceObserver 为新的 内部观察者,初始化如下:

      next 步骤
      1. 调用 predicate,参数为「传入的 valueidx」和 "rethrow";令 matches 为返回值。

        如果 抛出异常 E, 则以 E 调用 subscribererror() 方法,并终止这些步骤。

      2. idx 设为 idx + 1。

      3. 如果 matches 为 true,则以 value 调用 subscribernext() 方法。

      error 步骤

      以传入的 error 调用 subscribererror() 方法。

      complete 步骤

      调用 subscribercomplete() 方法。

    3. options 为新建的 SubscribeOptions, 其 signalsubscribersubscription controllersignal

    4. 订阅 sourceObservable, 传入 sourceObserveroptions

  3. 返回 observable

测试
take(amount) 方法步骤如下:
  1. sourceObservablethis

  2. observable新建Observable, 其 subscribe callback 是一个接受 Subscriber subscriber 的算法,具体如下:

    1. remainingamount

    2. 如果 remaining 为 0,则运行 subscribercomplete() 方法并终止这些步骤。

    3. sourceObserver 为新的 内部观察者,初始化如下:

      next 步骤
      1. 以传入的 value 调用 subscribernext() 方法。

      2. 递减 remaining

      3. 如果 remaining 为 0,则运行 subscribercomplete() 方法。

      error 步骤

      以传入的 error 调用 subscribererror() 方法。

      complete 步骤

      调用 subscribercomplete() 方法。

    4. options 为新建的 SubscribeOptions, 其 signalsubscribersubscription controllersignal

    5. 订阅 sourceObservable, 传入 sourceObserveroptions

  3. 返回 observable

测试
drop(amount) 方法步骤如下:
  1. sourceObservablethis

  2. observable新建Observable, 其 subscribe callback 是一个接受 Subscriber subscriber 的算法,具体如下:

    1. remainingamount

    2. sourceObserver 为新的 内部观察者,初始化如下:

      next 步骤
      1. 如果 remaining > 0,则递减 remaining 并终止这些步骤。

      2. 断言remaining 为 0。

      3. 以传入的 value 调用 subscribernext() 方法。

      error 步骤

      以传入的 error 调用 subscribererror() 方法。

      complete 步骤

      调用 subscribercomplete() 方法。

    3. options 为新建的 SubscribeOptions, 其 signalsubscribersubscription controllersignal

    4. 订阅 sourceObservable, 传入 sourceObserveroptions

  3. 返回 observable

测试
flatMap(mapper) 方法步骤如下:
  1. sourceObservablethis

  2. observable新建Observable, 其 subscribe callback 是一个接受 Subscriber subscriber 的算法,具体如下:

    1. idxunsigned long long,初始为 0。

    2. outerSubscriptionHasCompletedboolean,初始为 false。

    3. queue 为新的 列表,元素类型为 any,初始为空。

      注意:queue 用于存储 sourceObservable 发出的所有 Observable, 当 observable 当前正在订阅某个较早由 sourceObservable 发出的且尚未耗尽的 Observable 时。

    4. activeInnerSubscriptionboolean,初始为 false。

    5. sourceObserver 为新的 内部观察者,初始化如下:

      next 步骤
      1. 如果 activeInnerSubscription 为 true:

        1. 追加 valuequeue

          注意:value 会在当前 Observable 订阅结束后处理。

      2. 否则:

        1. activeInnerSubscription 设为 true。

        2. valuesubscribermapper, 以及对以下变量的引用:queueactiveInnerSubscriptionouterSubscriptionHasCompletedidx 运行 flatmap 处理下一个值步骤

          注意:该 flatmap 处理下一个值步骤 会订阅由 value 派生的 Observable(如果能派生)并持续处理其值,直到订阅失效(错误或完成)。如果该“内部” Observable 完成,则处理步骤会递归调用自身,处理 queue 下一个值。

          如果不存在下一个值,则处理步骤终止,取消设置 activeInnerSubscription,以便后续 sourceObservable 产生的值能正确处理。

      error 步骤

      以传入的 error 调用 subscribererror() 方法。

      complete 步骤
      1. outerSubscriptionHasCompleted 设为 true。

        注意:如果 activeInnerSubscription 为 true,下面的步骤不会完成 subscriber。此时,flatmap 处理下一个值步骤 会在 queue 为空且“内部”订阅失效后负责完成 subscriber

      2. 如果 activeInnerSubscription 为 false 且 queue 为空,则运行 subscribercomplete() 方法。

    6. options 为新建的 SubscribeOptions, 其 signalsubscribersubscription controllersignal

    7. 订阅 sourceObservable, 传入 sourceObserveroptions

  3. 返回 observable

flatmap 处理下一个值步骤,给定 any valueSubscriber subscriberMapper mapper,以及对以下所有变量的引用:列表类型的 anyqueueboolean activeInnerSubscriptionboolean outerSubscriptionHasCompleted,以及 unsigned long long idx
  1. mappedResult调用 mapper,参数为 «value, idx» 和 "rethrow" 的结果。

    如果 抛出异常 E,则以 E 运行 subscribererror() 方法,并终止这些步骤。

  2. idx 设为 idx + 1。

  3. innerObservable 为调用 from(),参数为 mappedResult 的结果。

    如果 抛出异常 E,则以 E 运行 subscribererror() 方法,并终止这些步骤。

    不应该直接调用 from(),应调用某个内部算法以便异常能正确传递到 subscriber

  4. innerObserver 为新建的 内部观察者,初始化如下:

    next 步骤

    以传入的 value 调用 subscribernext() 方法。

    error 步骤

    以传入的 error 调用 subscribererror() 方法。

    complete 步骤
    1. 如果 queue 非空:

      1. nextValuequeue 的第一个元素;移除该元素。

      2. nextValuesubscribermapper、以及对 queueactiveInnerSubscription 的引用, 运行 flatmap 处理下一个值步骤

    2. 否则:

      1. activeInnerSubscription 设为 false。

        注意: 由于 activeInnerSubscription 是引用类型,这能确保后续“外部” Observable (即 sourceObservable)产生的所有值都能被正确处理。

      2. 如果 outerSubscriptionHasCompleted 为 true,则运行 subscribercomplete() 方法。

        注意: 这表示“外部” Observable 已经完成,但因为还有至少一个待处理的“内部” Observable(即 innerObservable)尚未完成,所以直到现在才真正完成 subscriber

  5. innerOptions 为新建的 SubscribeOptions, 其 signalsubscribersubscription controllersignal

  6. 订阅 innerObservable, 传入 innerObserverinnerOptions

switchMap(mapper) 方法步骤如下:
  1. sourceObservablethis

  2. observable新建Observable, 其 subscribe callback 是一个接受 Subscriber subscriber 的算法,具体如下:

    1. idxunsigned long long,初始为 0。

    2. outerSubscriptionHasCompletedboolean,初始为 false。

    3. activeInnerAbortControllerAbortController 或 null,初始为 null。

      注意: AbortController 只会在本算法的 next 步骤(见下方)被赋值为新实例,并只会在 switchmap 处理下一个值步骤 被赋值为 null(当“内部” Observable 完成或错误时)。 此变量用作当前是否有活跃“内部”订阅的标志。下方 complete 步骤 判断此变量, 如果 sourceObservable 完成但有活跃“内部”订阅,则不会立即完成 subscriber,而是等待“内部”订阅完成后再完成。

    4. sourceObserver 为新建的 内部观察者,初始化如下:

      next 步骤
      1. 如果 activeInnerAbortController 不为 null,则 中止信号 activeInnerAbortController

        注意: 这会“取消订阅”上一次由 sourceObservable 推送的值派生出的“内部” Observable, 然后我们会立即订阅即将由当前 value 推送产生的“新的” Observable

      2. activeInnerAbortController 设为新建 AbortController

      3. valuesubscribermapper, 以及对 activeInnerAbortControllerouterSubscriptionHasCompletedidx 的引用, 运行 switchmap 处理下一个值步骤

        注意: switchmap 处理下一个值步骤 会订阅由 value 派生的 Observable, 持续处理其值,直到 (1) 订阅失效(错误或完成);或 (2) activeInnerAbortController中止(因为 sourceObservable 推送了更新的值,替换了当前“内部”订阅)。

      error 步骤

      以传入的 error 调用 subscribererror() 方法。

      complete 步骤
      1. outerSubscriptionHasCompleted 设为 true。

        注意: 如果 activeInnerAbortController 不为 null,则不会立即完成 subscriber。 此时 switchmap 处理下一个值步骤 会在“内部”订阅完成时完成 subscriber

      2. 如果 activeInnerAbortController 为 null,则运行 subscribercomplete() 方法。

    5. options 为新建的 SubscribeOptions, 其 signalsubscribersubscription controllersignal

    6. 订阅 sourceObservable, 传入 sourceObserveroptions

  3. 返回 observable

switchmap 处理下一个值步骤,给定 any valueSubscriber subscriberMapper mapper,以及对以下所有变量的引用:AbortController activeInnerAbortControllerboolean outerSubscriptionHasCompleted, 以及 unsigned long long idx
  1. mappedResult调用 mapper,参数为 «value, idx» 和 "rethrow" 的结果。

    如果 抛出异常 E,则以 E 运行 subscribererror() 方法,并终止这些步骤。

  2. idx 设为 idx + 1。

  3. innerObservable 为调用 from(),参数为 mappedResult 的结果。

    如果 抛出异常 E,则以 E 运行 subscribererror() 方法,并终止这些步骤。

  4. innerObserver 为新建的 内部观察者,初始化如下:

    next 步骤

    以传入的 value 调用 subscribernext() 方法。

    error 步骤

    以传入的 error 调用 subscribererror() 方法。

    注意: 此处无需将 activeInnerAbortController 设为 null, 因为调用 subscribererror() 方法会自动从“外部”源 Observable 取消订阅, 不会再收到更多值。

    complete 步骤
    1. 如果 outerSubscriptionHasCompleted 为 true,则运行 subscribercomplete() 方法。

    2. 否则,将 activeInnerAbortController 设为 null。

      注意: 由于该变量是引用类型,能向 switchMap 完成步骤传递没有活跃内部订阅的信号。

  5. innerOptions 为新建的 SubscribeOptions, 其 signal创建依赖中止信号的结果,信号列表为「activeInnerAbortControllersignalsubscribersubscription controllersignal」,使用 AbortSignal, 以及 当前 realm

  6. 订阅 innerObservable, 传入 innerObserverinnerOptions

inspect(inspectorUnion) 方法步骤如下:
  1. subscribe callbackVoidFunction 或 null,初始为 null。

  2. next callbackObservableSubscriptionCallback 或 null,初始为 null。

  3. error callbackObservableSubscriptionCallback 或 null,初始为 null。

  4. complete callbackVoidFunction 或 null,初始为 null。

  5. abort callbackObservableInspectorAbortHandler 或 null,初始为 null。

  6. 处理 inspectorUnion,如下:

    如果 inspectorUnionObservableSubscriptionCallback
    1. next callback 设为 inspectorUnion

    如果 inspectorUnionObservableInspector
    1. 如果 subscribe 存在inspectorUnion,则将 subscribe callback 设为它。

    2. 如果 next 存在inspectorUnion,则将 next callback 设为它。

    3. 如果 error 存在inspectorUnion,则将 error callback 设为它。

    4. 如果 complete 存在inspectorUnion,则将 complete callback 设为它。

    5. 如果 abort 存在inspectorUnion,则将 abort callback 设为它。

  7. sourceObservablethis

  8. observable新建Observable, 其 subscribe callback 是一个接受 Subscriber subscriber 的算法,具体如下:

    1. 如果 subscribe callback 非 null,则 调用它,参数为 «» 和 "rethrow"。

      如果 抛出异常 E,则以 E 运行 subscribererror() 方法,并终止这些步骤。

      注意: 这样会导致 sourceObservable 永不被订阅。

    2. 如果 abort callback 不为 null,则 向下添加的中止算法subscribersubscription controllersignal

      1. 调用 abort callback,参数为 «subscribersubscription controllersignalabort reason» 和 "report"。

    3. sourceObserver 为新建的 内部观察者,初始化如下:

      next 步骤
      1. 如果 next callback 非 null,则 调用 next callback,参数为 «传入的 value» 和 "rethrow"。

        如果 抛出异常 E

        1. 移除 abort callback,从 subscribersubscription controllersignal

          注意: 此步骤很重要,因为 abort callback 仅用于“消费者主动”取消订阅。当生产者终止订阅(通过 subscribererror()complete())时,需确保不会运行 abort callback

          这与 Chromium 实现一致,但建议持有最初传入的 SubscribeOptionssignal 的引用,仅在其 abort 时调用 abort callback。结果应该一样,但需进一步验证。

        2. E 运行 subscribererror() 方法,并终止这些步骤。

      2. 以传入的 value 运行 subscribernext() 方法。

      error 步骤
      1. 移除 abort callback,从 subscribersubscription controllersignal

      2. 如果 error callback 非 null,则 调用 error callback,参数为 «传入的 error» 和 "rethrow"。

        如果 抛出异常 E,则以 E 运行 subscribererror() 方法,并终止这些步骤。

      3. 以传入的 error 运行 subscribererror() 方法。

      complete 步骤
      1. 移除 abort callback,从 subscribersubscription controllersignal

      2. 如果 complete callback 非 null,则 调用 complete callback,参数为 «» 和 "rethrow"。

        如果 抛出异常 E,则以 E 运行 subscribererror() 方法,并终止这些步骤。

      3. 运行 subscribercomplete() 方法。

    4. options 为新建的 SubscribeOptions, 其 signalsubscribersubscription controllersignal

    5. 订阅 sourceObservable,传入 sourceObserveroptions

  9. 返回 observable

测试
catch(callback) 方法步骤如下:
  1. sourceObservablethis

  2. observable新建Observable, 其 subscribe callback 是一个接受 Subscriber subscriber 的算法,具体如下:

    1. sourceObserver 为新建的 内部观察者,初始化如下:

      next 步骤

      以传入的 value 调用 subscribernext() 方法。

      error 步骤
      1. 调用 callback,参数为 «传入的 error» 和 "rethrow"。令 result 为返回值。

        如果 抛出异常 E,则以 E 运行 subscribererror() 方法,并终止这些步骤。

      2. innerObservable 为调用 from(),参数为 result 的结果。

        如果 抛出异常 E,则以 E 运行 subscribererror() 方法,并终止这些步骤。

        不应直接调用 from(),应调用内部算法以便异常能正确传递到 subscriber

      3. innerObserver 为新建的 内部观察者,初始化如下:

        next 步骤

        以传入的 value 调用 subscribernext() 方法。

        error 步骤

        以传入的 error 调用 subscribererror() 方法。

        complete 步骤

        运行 subscribercomplete() 方法。

      4. innerOptions 为新建的 SubscribeOptions, 其 signalsubscribersubscription controllersignal

      5. 订阅 innerObservable,传入 innerObserverinnerOptions

        注意: 此处可直接订阅 innerObservable,不需先取消订阅 sourceObservable,也不用担心 sourceObservable 继续发出值,因为这些都发生在 error 步骤内,说明 sourceObservable 已结束,不会再产生值,可以安全切换到 innerObservable

      complete 步骤

      运行 subscribercomplete() 方法。

    2. options 为新建的 SubscribeOptions, 其 signalsubscribersubscription controllersignal

    3. 订阅 sourceObservable,传入 sourceObserveroptions

  3. 返回 observable

finally(callback) 方法步骤如下:
  1. sourceObservablethis

  2. observable新建Observable, 其 subscribe callback 是一个接受 Subscriber subscriber 的算法,具体如下:

    1. callback 调用 subscriberaddTeardown() 方法。

    2. sourceObserver 为新建的 内部观察者,初始化如下:

      next 步骤

      以传入的 value 调用 subscribernext() 方法。

      error 步骤
      1. 以传入的 error 调用 subscribererror() 方法。

      complete 步骤
      1. 运行 subscribercomplete() 方法。

    3. options 为新建的 SubscribeOptions, 其 signalsubscribersubscription controllersignal

    4. 订阅 sourceObservable,传入 sourceObserveroptions

  3. 返回 observable

2.3.3. Promise 返回型操作符

toArray(options) 方法步骤如下:
  1. p新建的 promise

  2. 如果 optionssignal 不为 null:

    1. 如果 optionssignal中止,则:

      1. 拒绝 p,原因为 optionssignal中止原因

      2. 返回 p

    2. 将以下中止算法添加optionssignal:

      1. 拒绝 p,原因为 optionssignal中止原因

      注意: 这里只需要拒绝 p。订阅 this Observable 也会自动关闭,因为“内部” Subscriber 会在 optionssignal中止时被 关闭

  3. values 为新建的 列表

  4. observer 为新建的 内部观察者,初始化如下:

    next 步骤

    追加传入的 valuevalues

    error 步骤

    拒绝 p,原因为传入的 error

    complete 步骤

    解析 p,值为 values

  5. 订阅 this,传入 observeroptions

  6. 返回 p

测试
forEach(callback, options) 方法步骤如下:
  1. p新建的 promise

  2. visitor callback controller新建AbortController

  3. internal options 为新建的 SubscribeOptions, 其 signal创建依赖中止信号的结果,信号列表为「visitor callback controllersignaloptionssignal(如不为 null)」, 使用 AbortSignal, 以及 当前 realm

    许多琐碎的 内部观察者 起到传递的作用,并不控制它们所代表的 Observable 的订阅;也就是说,当订阅被终止时,会调用它们的 error stepscomplete steps,而它们的 next steps 只是沿链传递给定值的某种形式。

    但是,对于这个运算符,下方 observernext stepscallback 抛出异常的情况下,实际上负责中止对 this 的底层订阅。在这种情况下,我们传递给“Subscribe to an Observable” 的 SubscribeOptionssignal 需要是从 optionssignal 派生的 dependent signal,并且下方 next steps 可访问且在需要时可用来 signal abortAbortControllerAbortSignal

  4. 如果 internal optionssignal中止,则:

    1. 拒绝 p,原因为 internal optionssignal中止原因

    2. 返回 p

  5. 将以下中止算法添加internal optionssignal:

    1. 拒绝 p,原因为 internal optionssignal中止原因

      注意: p 的拒绝与 internal optionssignal 相关,而不是与 optionssignal 相关。 所以在 optionssignalabort 事件触发时, 期间排队的 微任务会在 p 的拒绝处理前运行。

  6. idxunsigned long long,初始为 0。

  7. observer 为新建的 内部观察者,初始化如下:

    next 步骤
    1. 调用 callback ,参数为「传入的 valueidx」和 "rethrow"。

      如果 抛出异常 E,则 拒绝 p,原因为 E,并 中止信号 visitor callback controller,原因为 E

    2. 递增 idx

    error 步骤

    拒绝 p,原因为传入的 error

    complete 步骤

    解析 p,值为 undefined

  8. 订阅 this,传入 observerinternal options

  9. 返回 p

测试
every(predicate, options) 方法步骤如下:
  1. p新建的 promise

  2. controller新建AbortController

  3. internal options 为新建的 SubscribeOptions, 其 signal创建依赖中止信号的结果,信号列表为「controllersignaloptionssignal(如不为 null)」, 使用 AbortSignal, 以及 当前 realm

  4. 如果 internal optionssignal中止,则:

    1. 拒绝 p,原因为 internal optionssignal中止原因

    2. 返回 p

  5. 将以下中止算法添加internal optionssignal

    1. 拒绝 p,原因为 internal optionssignal中止原因

  6. idxunsigned long long,初始为 0。

  7. observer 为新建的 内部观察者,初始化如下:

    next 步骤
    1. 调用 predicate, 参数为「传入的 valueidx」和 "rethrow";令 passed 为返回值。

      如果 抛出异常 E,则 拒绝 p,原因为 E,并 中止信号 controller,原因为 E

    2. idx 设为 idx + 1。

    3. 如果 passed 为 false,则解析 p,值为 false,并 中止信号 controller

    error 步骤

    拒绝 p,原因为传入的 error

    complete 步骤

    解析 p,值为 true。

  8. 订阅 this,传入 observerinternal options

  9. 返回 p

first(options) 方法步骤如下:
  1. p新建的 promise

  2. controller新建AbortController

  3. internal options 为新建的 SubscribeOptions, 其 signal创建依赖中止信号的结果,信号列表为「controllersignaloptionssignal(如不为 null)」, 使用 AbortSignal, 以及 当前 realm

  4. 如果 internal optionssignal中止,则:

    1. 拒绝 p,原因为 internal optionssignal中止原因

    2. 返回 p

  5. 将以下中止算法添加internal optionssignal

    1. 拒绝 p,原因为 internal optionssignal中止原因

  6. internal observer 为新建的 内部观察者,初始化如下:

    next 步骤
    1. 解析 p,值为传入的 value

    2. 中止信号 controller

    error 步骤

    拒绝 p,原因为传入的 error

    complete 步骤

    拒绝 p,原因为新建 RangeError

    注意: 只有当源 Observable 在发出第一个值之前就完成时才会到达这里。

  7. 订阅 this,传入 internal observerinternal options

  8. 返回 p

last(options) 方法步骤如下:
  1. p新建的 promise

  2. 如果 optionssignal 不为 null:

    1. 如果 optionssignal中止,则:

      1. 拒绝 p,原因为 optionssignal中止原因

      2. 返回 p

    2. 将以下中止算法添加optionssignal

      1. 拒绝 p,原因为 optionssignal中止原因

  3. lastValueany 或 null,初始为 null。

  4. hasLastValueboolean,初始为 false。

  5. observer 为新建的 内部观察者,初始化如下:

    next 步骤
    1. hasLastValue 设为 true。

    2. lastValue 设为传入的 value

    error 步骤

    拒绝 p,原因为传入的 error

    complete 步骤
    1. 如果 hasLastValue 为 true,解析 p,值为 lastValue

      1. 否则,拒绝 p,原因为新建 RangeError

        注意: 参见 first() 的说明。

  6. 订阅 this,传入 observeroptions

  7. 返回 p

find(predicate, options) 方法步骤如下:
  1. p新建的 promise

  2. controller新建AbortController

  3. internal options 为新建的 SubscribeOptions, 其 signal创建依赖中止信号的结果,信号列表为「controllersignaloptionssignal(如不为 null)」, 使用 AbortSignal, 以及 当前 realm

  4. 如果 internal optionssignal中止,则:

    1. 拒绝 p,原因为 internal optionssignal中止原因

    2. 返回 p

  5. 将以下中止算法添加internal optionssignal

    1. 拒绝 p,原因为 internal optionssignal中止原因

  6. idxunsigned long long,初始为 0。

  7. observer 为新建的 内部观察者,初始化如下:

    next 步骤
    1. 调用 predicate,参数为「传入的 valueidx」和 "rethrow";令 passed 为返回值。

      如果 抛出异常 E,则 拒绝 p,原因为 E,并 中止信号 controller,原因为 E

    2. idx 设为 idx + 1。

    3. 如果 passed 为 true,则解析 p,值为 value,并 中止信号 controller

    error 步骤

    拒绝 p,原因为传入的 error

    complete 步骤

    解析 p,值为 undefined

  8. 订阅 this,传入 observerinternal options

  9. 返回 p

some(predicate, options) 方法步骤如下:
  1. p新建的 promise

  2. controller新建AbortController

  3. internal options 为新建的 SubscribeOptions, 其 signal创建依赖中止信号的结果,信号列表为「controllersignaloptionssignal(如不为 null)」, 使用 AbortSignal, 以及 当前 realm

  4. 如果 internal optionssignal中止,则:

    1. 拒绝 p,原因为 internal optionssignal中止原因

    2. 返回 p

  5. 将以下中止算法添加internal optionssignal:

    1. 拒绝 p,原因为 internal optionssignal中止原因

  6. idxunsigned long long,初始为 0。

  7. observer 为新建的 内部观察者,初始化如下:

    next 步骤
    1. 调用 predicate,参数为「传入的 valueidx」和 "rethrow";令 passed 为返回值。

      如果 抛出异常 E,则 拒绝 p,原因为 E,并 中止信号 controller,原因为 E

    2. idx 设为 idx + 1。

    3. 如果 passed 为 true,则解析 p,值为 true,并 中止信号 controller

    error 步骤

    拒绝 p,原因为传入的 error

    complete 步骤

    解析 p,值为 false。

  8. 订阅 this,传入 observerinternal options

  9. 返回 p

reduce(reducer, initialValue, options) 方法步骤如下:
  1. p新建的 promise

  2. controller新建AbortController

  3. internal options 为新建的 SubscribeOptions, 其 signal创建依赖中止信号的结果,信号列表为「controllersignaloptionssignal(如不为 null)」, 使用 AbortSignal, 以及 当前 realm

  4. 如果 internal optionssignal中止,则:

    1. 拒绝 p,原因为 internal optionssignal中止原因

    2. 返回 p

  5. 将以下中止算法添加internal optionssignal:

    1. 拒绝 p,原因为 internal optionssignal中止原因

  6. idxunsigned long long,初始为 0。

  7. accumulatorinitialValue(如果已给定),否则未初始化。

  8. observer 为新建的 内部观察者,初始化如下:

    next steps
    1. 如果 accumulator 未初始化(即没有传入 initialValue),则将 accumulator 设为传入的 value,将 idx 设为 idx + 1,并终止这些步骤。

      注意: 这意味着 reducer 不会用 this 产生的第一个 value 作为 currentValue 调用。实际在第二个值发出时才会用它作为 currentValue,第一个值(此处保存的)作为 accumulator

    2. 调用 reducer,参数为 «accumulator 作为 accumulator,传入的 value 作为 currentValueidx 作为 index» 和 "rethrow"。令 result 为返回值。

      如果 抛出异常 E,则 拒绝 p,原因为 E,并 中止信号 controller,原因为 E

    3. idx 设为 idx + 1。

    4. accumulator 设为 result

    error steps

    拒绝 p,原因为传入的 error

    complete steps
    1. 如果 accumulator 不为 "unset",则 解析 p,值为 accumulator

      否则,拒绝 p,原因为 TypeError

  9. 订阅 this,传入 observerinternal options

  10. 返回 p

3. EventTarget 集成

dictionary ObservableEventListenerOptions {
  boolean capture = false;
  boolean passive;
};

partial interface EventTarget {
  Observable when(DOMString type, optional ObservableEventListenerOptions options = {});
};
when(type, options) 方法步骤如下:
  1. 如果 this相关全局对象Window 对象,且其 关联的 Document完全激活,则返回。

  2. event targetthis

  3. observable新建Observable, 初始化如下:

    subscribe callback

    一个算法,接受 Subscriber subscriber,按以下步骤运行:

    1. 如果 event target 为 null,则终止这些步骤。

      注意: 用于捕捉 event target 可能在订阅时已被垃圾回收的情况。

    2. 如果 subscribersubscription controllersignal中止,则终止这些步骤。

    3. 添加事件监听器,参数为 event target 及如下 事件监听器

      type

      type

      callback

      创建一个新的 Web IDL EventListener 实例,引用一个参数为 Event event 的函数。该函数会执行 observable 事件监听器调用算法,参数为 subscriberevent

      capture

      optionscapture

      passive

      如果 optionspassive 属性存在,则使用其值;否则为 null。

      once

      false

      signal

      subscribersubscription controllersignal

      注意: 这样可以保证当 signal 被中止时,无论引擎的对象所有权模型如何,都能清理 事件监听器

  4. 返回 observable

observable 事件监听器调用算法 接收一个 Subscriber subscriber 和一个 Event event,按以下步骤运行:
  1. event 调用 subscribernext() 方法。

测试

4. 安全与隐私注意事项

本内容正从我们的 explainer 上游到本规范,期间可以参阅以下资源:

5. 致谢

特别感谢 Ben LeshObservable API 的大量设计输入,以及多年维护用户态 Observable 代码的工作,使这一贡献得以进入 Web 平台。

索引

本规范定义的术语

引用规范中定义的术语

参考文献

规范性引用

[DOM]
Anne van Kesteren。DOM 标准。Living Standard。 URL: https://dom.spec.whatwg.org/
[ECMASCRIPT]
ECMAScript 语言规范。URL: https://tc39.es/ecma262/multipage/
[HTML]
Anne van Kesteren 等。HTML 标准。 Living Standard。URL: https://html.spec.whatwg.org/multipage/
[INFRA]
Anne van Kesteren;Domenic Denicola。Infra 标准。Living Standard。URL: https://infra.spec.whatwg.org/
[WEBIDL]
Edgar Chen;Timothy Gu。Web IDL 标准。Living Standard。URL: https://webidl.spec.whatwg.org/

IDL 索引

[Exposed=*]
interface Subscriber {
  undefined next(any value);
  undefined error(any error);
  undefined complete();
  undefined addTeardown(VoidFunction teardown);

  // True after the Subscriber is created, up until either
  // complete()/error() are invoked, or the subscriber unsubscribes. Inside
  // complete()/error(), this attribute is true.
  readonly attribute boolean active;

  readonly attribute AbortSignal signal;
};

// SubscribeCallback is where the Observable "creator's" code lives. It's
// called when subscribe() is called, to set up a new subscription.
callback SubscribeCallback = undefined (Subscriber subscriber);
callback ObservableSubscriptionCallback = undefined (any value);

dictionary SubscriptionObserver {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;
};

callback ObservableInspectorAbortHandler = undefined (any value);

dictionary ObservableInspector {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;

  VoidFunction subscribe;
  ObservableInspectorAbortHandler abort;
};

typedef (ObservableSubscriptionCallback or SubscriptionObserver) ObserverUnion;
typedef (ObservableSubscriptionCallback or ObservableInspector) ObservableInspectorUnion;

dictionary SubscribeOptions {
  AbortSignal signal;
};

callback Predicate = boolean (any value, unsigned long long index);
callback Reducer = any (any accumulator, any currentValue, unsigned long long index);
callback Mapper = any (any value, unsigned long long index);
// Differs from Mapper only in return type, since this callback is exclusively
// used to visit each element in a sequence, not transform it.
callback Visitor = undefined (any value, unsigned long long index);

// This callback returns an `any` that must convert into an `Observable`, via
// the `Observable` conversion semantics.
callback CatchCallback = any (any value);

[Exposed=*]
interface Observable {
  constructor(SubscribeCallback callback);
  undefined subscribe(optional ObserverUnion observer = {}, optional SubscribeOptions options = {});

  // Constructs a native Observable from value if it's any of the following:
  //   - Observable
  //   - AsyncIterable
  //   - Iterable
  //   - Promise
  static Observable from(any value);

  // Observable-returning operators. See "Operators" section in the spec.
  //
  // takeUntil() can consume promises, iterables, async iterables, and other
  // observables.
  Observable takeUntil(any value);
  Observable map(Mapper mapper);
  Observable filter(Predicate predicate);
  Observable take(unsigned long long amount);
  Observable drop(unsigned long long amount);
  Observable flatMap(Mapper mapper);
  Observable switchMap(Mapper mapper);
  Observable inspect(optional ObservableInspectorUnion inspectorUnion = {});
  Observable catch(CatchCallback callback);
  Observable finally(VoidFunction callback);

  // Promise-returning operators.
  Promise<sequence<any>> toArray(optional SubscribeOptions options = {});
  Promise<undefined> forEach(Visitor callback, optional SubscribeOptions options = {});
  Promise<boolean> every(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> first(optional SubscribeOptions options = {});
  Promise<any> last(optional SubscribeOptions options = {});
  Promise<any> find(Predicate predicate, optional SubscribeOptions options = {});
  Promise<boolean> some(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> reduce(Reducer reducer, optional any initialValue, optional SubscribeOptions options = {});
};

dictionary ObservableEventListenerOptions {
  boolean capture = false;
  boolean passive;
};

partial interface EventTarget {
  Observable when(DOMString type, optional ObservableEventListenerOptions options = {});
};

问题索引

不应该直接调用 from(),而是应调用某个内部算法,使异常能被正确地传递出来,以便我们能传递给 subscriber
这与 Chromium 的实现一致,但建议持有最初传入的 SubscribeOptionssignal 的引用,只在它 abort 时调用 abort callback。结果可能相同,但需要进一步验证。
不应该直接调用 from(),而是应调用某个内部算法,使异常能被正确地传递出来,以便我们能传递给 subscriber