可观察对象(Observable)

社区组草案报告

此版本:
https://wicg.github.io/observable/
编辑:
Dominic Farolino (Google)
参与方式:
GitHub WICG/observable新建议题开放议题
提交记录:
GitHub spec.bs 提交记录
测试套件:
https://wpt.fyi/results/dom/observable/

摘要

Observable API 提供了一种可组合、易用的方式来处理异步事件流。

本文档状态

本规范由 Web Platform Incubator Community Group 发布。 它不是 W3C 标准,也不在 W3C 标准制定流程中。 请注意,根据 W3C 社区贡献者许可协议 (CLA),有有限的退出选项,并适用其他条件。 详细了解 W3C 社区及商业团体

1. 引言

本节为非规范性内容。

2. 核心基础设施

2.1. Subscriber 接口

[Exposed=*]
interface Subscriber {
  undefined next(any value);
  undefined error(any error);
  undefined complete();
  undefined addTeardown(VoidFunction teardown);

  // True after the Subscriber is created, up until either
  // complete()/error() are invoked, or the subscriber unsubscribes. Inside
  // complete()/error(), this attribute is true.
  readonly attribute boolean active;

  readonly attribute AbortSignal signal;
};

每个 Subscriber 拥有一个有序集合内部观察者,初始为空。

每个 Subscriber 有一个清理回调,它是一个列表,包含VoidFunction,初始为空。

每个 Subscriber 有一个订阅控制器,它是一个AbortController

每个 Subscriber 有一个active布尔值,初始为 true。

注意:这是一个用于记录状态的变量,用于确保 Subscriber 在被关闭后不会调用其拥有的任何回调。

active 的 getter 步骤是返回 thisactive 布尔值。

signal 的 getter 步骤是返回 this订阅控制器signal

next(value) 方法的步骤为:
  1. 如果 thisactive 为 false,则返回。

  2. 如果 this相关全局对象Window 对象,且其 关联文档不是 完全激活,则返回。

  3. 遍历 this内部观察者,对每个 observer

    1. value 运行 observernext 步骤

      断言:没有抛出 异常

      注意:这里不会抛出异常,因为当 内部观察者next 步骤 只是脚本提供的回调包装时,处理观察者步骤会用逻辑包裹这些回调,在调用时捕获任何异常并向全局报告。

      next 步骤 是规范算法时,这些步骤会确保不会有异常抛出到外部,以满足上述断言。

error(error) 方法的步骤如下:

  1. 如果thisactive为 false,则使用 errorthis相关全局对象报告一个异常,然后返回。

  2. 如果this相关全局对象是一个Window对象,并且其关联的文档不是完全活跃,则返回。

  3. 关闭this

  4. 对于每个observer,在this内部观察者中:

    1. 运行 observer错误步骤,给定 error

      断言:未抛出异常

      注意:有关为什么这是正确的详细信息,请参阅next()中的文档。

complete() 方法的步骤如下:

  1. 如果thisactive为 false,则返回。

  2. 如果this相关全局对象是一个Window对象,并且其关联的文档不是完全活跃,则返回。

  3. 关闭this

  4. 对于每个observer,在this内部观察者中:

    1. 运行 observer完成步骤

      断言:未抛出异常

      注意:有关为什么这是正确的详细信息,请参阅next()中的文档。

addTeardown(teardown) 方法的步骤如下:

  1. 如果this相关全局对象是一个Window对象,并且其关联的文档不是完全活跃,则返回。

  2. 如果thisactive为 true,则将teardown 追加thisteardown 回调列表中。

  3. 否则,调用teardown,参数为 «» 和 "report"。

关闭订阅,给定一个 Subscriber subscriber,以及一个可选的 any reason,请执行以下步骤:
  1. 如果 subscriberactive 为 false,则返回。

    此步骤用于防止重入调用,这种情况可能发生在“生产者发起”的取消订阅场景。请看如下例子:

    const outerController = new AbortController();
    const observable = new Observable(subscriber => {
      subscriber.addTeardown(() => {
        // 2.) This teardown executes inside the "Close" algorithm, while it’s
        //     running. Aborting the downstream signal run its abort algorithms,
        //     one of which is the currently-running "Close" algorithm.
        outerController.abort();
      });
    
      // 1.) This immediately invokes the "Close" algorithm, which
      //     sets subscriber.active to false.
      subscriber.complete();
    });
    
    observable.subscribe({}, {signal: outerController.signal});
    
  2. subscriberactive 布尔值设为 false。

  3. reason(如果有的话)中止信号 subscriber订阅控制器

  4. 对于每个teardown,在subscriberteardown 回调中按反插入顺序排序:

    1. 如果subscriber相关全局对象是一个Window对象,并且其关联的文档不是完全活跃,则中止这些步骤。

      注意:此步骤会重复运行,因为每个teardown可能导致上述文档变为非活跃状态。

    2. 调用teardown,参数为 «» 和 "report"。

2.2. Observable 接口

// SubscribeCallback is where the Observable "creator's" code lives. It's
// called when subscribe() is called, to set up a new subscription.
callback SubscribeCallback = undefined (Subscriber subscriber);
callback ObservableSubscriptionCallback = undefined (any value);

dictionary SubscriptionObserver {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;
};

callback ObservableInspectorAbortHandler = undefined (any value);

dictionary ObservableInspector {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;

  VoidFunction subscribe;
  ObservableInspectorAbortHandler abort;
};

typedef (ObservableSubscriptionCallback or SubscriptionObserver) ObserverUnion;
typedef (ObservableSubscriptionCallback or ObservableInspector) ObservableInspectorUnion;

dictionary SubscribeOptions {
  AbortSignal signal;
};

callback Predicate = boolean (any value, unsigned long long index);
callback Reducer = any (any accumulator, any currentValue, unsigned long long index);
callback Mapper = any (any value, unsigned long long index);
// Differs from Mapper only in return type, since this callback is exclusively
// used to visit each element in a sequence, not transform it.
callback Visitor = undefined (any value, unsigned long long index);

// This callback returns an `any` that must convert into an `Observable`, via
// the `Observable` conversion semantics.
callback CatchCallback = any (any value);

[Exposed=*]
interface Observable {
  constructor(SubscribeCallback callback);
  undefined subscribe(optional ObserverUnion observer = {}, optional SubscribeOptions options = {});

  // Constructs a native Observable from value if it's any of the following:
  //   - Observable
  //   - AsyncIterable
  //   - Iterable
  //   - Promise
  static Observable from(any value);

  // Observable-returning operators. See "Operators" section in the spec.
  //
  // takeUntil() can consume promises, iterables, async iterables, and other
  // observables.
  Observable takeUntil(any value);
  Observable map(Mapper mapper);
  Observable filter(Predicate predicate);
  Observable take(unsigned long long amount);
  Observable drop(unsigned long long amount);
  Observable flatMap(Mapper mapper);
  Observable switchMap(Mapper mapper);
  Observable inspect(optional ObservableInspectorUnion inspectorUnion = {});
  Observable catch(CatchCallback callback);
  Observable finally(VoidFunction callback);

  // Promise-returning operators.
  Promise<sequence<any>> toArray(optional SubscribeOptions options = {});
  Promise<undefined> forEach(Visitor callback, optional SubscribeOptions options = {});
  Promise<boolean> every(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> first(optional SubscribeOptions options = {});
  Promise<any> last(optional SubscribeOptions options = {});
  Promise<any> find(Predicate predicate, optional SubscribeOptions options = {});
  Promise<boolean> some(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> reduce(Reducer reducer, optional any initialValue, optional SubscribeOptions options = {});
};

每个 Observable 都有一个 subscribe 回调,它是一个 SubscribeCallback 或者是一组接收 Subscriber 的步骤。

每个 Observable 都有一个 弱订阅者,它是一个指向 Subscriber 的弱引用或者 null,初始为 null。

注意: 这些类型的“联合”是为了同时支持由 JavaScript 创建的 Observable (它们总是用 SubscribeCallback 构造), 以及原生构造的 Observable 对象(它们的 subscribe 回调 可能是任意一组原生步骤,而不是 JavaScript 回调)。 when() 的返回值就是后者的一个例子。

new Observable(callback) 构造函数步骤如下:
  1. thissubscribe 回调 设为 callback

    注意: 这个回调会在后续调用 subscribe() 时被调用。

subscribe(observer, options) 方法步骤如下:
  1. 给定 observeroptions订阅 this

2.2.1. 支持性概念

默认错误算法 是一个接受 any 类型 error 的算法,步骤如下:
  1. 报告一个异常,使用 error 和当前realm全局对象

注意: 我们将这个默认步骤单独列出,是为了让所有在规范中原生订阅 Observable (即规范文本中的订阅,而不是通过 subscribe() 方法)的地方都不用重复定义这些步骤。

一个 内部观察者 是一个包含如下成员结构体

next 步骤

一个接受 any 类型参数的算法。初始时,这些步骤为空操作。

error 步骤

一个接受 any 类型参数的算法。初始为 默认错误算法

complete 步骤

无参数的算法。初始时,这些步骤为空操作。

内部观察者 结构体 用于对应 nexterrorcomplete 回调函数。对于通过 subscribe() 方法由 JavaScript 订阅的任何 Observable, 这些算法“步骤”只是包装对脚本提供的对应 回调函数的调用。

但当规范文本(非用户脚本)订阅一个 Observable 时, 这些“步骤”可以是任意规范算法,而不是通过 Web IDL 回调函数打包的 ObserverUnion。 例如,见 § 2.3.3 返回 Promise 的操作符

转换为 Observable 一个 any 类型的 value,请运行以下步骤:

注意: 我们将此算法与 Web IDL 的 from() 方法分离出来,是为了让规范文本能够直接转换值而无需通过 Web IDL 绑定。

  1. 如果 Type(value) 不是 Object,则 抛出 TypeError

    注意: 这样可以阻止将原始类型强制为可迭代对象(如字符串)。详见 WICG/observable#125 的讨论。

  2. 来自 Observable:如果 value特定类型Observable,则返回 value

  3. 来自异步可迭代对象:令 asyncIteratorMethod? GetMethod(value, %Symbol.asyncIterator%)。

    注意: 这里我们使用 GetMethod 而不是 GetIterator,因为我们只需探查是否支持异步迭代协议,不希望在未实现时抛出异常。GetIterator 在以下两种情况下都会抛错:(a)未实现迭代协议;(b)实现了协议但不可调用或 getter 抛错。GetMethod 只会在后者抛错。

  4. 如果 asyncIteratorMethod 为 undefined 或 null,则跳到标记为 来自可迭代对象 的步骤。

  5. nextAlgorithm 为以下步骤,给定 Subscriber subscriber迭代器记录 iteratorRecord

    1. 如果 subscriber订阅控制器signal中止,则返回。

    2. nextPromisePromise 或 undefined,初始为 undefined。

    3. nextCompletionIteratorNext(iteratorRecord)。

      注意: 这里我们用 IteratorNext 而不是 IteratorStepValue,因为后者要求 next() 立即返回可用的对象,而异步迭代器 next() 返回 Promise/thenable(我们用 Promise 包裹并根据结果再处理)。

    4. 如果 nextCompletionthrow completion,则:

      1. 断言iteratorRecord 的 [[Done]] 为 true。

      2. nextPromise 设为 一个被拒绝的 Promise,错误为 nextRecord 的 [[Value]]。

    5. 否则,如果 nextRecord正常完成,则将 nextPromise 设为 一个被解析的 Promise,值为 nextRecord 的 [[Value]]。

      注意: 这样做是为了保证 nextRecord 的 [[Value]] 本身不是 Promise 时也能正常处理。

    6. 监听 nextPromise 的状态:

      • 如果 nextPromise fulfilled,值为 iteratorResult,则:

        1. 如果 Type(iteratorResult) 不是 Object,则用 error() 方法和 TypeError 运行 subscriber,并终止这些步骤。

        2. done = IteratorComplete(iteratorResult)。

        3. 如果 donethrow completion,则用 done 的 [[Value]] 运行 subscribererror() 方法并终止这些步骤。

        4. 如果 done 的 [[Value]] 为 true,则运行 subscribercomplete() 并终止这些步骤。

        5. value = IteratorValue(iteratorResult)。

        6. 如果 valuethrow completion,则用 value 的 [[Value]] 运行 subscribererror() 方法并终止这些步骤。

        7. value 的 [[Value]] 运行 subscribernext() 方法。

        8. 再次用 subscriberiteratorRecord 运行 nextAlgorithm

      • 如果 nextPromise 被拒绝,原因为 r,则用 r 运行 subscribererror() 方法。

  6. 返回一个 新的 Observable, 其 subscribe 回调 是一个接收 Subscriber subscriber 的算法,具体如下:

    1. 如果 subscribersubscription controllersignal 已经 aborted,则返回。

    2. iteratorRecordCompletionGetIterator(value, async)。

      注:这会重新调用 value 上的任何 %Symbol.asyncIterator% 方法 getter——注意,这么做是否合理属于极端边界情况,但它符合测试预期;相关讨论见 issue#127——并调用协议本身以获取一个 Iterator Record

    3. 如果 iteratorRecordCompletion 是一个 throw completion,则以 iteratorRecordCompletion 的 [[Value]] 作为参数,调用 subscribererror() 方法,并终止本步骤。

      注:这意味着我们会在订阅的同步上下文中调用 error() 方法,这也是异步可迭代对象被转换为 Observable 时,唯一会同步抛出错误的场景。在所有其他情况下,错误会通过被包裹在拒绝 Promise 中,并被 nextAlgorithm reacts 到,从而以微任务时序异步传播给 observer。这种同步错误传播行为与语言结构一致,例如 for-await of 循环在调用 %Symbol.asyncIterator% 并同步将异常重新抛给循环外部的 catch 块(在任何 Awaiting 发生之前)时就是如此。

    4. iteratorRecord! iteratorRecordCompletion

    5. 断言iteratorRecord 是一个 Iterator Record

    6. 如果 subscribersubscription controllersignal 已经 aborted,则返回。

    7. subscribersubscription controllersignal 添加如下中止算法

      1. 执行 AsyncIteratorClose(iteratorRecord, NormalCompletion(subscribersubscription controllerabort reason))。

    8. subscriberiteratorRecord 为参数,执行 nextAlgorithm

  7. 来自可迭代对象:令 iteratorMethod = ? GetMethod(value, %Symbol.iterator%)。

  8. 如果 iteratorMethod 为 undefined,则跳到标记为 来自 Promise 的步骤。

    否则,返回一个 新的 Observable, 其 subscribe 回调 是一个接收 Subscriber subscriber 的算法,具体如下:

    1. 如果 subscribersubscription controllersignal 已经 aborted,则返回。

    2. iteratorRecordCompletionGetIterator(value, sync)。

    3. 如果 iteratorRecordCompletion 是一个 throw completion,则以 iteratorRecordCompletion 的 [[Value]] 作为参数,调用 subscribererror() 方法,并终止本步骤。

    4. iteratorRecord! iteratorRecordCompletion

    5. 如果 subscribersubscription controllersignal 已经 aborted,则返回。

    6. subscribersubscription controllersignal 添加如下中止算法

      1. 执行 IteratorClose(iteratorRecord, NormalCompletion(UNUSED))。

    7. true 时,循环:

      1. nextIteratorStepValue(iteratorRecord)。

      2. 如果 next 是一个 throw completion,则以 next 的 [[Value]] 作为参数,调用 subscribererror() 方法,并 跳出 循环。

      3. next 设为 ! next

      4. 如果 next 为 done,则:

        1. 断言iteratorRecord 的 [[Done]] 为 true。

        2. 调用 subscribercomplete()

        3. 返回。

      5. next 作为参数,调用 subscribernext()

      6. 如果 subscribersubscription controllersignal 已经 aborted,则 跳出 循环。

  9. 来自 Promise:如果 IsPromise(value) 为 true,则:

    1. 返回一个 新的 Observable, 其 subscribe 回调 是一个接收 Subscriber subscriber 的算法,具体如下:

      1. 监听 value 的状态:

        1. 如果 value fulfilled,值为 v,则:

          1. v 运行 subscribernext() 方法。

          2. 运行 subscribercomplete() 方法。

        2. 如果 value 被拒绝,原因为 r,则用 r 运行 subscribererror() 方法。

  10. 抛出一个 TypeError

测试
订阅 Observable,给定一个 ObserverUnion内部观察者 observer,以及一个 SubscribeOptions options,请执行以下步骤:

注意: 我们将此算法从 Web IDL 的 subscribe() 方法中单独分离,以便规范文本可以直接 订阅 Observable,而无需通过 Web IDL 绑定。相关背景可见 w3c/IntersectionObserver#464,其中“内部”规范文本必须避免通过 Web IDL 绑定对象(其属性可能会被 JavaScript 修改)。用法见 § 2.3.3 返回 Promise 的操作符

  1. 如果this相关全局对象是一个Window对象,并且其关联的文档不是完全活跃,则返回。

  2. internal observer 为一个新的内部观察者

  3. 按如下方式处理 observer

    1. 如果 observer 是一个 ObservableSubscriptionCallback
      internal observernext 步骤设置为以下接收一个 any 类型 value 的步骤:
      1. 调用 observer,参数为 «value» 和 "report"。

      如果 observer 是一个 SubscriptionObserver
      1. 如果 observernext 存在,则将 internal observernext 步骤设置为以下接收一个 any 类型 value 的步骤:

        1. 调用 observernext,参数为 «value» 和 "report"。

      2. 如果 observererror 存在,则将 internal observererror 步骤设置为以下接收一个 any 类型 error 的步骤:

        1. 调用 observererror,参数为 «error» 和 "report"。

      3. 如果 observercomplete 存在,则将 internal observercomplete 步骤设置为以下步骤:

        1. 调用 observercomplete,参数为 «» 和 "report"。

      如果 observer 是一个内部观察者
      internal observer 设置为 observer
  4. 断言internal observererror 步骤要么是默认错误算法,要么是一个调用所提供的 error 回调函数的算法。

  5. 如果this弱订阅者不为 null 且this弱订阅者active为 true:

    1. subscriberthis弱订阅者

    2. 追加 internal observersubscriber内部观察者集合中。

    3. 如果 optionssignal 存在,则:

      1. 如果 optionssignal中止,则从 subscriber内部观察者移除 internal observer

      2. 否则,添加以下中止算法optionssignal

        1. 如果 subscriberactive为 false,则中止这些步骤。

        2. subscriber内部观察者移除 internal observer

        3. 如果 subscriber内部观察者为空,则使用 optionssignal中止原因关闭 subscriber

    4. 返回。

  6. subscriber 为一个新的 Subscriber

  7. 追加 internal observersubscriber内部观察者集合中。

  8. this弱订阅者设置为 subscriber

  9. 如果 optionssignal 存在,则:

    1. 如果 optionssignal中止,则使用 optionssignal中止原因关闭 subscriber

    2. 否则,添加以下中止算法optionssignal

      1. 如果 subscriberactive为 false,则中止这些步骤。

      2. subscriber内部观察者移除 internal observer

      3. 如果 subscriber内部观察者为空,则使用 optionssignal中止原因关闭 subscriber

  10. 如果thissubscribe 回调是一个SubscribeCallback,则调用它,参数为 «subscriber» 和 "rethrow"。

    如果抛出了异常 E,则使用 E 调用 subscribererror() 方法。

  11. 否则,运行由thissubscribe 回调给定的步骤,并传入 subscriber

测试

2.3. 操作符

目前请参见 https://github.com/wicg/observable#operators

2.3.1. from()

from(value) 方法步骤如下:
  1. 返回 value 转换为 Observable 的结果。若有异常则重新抛出。

2.3.2. Observable 返回型操作符

takeUntil(value) 方法步骤如下:
  1. sourceObservablethis

  2. notifier value 转换为 Observable 的结果。

  3. observable 为一个 新的 Observable,其 subscribe 回调 是一个接收 Subscriber subscriber 的算法,内容如下:

    注意:该方法会订阅两个 Observable: (1) notifier,(2) sourceObservable。我们会在如下情形同时“取消订阅”:
    1. notifier 开始发出值(“next” 或 “error”)。此时我们已获得所需信息,不再需要 notifier,同时也不再需要 sourceObservable,因为已经手动结束了 observable 的订阅。

    2. 如果 sourceObservable 自身 error()complete(),此时我们只需取消对 notifier 的订阅。

    1. notifierObserver 为一个新的 内部观察者,初始如下:

      next 步骤

      运行 subscribercomplete() 方法。

      注意: 这样会“取消订阅” sourceObservable,因为 sourceObservable 是通过“外部”subscriber订阅控制器signal 作为输入信号订阅的,一旦调用 subscribercomplete(),该信号会被中止。

      error 步骤

      运行 subscribercomplete() 方法。

      注意: 未指定 complete 步骤,因为如果 notifier Observable 自身完成,我们无需对 observablesubscriber 调 complete(),observable 会继续镜像 sourceObservable

    2. options 为一个新的 SubscribeOptions,其 signalsubscriber订阅控制器signal

    3. 订阅 notifier,参数为 notifierObserveroptions

    4. 如果 subscriberactive 为 false,则返回。

      注意: 如果 notifier 同步发出值,则 sourceObservablesubscribe 回调 甚至不会被调用;如果 notifier 仅同步 complete,则 subscriber 的 active 仍然为 true,会继续订阅 sourceObservable 并镜像其内容。

    5. sourceObserver 为一个新的 内部观察者,初始如下:

      next 步骤

      用传入的 value 运行 subscribernext() 方法。

      error 步骤

      用传入的 error 运行 subscribererror() 方法。

      complete 步骤

      运行 subscribercomplete() 方法。

      注意: sourceObserver 基本是透传,将 sourceObservable 发出的内容全部镜像,唯一的例外是如果 sourceObservable 完成时 notifier 还未发出任何内容,会主动取消 notifier 的订阅。

    6. 订阅 sourceObservable,参数为 sourceObserveroptions

  4. 返回 observable

测试
map(mapper) 方法步骤如下:
  1. sourceObservablethis

  2. observable 为一个 新的 Observable,其 subscribe 回调 是一个接收 Subscriber subscriber 的算法,内容如下:

    1. idxunsigned long long,初始值为 0。

    2. sourceObserver 为一个新的 内部观察者,初始如下:

      next 步骤
      1. 调用 mapper,参数为 «传入的 value, idx» 和 "rethrow";令 mappedValue 为返回值。

        如果抛出了异常 E,则 运行 subscribererror() 方法,传入 E,并中止这些步骤。

      2. 递增 idx

      3. 运行 subscribernext() 方法,传入 mappedValue

      error 步骤

      用传入的 error 运行 subscribererror() 方法。

      complete 步骤

      运行 subscribercomplete() 方法。

    3. options 为一个新的 SubscribeOptions,其 signalsubscriber订阅控制器signal

    4. 订阅 sourceObservable,参数为 sourceObserveroptions

  3. 返回 observable

测试
filter(predicate) 方法步骤如下:
  1. sourceObservablethis

  2. observable 为一个 新的 Observable,其 subscribe 回调 是一个接收 Subscriber subscriber 的算法,内容如下:

    1. idxunsigned long long,初始值为 0。

    2. sourceObserver 为一个新的 内部观察者,初始如下:

      next 步骤
      1. 调用 predicate,参数为 «传入的 value, idx» 和 "rethrow";令 matches 为返回值。

        如果抛出了异常 E,则 运行 subscribererror() 方法,传入 E,并中止这些步骤。

      2. idx 设置为 idx + 1。

      3. 如果 matches 为 true,则运行 subscribernext() 方法,传入 value

      error 步骤

      用传入的 error 运行 subscribererror() 方法。

      complete 步骤

      运行 subscribercomplete() 方法。

    3. options 为一个新的 SubscribeOptions,其 signalsubscriber订阅控制器signal

    4. 订阅 sourceObservable,参数为 sourceObserveroptions

  3. 返回 observable

测试
take(amount) 方法步骤如下:
  1. sourceObservablethis

  2. observable 为一个 新的 Observable,其 subscribe 回调 是一个接收 Subscriber subscriber 的算法,内容如下:

    1. remainingamount

    2. 如果 remaining 为 0,则运行 subscribercomplete() 方法,并中止这些步骤。

    3. sourceObserver 为一个新的 内部观察者,初始如下:

      next 步骤
      1. 用传入的 value 运行 subscribernext() 方法。

      2. 递减 remaining

      3. 如果 remaining 为 0,则运行 subscribercomplete() 方法。

      error 步骤

      用传入的 error 运行 subscribererror() 方法。

      complete 步骤

      运行 subscribercomplete() 方法。

    4. options 为一个新的 SubscribeOptions,其 signalsubscriber订阅控制器signal

    5. 订阅 sourceObservable,参数为 sourceObserveroptions

  3. 返回 observable

测试
drop(amount) 方法步骤如下:
  1. sourceObservablethis

  2. observable 为一个 新的 Observable,其 subscribe 回调 是一个接收 Subscriber subscriber 的算法,内容如下:

    1. remainingamount

    2. sourceObserver 为一个新的 内部观察者,初始如下:

      next 步骤
      1. 如果 remaining > 0,则递减 remaining 并中止这些步骤。

      2. 断言remaining 为 0。

      3. 用传入的 value 运行 subscribernext() 方法。

      error 步骤

      用传入的 error 运行 subscribererror() 方法。

      complete 步骤

      运行 subscribercomplete() 方法。

    3. options 为一个新的 SubscribeOptions,其 signalsubscriber订阅控制器signal

    4. 订阅 sourceObservable,参数为 sourceObserveroptions

  3. 返回 observable

测试
flatMap(mapper) 方法步骤如下:
  1. sourceObservablethis

  2. observable 为一个 新的 Observable,其 subscribe 回调 是一个接收 Subscriber subscriber 的算法,内容如下:

    1. idxunsigned long long,初始值为 0。

    2. outerSubscriptionHasCompleted布尔值,初始为 false。

    3. queue 为一个新的 列表,元素为 any,初始为空。

      注意:queue 用于当 observable 正在订阅某个由 sourceObservable 产生但尚未耗尽的 Observable 时,存储由 sourceObservable 新发出的 Observable

    4. activeInnerSubscription布尔值,初始为 false。

    5. sourceObserver 为一个新的 内部观察者,初始如下:

      next 步骤
      1. 如果 activeInnerSubscription 为 true,则:

        1. value 添加到 queue

          注意:value 在当前订阅的 Observable 耗尽后会被处理。

      2. 否则:

        1. activeInnerSubscription 为 true。

        2. valuesubscribermapper 及对 queueactiveInnerSubscriptionouterSubscriptionHasCompletedidx引用,运行 flatmap process next value steps

          注意:该 flatmap process next value steps 会订阅由 value 派生的 Observable 并处理其所有值,直到其订阅失效(错误或完成)。如该“内部” Observable 完成,则会递归处理 queue 中下一个值。

          queue 中无值,则流程终止并取消 activeInnerSubscription,以便后续 sourceObservable 新发出的值能被正确处理。

      error 步骤

      用传入的 error 运行 subscribererror() 方法。

      complete 步骤
      1. outerSubscriptionHasCompleted 为 true。

        注意:activeInnerSubscription 为 true,则不会立即 complete subscriber,后续由 flatmap process next value steps 在“内部”订阅失效且 queue 为空时负责 complete。

      2. activeInnerSubscription 为 false 且 queue 为空,则运行 subscribercomplete() 方法。

    6. options 为一个新的 SubscribeOptions,其 signalsubscriber订阅控制器signal

    7. 订阅 sourceObservable,参数为 sourceObserveroptions

  3. 返回 observable

flatmap process next value steps,给定 any valueSubscriber subscriberMapper mapper,以及对 queueactiveInnerSubscriptionouterSubscriptionHasCompletedidx引用,步骤如下:
  1. mappedResult调用 mapper 的结果,参数为 «value, idx» 和 "rethrow"。

    如果抛出了异常 E,则运行 subscribererror() 方法,传入 E,并中止这些步骤。

  2. idx 设置为 idx + 1。

  3. innerObservable 为使用 mappedResult 调用 from() 的结果。

    如果抛出了异常 E,则运行 subscribererror() 方法,传入 E,并中止这些步骤。

    我们不应该直接调用 from()。 相反,我们应该调用某个内部算法,该算法将异常传回给我们在此处妥善处理,因为我们希望将它们通过管道传递给 subscriber

  4. innerObserver 为一个新的内部观察者,初始化如下:

    next 步骤

    运行 subscribernext() 方法,传入传递的 value

    error 步骤

    运行 subscribererror() 方法,传入传递的 error

    complete 步骤
    1. 如果 queue 不为空,则:

      1. nextValuequeue 中的第一项;从 queue移除此项。

      2. 运行flatmap 处理下一个值的步骤,传入 nextValuesubscribermapper 以及对 queueactiveInnerSubscription引用

    2. 否则:

      1. activeInnerSubscription 设置为 false。

        注意:因为 activeInnerSubscription 是一个引用,这实际上确保了所有后续从“外部”Observable (称为 sourceObservable)发出的值。

      2. 如果 outerSubscriptionHasCompleted 为 true,则运行 subscribercomplete() 方法。

        注意:这意味着“外部”Observable 已经完成,但尚未继续完成 subscriber,因为至少还有一个待处理的“内部”Observable (即 innerObservable)已经被排队但尚未完成。直到现在!

  5. innerOptions 为一个新的 SubscribeOptions, 其 signalsubscriber订阅控制器signal

  6. 订阅 innerObservable,传入 innerObserverinnerOptions

switchMap(mapper) 方法步骤如下:
  1. sourceObservablethis

  2. observable 为一个 新的 Observable,其 subscribe 回调 是一个接收 Subscriber subscriber 的算法,内容如下:

    1. idxunsigned long long,初始值为 0。

    2. outerSubscriptionHasCompleted布尔值,初始为 false。

    3. activeInnerAbortControllerAbortController 或 null,初始为 null。

      注意:AbortController 仅由本算法的 next steps 分配新值,仅由 switchmap process next value steps 置为 null(当“内部”Observable完成或出错时)。此变量用作当前是否有活动“内部”订阅的标记。下方的 complete steps 会用到它——如果 sourceObservable 完成时有活动“内部”订阅,则不会立即 complete subscriber,而是等“内部”订阅完成后再 complete。

    4. sourceObserver 为一个新的 内部观察者,初始如下:

      next 步骤
      1. 如果 activeInnerAbortController 非 null,则 signal abort activeInnerAbortController

        注意: 这样会“取消订阅”前一个由 sourceObservable 最近 push 的值派生的“内部”Observable,然后立即订阅由本次 push 的新 value 派生的 Observable

      2. activeInnerAbortController 为一个 新的 AbortController

      3. valuesubscribermapperactiveInnerAbortControllerouterSubscriptionHasCompletedidx引用,运行 switchmap process next value steps

        注意: switchmap process next value steps 会订阅由 value 派生的 Observable,并持续处理其值,直到 (1) 其订阅失效,或 (2) activeInnerAbortController 被中止(即 sourceObservable 推送了更新的值)。

      error 步骤

      用传入的 error 运行 subscribererror() 方法。

      complete 步骤
      1. outerSubscriptionHasCompleted 为 true。

        注意: 如果 activeInnerAbortController 非 null,则不会立刻 complete subscriber,而是由 switchmap process next value steps 在内部订阅完成时负责 complete。

      2. 如果 activeInnerAbortController 为 null,则运行 subscribercomplete() 方法。

    5. options 为一个新的 SubscribeOptions,其 signalsubscriber订阅控制器signal

    6. 订阅 sourceObservable,参数为 sourceObserveroptions

  3. 返回 observable

switchmap process next value steps,给定 any valueSubscriber subscriberMapper mapper,以及 activeInnerAbortControllerouterSubscriptionHasCompletedidx引用,执行以下步骤:
  1. mappedResult调用 mapper 的结果,参数为 «value, idx» 和 "rethrow"。

    如果抛出了异常 E,则运行 subscribererror() 方法,传入 E,并中止这些步骤。

  2. idx 设置为 idx + 1。

  3. innerObservable 为使用 mappedResult 调用 from() 的结果。

    如果抛出了异常 E,则运行 subscribererror() 方法,传入 E,并中止这些步骤。

  4. innerObserver 为一个新的内部观察者,初始化如下:

    next 步骤

    运行 subscribernext() 方法,传入传递的 value

    error 步骤

    运行 subscribererror() 方法,传入传递的 error

    注意:我们不必在此处将 activeInnerAbortController 设置为 null,以向上面的 switchMap() 方法步骤发出内部“订阅”已取消的信号。这是因为调用 subscribererror() 方法已经从“外部”源 Observable 取消订阅,因此它将无法再向 switchMap() 内部观察者推送任何值。

    complete 步骤
    1. 如果 outerSubscriptionHasCompleted 为 true,则运行 subscribercomplete() 方法。

    2. 否则,将 activeInnerAbortController 设置为 null。

      注意:因为此变量是一个引用,它向 switchMap complete 步骤发出信号,表明没有活动的内部订阅。

  5. innerOptions 为一个新的 SubscribeOptions, 其 signal 是使用 AbortSignal当前领域,从列表 «activeInnerAbortControllersignal, subscriber订阅控制器signal» 创建依赖中止信号的结果。

  6. 订阅 innerObservable,传入 innerObserverinnerOptions

inspect(inspectorUnion) 方法步骤如下:
  1. subscribe callbackVoidFunction 或 null,初始为 null。

  2. next callbackObservableSubscriptionCallback 或 null,初始为 null。

  3. error callbackObservableSubscriptionCallback 或 null,初始为 null。

  4. complete callbackVoidFunction 或 null,初始为 null。

  5. abort callbackObservableInspectorAbortHandler 或 null,初始为 null。

  6. 按如下方式处理 inspectorUnion

    如果 inspectorUnionObservableSubscriptionCallback
    1. next callback 设为 inspectorUnion

    如果 inspectorUnionObservableInspector
    1. subscribe 存在,则将 subscribe callback 设为其值。

    2. next 存在,则将 next callback 设为其值。

    3. error 存在,则将 error callback 设为其值。

    4. complete 存在,则将 complete callback 设为其值。

    5. abort 存在,则将 abort callback 设为其值。

  7. sourceObservablethis

  8. observable 为一个 新的 Observable,其 subscribe 回调 是一个接收 Subscriber subscriber 的算法,内容如下:

    1. 如果 subscribe callback 不为 null,则调用它,参数为 «» 和 "rethrow"。

      如果抛出了异常 E, 则运行 subscribererror() 方法,传入 E,并中止这些步骤。

      注意:这样做的结果是 sourceObservable 永远不会被订阅。

    2. 如果 abort callback 不为 null,则将以下中止算法添加subscriber订阅控制器signal 中:

      1. 调用 abort callback,参数为 «subscriber订阅控制器signal中止原因» 和 "report"。

    3. sourceObserver 为一个新的内部观察者,初始化如下:

      next 步骤
      1. 如果 next callback 不为 null,则调用 next callback,参数为 «传入的 value» 和 "rethrow"。

        如果抛出了异常 E, 则:

        1. subscriber订阅控制器signal移除 abort callback

          注意:此步骤很重要,因为 abort callback 仅用于消费者发起的退订。当生产者通过 subscribererror()complete() 方法终止订阅时(如下所示),我们必须确保不运行 abort callback

          这与 Chromium 的实现相匹配,但可以考虑保留对最初传入的 SubscribeOptionssignal 的引用,并在中止时才调用 abort callback。结果可能相同,但需要调查。

        2. 运行 subscribererror() 方法,传入 E,并中止这些步骤。

      2. 使用传入的 value 运行 subscribernext() 方法。

      error 步骤
      1. subscriber订阅控制器signal移除 abort callback

      2. 如果 error callback 不为 null,则调用 error callback,参数为 «传入的 error» 和 "rethrow"。

        如果抛出了异常 E,则 运行 subscribererror() 方法,传入 E,并中止这些步骤。

      3. 运行 subscribererror() 方法,传入传递的 error

      complete 步骤
      1. subscriber订阅控制器signal移除 abort callback

      2. 如果 complete callback 不为 null,则调用 complete callback,参数为 «» 和 "rethrow"。

        如果抛出了异常 E,则 运行 subscribererror() 方法,传入 E,并中止这些步骤。

      3. 运行 subscribercomplete() 方法。

    4. options 为一个新的 SubscribeOptions, 其 signalsubscriber订阅控制器signal

    5. 订阅 sourceObservable,传入 sourceObserveroptions

  9. 返回 observable

测试
catch(callback) 方法步骤如下:
  1. sourceObservablethis

  2. observable 为一个 新的 Observable,其 subscribe 回调 是一个接收 Subscriber subscriber 的算法,内容如下:

    1. sourceObserver 为一个新的 内部观察者,初始如下:

      next 步骤

      用传入的 value 运行 subscribernext() 方法。

      error 步骤
      1. 调用 callback,参数为 «传入的 error» 和 "rethrow"。 令 result 为返回值。

        如果抛出了异常 E,则 使用 E 运行 subscribererror(),并中止这些步骤。

      2. innerObservable 为使用 result 调用 from() 的结果。

        如果抛出了异常 E,则 运行 subscribererror() 方法,传入 E,并中止这些步骤。

        我们不应该直接调用 from()。 相反,我们应该调用某个内部算法,该算法将异常传回给我们在此处妥善处理,因为我们希望将它们通过管道传递给 subscriber

      3. innerObserver 为一个新的内部观察者,初始化如下:

        next 步骤

        运行 subscribernext() 方法,传入传递的 value

        error 步骤

        运行 subscribererror() 方法,传入传递的 error

        complete 步骤

        运行 subscribercomplete() 方法。

      4. innerOptions 为一个新的 SubscribeOptions, 其 signalsubscriber订阅控制器signal

      5. 订阅 innerObservable,传入 innerObserverinnerOptions

        注意:我们可以在此处自由地订阅 innerObservable,而无需先从 sourceObservable“退订”,也不用担心 sourceObservable 会继续发出值,因为所有这些都发生在与 sourceObservable 关联的错误步骤内部。这意味着 sourceObservable 已经完成了它的订阅,将不再产生任何值,我们可以安全地将我们的值源切换到 innerObservable

      complete 步骤

      运行 subscribercomplete() 方法。

    2. options 为一个新的 SubscribeOptions,其 signalsubscriber订阅控制器signal

    3. 订阅 sourceObservable,参数为 sourceObserveroptions

  3. 返回 observable

finally(callback) 方法步骤如下:
  1. sourceObservablethis

  2. observable 为一个 新的 Observable,其 subscribe 回调 是一个接收 Subscriber subscriber 的算法,内容如下:

    1. callback 运行 subscriberaddTeardown() 方法。

    2. sourceObserver 为一个新的 内部观察者,初始如下:

      next 步骤

      用传入的 value 运行 subscribernext() 方法。

      error 步骤
      1. 用传入的 error 运行 subscribererror() 方法。

      complete 步骤
      1. 运行 subscribercomplete() 方法。

    3. options 为一个新的 SubscribeOptions,其 signalsubscriber订阅控制器signal

    4. 订阅 sourceObservable,参数为 sourceObserveroptions

  3. 返回 observable

2.3.3. Promise 返回型操作符

toArray(options) 方法步骤如下:
  1. p新 promise

  2. 如果 optionssignal 不为 null:

    1. 如果 optionssignal中止

      1. 拒绝 p,原因为 optionssignal中止原因

      2. 返回 p

    2. optionssignal 添加如下中止算法:

      1. 拒绝 p,原因为 optionssignal中止原因

      注意: 这里只需要拒绝 p。对 this Observable 的订阅会自动关闭,因为 “内部” Subscriber 会因 optionssignal 中止而 关闭

  3. values 为新列表

  4. observer 为一个新的 内部观察者,初始如下:

    next 步骤

    传入的 value 添加到 values

    error 步骤

    拒绝 p,原因为传入的 error

    complete 步骤

    解决 p,值为 values

  5. 订阅 this,参数为 observeroptions

  6. 返回 p

测试
forEach(callback, options) 方法步骤如下:
  1. p新 promise

  2. visitor callback controller新的 AbortController

  3. internal options 为一个新的 SubscribeOptions,其 signal 是以下结果:从列表 «visitor callback controllersignaloptionssignal(如非 null 则包含)» 创建一个依赖中止信号(create a dependent abort signal),使用 AbortSignal当前 realm

    许多简单的 internal observers 仅作为传递节点,不控制它们所代表的 Observable 的订阅;也就是说,它们的 error stepscomplete steps 会在订阅被终止时被调用,而它们的 next steps 仅仅是将某种形式的值沿链条向后传递。

    然而对于本操作符,下面的 observernext steps 实际上负责在 callback 抛出异常时中止对 this 的底层订阅。此时,传递给“Subscribe to an Observable”的 SubscribeOptionssignal 需要是一个依赖于 optionssignal 和一个 AbortSignal (属于下面 next steps 可访问、可在必要时 signal abortAbortController)的 dependent signal

  4. 如果 internal optionssignal中止

    1. 拒绝 p,原因为 internal optionssignal中止原因

    2. 返回 p

  5. internal optionssignal 添加如下中止算法:

    1. 拒绝 p,原因为 internal optionssignal中止原因

      注意: p 的拒绝是和 internal optionssignal 绑定的,而不是 optionssignal,这样 optionsabort 事件触发时排队的 微任务会先跑完,再运行 p 的拒绝处理。

  6. idxunsigned long long,初始为 0。

  7. observer 为一个新的 内部观察者,初始如下:

    next 步骤
    1. 调用 callback,参数为 «传入的 value, idx» 和 "rethrow"。

      如果抛出了异常 E,则使用 E 拒绝 p,并使用 E 发出中止信号visitor callback controller

    2. 递增 idx

    error 步骤

    拒绝 p,原因为传入的 error

    complete 步骤

    解决 p,值为 undefined

  8. 订阅 this,参数为 observerinternal options

  9. 返回 p

测试
every(predicate, options) 方法步骤如下:
  1. p新 promise

  2. controller新的 AbortController

  3. internal options 为一个新的 SubscribeOptions, 其 signal 是以下结果:从列表 «controllersignaloptionssignal(如非 null 则包含)» 创建一个依赖中止信号(creating a dependent abort signal), 使用 AbortSignal, 以及 当前 realm

  4. 如果 internal optionssignal中止

    1. 拒绝 p,原因为 internal optionssignal中止原因

    2. 返回 p

  5. internal optionssignal 添加如下中止算法:

    1. 拒绝 p,原因为 internal optionssignal中止原因

  6. idxunsigned long long,初始为 0。

  7. observer 为一个新的 内部观察者,初始如下:

    next 步骤
    1. 调用 predicate,参数为 «传入的 value, idx» 和 "rethrow";令 passed 为返回值。

      如果抛出了异常 E,则使用 E 拒绝 p,并使用 E 发出中止信号controller

    2. idx 设置为 idx + 1。

    3. 如果 passed 为 false,则使用 false 兑现 p,并发出中止信号controller

    error 步骤

    拒绝 p,原因为传入的 error

    complete 步骤

    解决 p,值为 true。

  8. 订阅 this,参数为 observerinternal options

  9. 返回 p

first(options) 方法步骤如下:
  1. p新 promise

  2. controller新的 AbortController

  3. internal options 为一个新的 SubscribeOptions,其 signal 是以下结果:从列表 «controllersignaloptionssignal(如非 null 则包含)» 创建一个依赖中止信号(create a dependent abort signal),使用 AbortSignal当前 realm

  4. 如果 internal optionssignal 已经 aborted,则:

    1. 拒绝 p,原因为 internal optionssignal中止原因

    2. 返回 p

  5. internal optionssignal 添加如下中止算法:

    1. 拒绝 p,原因为 internal optionssignal中止原因

  6. internal observer 为一个新的 内部观察者,初始如下:

    next 步骤
    1. 解决 p,值为传入的 value

    2. signal abort controller

    error 步骤

    拒绝 p,原因为传入的 error

    complete 步骤

    拒绝 p,原因为新的 RangeError

    注意: 仅当源 Observable 在未发出任何值时即完成才会执行到此步骤。

  7. 订阅 this,参数为 internal observerinternal options

  8. 返回 p

last(options) 方法步骤如下:
  1. p新 promise

  2. 如果 optionssignal 不为 null:

    1. 如果 optionssignal 已经 aborted,则:

      1. 拒绝 p,理由为 optionssignalabort reason

      2. 返回 p

    2. optionssignal 添加如下中止算法:

      1. 拒绝 p,理由为 optionssignalabort reason

  3. lastValueany 或 null,初始为 null。

  4. hasLastValue布尔值,初始为 false。

  5. observer 为一个新的 内部观察者,初始如下:

    next 步骤
    1. hasLastValue 为 true。

    2. lastValue 为传入的 value

    error 步骤

    拒绝 p,原因为传入的 error

    complete 步骤
    1. 如果 hasLastValue 为 true,解决 p,值为 lastValue

      1. 否则,拒绝 p,原因为新的 RangeError

        注意: 参见 first() 的说明。

  6. 订阅 this,参数为 observeroptions

  7. 返回 p

find(predicate, options) 方法步骤如下:
  1. p新 promise

  2. controller新的 AbortController

  3. internal options 为一个新的 SubscribeOptions,其 signal 是以下结果:从列表 «controllersignaloptionssignal(如非 null 则包含)» 创建一个依赖中止信号(create a dependent abort signal),使用 AbortSignal当前 realm

  4. 如果 internal optionssignal中止

    1. 拒绝 p,原因为 internal optionssignal中止原因

    2. 返回 p

  5. internal optionssignal 添加如下中止算法:

    1. 拒绝 p,原因为 internal optionssignal中止原因

  6. idxunsigned long long,初始为 0。

  7. observer 为一个新的 内部观察者,初始如下:

    next 步骤
    1. 调用 predicate,参数为 «传入的 value, idx» 和 "rethrow";令 passed 为返回值。

      如果抛出了异常 E,则使用 E 拒绝 p,并使用 E 发出中止信号controller

    2. idx 设置为 idx + 1。

    3. 如果 passed 为 true,则使用 value 兑现 p,并发出中止信号controller

    error 步骤

    拒绝 p,原因为传入的 error

    complete 步骤

    解决 p,值为 undefined

  8. 订阅 this,参数为 observerinternal options

  9. 返回 p

some(predicate, options) 方法步骤如下:
  1. p新 promise

  2. controller新的 AbortController

  3. internal options 为一个新的 SubscribeOptions,其 signal 是以下结果:从列表 «controllersignaloptionssignal(如非 null 则包含)» 创建一个依赖中止信号(create a dependent abort signal),使用 AbortSignal当前 realm

  4. 如果 internal optionssignal 已经 aborted,则:

    1. 拒绝 p,理由为 internal optionssignalabort reason

    2. 返回 p

  5. internal optionssignal 添加如下中止算法:

    1. 拒绝 p,原因为 internal optionssignal中止原因

  6. idxunsigned long long,初始为 0。

  7. observer 为一个新的 内部观察者,初始如下:

    next 步骤
    1. 调用 predicate,参数为 «传入的 value, idx» 和 "rethrow";令 passed 为返回值。

      如果抛出了异常 E,则使用 E 拒绝 p,并使用 E 发出中止信号controller

    2. idx 设置为 idx + 1。

    3. 如果 passed 为 true,则使用 true 兑现 p,并发出中止信号controller

    error 步骤

    拒绝 p,原因为传入的 error

    complete 步骤

    解决 p,值为 false。

  8. 订阅 this,参数为 observerinternal options

  9. 返回 p

reduce(reducer, initialValue, options) 方法步骤如下:
  1. p新 promise

  2. controller新的 AbortController

  3. internal options 为一个新的 SubscribeOptions,其 signal 是以下结果:从列表 «controllersignaloptionssignal(如非 null 则包含)» 创建一个依赖中止信号(create a dependent abort signal),使用 AbortSignal当前 realm

  4. 如果 internal optionssignal中止

    1. 拒绝 p,原因为 internal optionssignal中止原因

    2. 返回 p

  5. internal optionssignal 添加如下中止算法:

    1. 拒绝 p,原因为 internal optionssignal中止原因

  6. idxunsigned long long,初始为 0。

  7. accumulatorinitialValue(如果有的话),否则未初始化。

  8. observer 为一个新的 内部观察者,初始如下:

    next 步骤
    1. 如果 accumulator 未初始化(意味着没有传入 initialValue),则将 accumulator 设置为传入的 value,将 idx 设置为 idx + 1,并中止这些步骤。

      注意:这意味着 reducer 不会以对象产生的第一个 value 作为 currentValue 来调用。 相反,当最终发出第二个值时,我们将以作为 currentValue,并以第一个值(我们在此处保存的)作为 accumulator 来调用 reducer

    2. 调用 reducer,参数为 «accumulator 作为 accumulator, 传入的 value 作为 currentValueidx 作为 index» 和 "rethrow"。令 result 为返回值。

      如果抛出了异常 E,则使用 E 拒绝 p,并使用 E 发出中止信号controller

    3. idx 设置为 idx + 1。

    4. accumulator 设置为 result

    error 步骤

    拒绝 p,原因为传入的 error

    complete 步骤
    1. 如果 accumulator 不为 "unset",则 解决 p,值为 accumulator

      否则,拒绝 p,原因为 TypeError

  9. 订阅 this,参数为 observerinternal options

  10. 返回 p

3. EventTarget 集成

dictionary ObservableEventListenerOptions {
  boolean capture = false;
  boolean passive;
};

partial interface EventTarget {
  Observable when(DOMString type, optional ObservableEventListenerOptions options = {});
};
when(type, options) 方法步骤如下:
  1. 如果this相关全局对象是一个Window对象,并且其关联的文档不是完全活跃,则返回。

  2. event targetthis

  3. observable 为一个 新的 Observable,初始化如下:

    subscribe 回调

    一个接收 Subscriber subscriber 的算法,步骤如下:

    1. 如果 event target 为 null,则中止这些步骤。

      注意: 这是为了捕捉 event target 在订阅时可能已被垃圾回收的情况。

    2. 如果 subscriber订阅控制器signal中止,则中止这些步骤。

    3. 添加事件监听器,参数为 event target 及如下定义的 事件监听器

      type

      type

      callback

      新建一个 Web IDL EventListener 实例,其函数类型为 (event: Event),内容为调用 observable event listener invoke algorithm,参数为 subscriberevent

      capture

      optionscapture

      passive

      optionspassive 存在,则为该值,否则为 null。

      once

      false

      signal

      subscriber订阅控制器signal

      注意: 这确保 事件监听器 会在 订阅控制器signal 中止时被清理,与引擎的所有权模型无关。

  4. 返回 observable

observable event listener invoke algorithm 接收一个 Subscriber subscriber 和一个 Event event,执行以下步骤:
  1. event 运行 subscribernext() 方法。

测试

4. 安全与隐私注意事项

本内容正在从我们的说明文档上游合并至本规范,在此期间,你可以参考以下资源:

5. 致谢

特别感谢 Ben LeshObservable API 的诸多设计建议,以及他多年维护用户空间 Observable 代码,为该功能贡献到 Web 平台做出的重要贡献。

索引

本规范定义的术语

引用文献定义的术语

参考文献

规范性引用

[DOM]
Anne van Kesteren. DOM 标准. Living Standard. URL: https://dom.spec.whatwg.org/
[ECMASCRIPT]
ECMAScript 语言规范. URL: https://tc39.es/ecma262/multipage/
[HTML]
Anne van Kesteren 等. HTML 标准. Living Standard. URL: https://html.spec.whatwg.org/multipage/
[INFRA]
Anne van Kesteren; Domenic Denicola. Infra 标准. Living Standard. URL: https://infra.spec.whatwg.org/
[WEBIDL]
Edgar Chen; Timothy Gu. Web IDL 标准. Living Standard. URL: https://webidl.spec.whatwg.org/

IDL 索引

[Exposed=*]
interface Subscriber {
  undefined next(any value);
  undefined error(any error);
  undefined complete();
  undefined addTeardown(VoidFunction teardown);

  // Subscriber 创建后为 true,直到 complete()/error() 被调用,或 subscriber 退订为止。
  // 在 complete()/error() 内部,该属性为 true。
  readonly attribute boolean active;

  readonly attribute AbortSignal signal;
};

// SubscribeCallback 是 Observable "创建者" 代码的入口。
// 在 subscribe() 被调用时触发,用于设置新订阅。
callback SubscribeCallback = undefined (Subscriber subscriber);
callback ObservableSubscriptionCallback = undefined (any value);

dictionary SubscriptionObserver {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;
};

callback ObservableInspectorAbortHandler = undefined (any value);

dictionary ObservableInspector {
  ObservableSubscriptionCallback next;
  ObservableSubscriptionCallback error;
  VoidFunction complete;

  VoidFunction subscribe;
  ObservableInspectorAbortHandler abort;
};

typedef (ObservableSubscriptionCallback or SubscriptionObserver) ObserverUnion;
typedef (ObservableSubscriptionCallback or ObservableInspector) ObservableInspectorUnion;

dictionary SubscribeOptions {
  AbortSignal signal;
};

callback Predicate = boolean (any value, unsigned long long index);
callback Reducer = any (any accumulator, any currentValue, unsigned long long index);
callback Mapper = any (any value, unsigned long long index);
// 与 Mapper 只有返回值类型不同,此回调仅用于遍历序列的每个元素,不进行转换。
callback Visitor = undefined (any value, unsigned long long index);

// 此回调返回的 `any` 必须通过 Observable 的转换语义转为 Observable。
callback CatchCallback = any (any value);

[Exposed=*]
interface Observable {
  constructor(SubscribeCallback callback);
  undefined subscribe(optional ObserverUnion observer = {}, optional SubscribeOptions options = {});

  // 若 value 为以下任意类型,则构造一个原生 Observable:
  //   - Observable
  //   - AsyncIterable
  //   - Iterable
  //   - Promise
  static Observable from(any value);

  // 返回 Observable 的操作符。更多见规范中的 "Operators" 章节。
  //
  // takeUntil() 可消费 promise、iterable、async iterable 及其它 observable。
  Observable takeUntil(any value);
  Observable map(Mapper mapper);
  Observable filter(Predicate predicate);
  Observable take(unsigned long long amount);
  Observable drop(unsigned long long amount);
  Observable flatMap(Mapper mapper);
  Observable switchMap(Mapper mapper);
  Observable inspect(optional ObservableInspectorUnion inspectorUnion = {});
  Observable catch(CatchCallback callback);
  Observable finally(VoidFunction callback);

  // 返回 Promise 的操作符。
  Promise<sequence<any>> toArray(optional SubscribeOptions options = {});
  Promise<undefined> forEach(Visitor callback, optional SubscribeOptions options = {});
  Promise<boolean> every(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> first(optional SubscribeOptions options = {});
  Promise<any> last(optional SubscribeOptions options = {});
  Promise<any> find(Predicate predicate, optional SubscribeOptions options = {});
  Promise<boolean> some(Predicate predicate, optional SubscribeOptions options = {});
  Promise<any> reduce(Reducer reducer, optional any initialValue, optional SubscribeOptions options = {});
};

dictionary ObservableEventListenerOptions {
  boolean capture = false;
  boolean passive;
};

partial interface EventTarget {
  Observable when(DOMString type, optional ObservableEventListenerOptions options = {});
};

问题索引

不应直接调用 from()。 我们应该调用某个内部算法,让其返回异常以便我们在此正确处理,因为我们希望将异常传递给 subscriber
这与 Chromium 的实现一致,但请考虑持有最初传入的 SubscribeOptionssignal 的引用,并在其 abort 时只调用 abort callback。结果可能相同,但需要进一步调查。
不应直接调用 from()。 我们应该调用某个内部算法,让其返回异常以便我们在此正确处理,因为我们希望将异常传递给 subscriber