可观察对象 | Observable

Observable (可观察对象)

Observables 是多个值的惰性推送集合。它填补了下面表格中的空白:

单个值多个值
拉取FunctionIterator
推送PromiseObservable

示例 - 当订阅下面代码中的 Observable 的时候会立即(同步地)推送值123,然后1秒后会推送值4,再然后是完成流:

var observable = Rx.Observable.create(function (observer) { observer.next(1 observer.next(2 observer.next(3 setTimeout(() => { observer.next(4 observer.complete( }, 1000 }

要调用 Observable 并看到这些值,我们需要订阅 Observable:

var observable = Rx.Observable.create(function (observer) { observer.next(1 observer.next(2 observer.next(3 setTimeout(() => { observer.next(4 observer.complete( }, 1000 } console.log('just before subscribe' observable.subscribe{ next: x => console.log('got value ' + x), error: err => console.error('something wrong occurred: ' + err), complete: () => console.log('done'), } console.log('just after subscribe'

控制台执行的结果:

just before subscribe got value 1 got value 2 got value 3 just after subscribe got value 4 done

拉取 (Pull) vs. 推送 (Push)

拉取推送是两种不同的协议,用来描述数据**生产者 (Producer)如何与数据消费者 (Consumer)**进行通信的。

什么是拉取? - 在拉取体系中,由消费者来决定何时从生产者那里接收数据。生产者本身不知道数据是何时交付到消费者手中的。

每个 JavaScript 函数都是拉取体系。函数是数据的生产者,调用该函数的代码通过从函数调用中“取出”一个单个返回值来对该函数进行消费。

ES2015 引入了 generator 函数和 iterators (function*),这是另外一种类型的拉取体系。调用 iterator.next() 的代码是消费者,它会从 iterator(生产者) 那“取出”多个值。

生产者消费者
拉取被动的: 当被请求时产生数据。主动的: 决定何时请求数据。
推送主动的: 按自己的节奏产生数据。被动的: 对收到的数据做出反应。

什么是推送? - 在推送体系中,由生产者来决定何时把数据发送给消费者。消费者本身不知道何时会接收到数据。

在当今的 JavaScript 世界中,Promises 是最常见的推送体系类型。Promise(生产者) 将一个解析过的值传递给已注册的回调函数(消费者),但不同于函数的是,由 Promise 来决定何时把值“推送”给回调函数。

RxJS 引入了 Observables,一个新的 JavaScript 推送体系。Observable 是多个值的生产者,并将值“推送”给观察者(消费者)。

  • Function 是惰性的评估运算,调用时会同步地返回一个单一值。

Observables 作为函数的泛化

与流行的说法正好相反,Observables 既不像 EventEmitters,也不像多个值的 Promises 。在某些情况下,即当使用 RxJS 的 Subjects 进行多播时, Observables 的行为可能会比较像 EventEmitters,但通常情况下 Observables 的行为并不像 EventEmitters 。

Observables 像是没有参数, 但可以泛化为多个值的函数。

考虑如下代码:

function foo() { console.log('Hello' return 42; } var x = foo.call( // 等同于 foo() console.log(x var y = foo.call( // 等同于 foo() console.log(y

我们期待看到的输出:

"Hello" 42 "Hello" 42

你可以使用 Observables 重写上面的代码:

var foo = Rx.Observable.create(function (observer) { console.log('Hello' observer.next(42 } foo.subscribe(function (x) { console.log(x } foo.subscribe(function (y) { console.log(y }

输出是一样的:

"Hello" 42 "Hello" 42

这是因为函数和 Observables 都是惰性运算。如果你不调用函数,console.log('Hello') 就不会执行。Observables 也是如此,如果你不“调用”它(使用 subscribe),console.log('Hello') 也不会执行。此外,“调用”或“订阅”是独立的操作:两个函数调用会触发两个单独的副作用,两个 Observable 订阅同样也是触发两个单独的副作用。EventEmitters 共享副作用并且无论是否存在订阅者都会尽早执行,Observables 与之相反,不会共享副作用并且是延迟执行。

订阅 Observable 类似于调用函数。

一些人声称 Observables 是异步的。那不是真的。如果你用日志包围一个函数调用,像这样:

console.log('before' console.log(foo.call() console.log('after'

你会看到这样的输出:

"before" "Hello" 42 "after"

使用 Observables 来做同样的事:

console.log('before' foo.subscribe(function (x) { console.log(x } console.log('after'

输出是:

"before" "Hello" 42 "after"

这证明了 foo 的订阅完全是同步的,就像函数一样。

Observables 传递值可以是同步的,也可以是异步的。

那么 Observable 和 函数的区别是什么呢?Observable 可以随着时间的推移“返回”多个值,这是函数所做不到的。你无法这样:

function foo() { console.log('Hello' return 42; return 100; // 死代码,永远不会执行 }

函数只能返回一个值。但 Observables 可以这样:

var foo = Rx.Observable.create(function (observer) { console.log('Hello' observer.next(42 observer.next(100 // “返回”另外一个值 observer.next(200 // 还可以再“返回”值 } console.log('before' foo.subscribe(function (x) { console.log(x } console.log('after'

同步输出:

"before" "Hello" 42 100 200 "after"

但你也可以异步地“返回”值:

var foo = Rx.Observable.create(function (observer) { console.log('Hello' observer.next(42 observer.next(100 observer.next(200 setTimeout(() => { observer.next(300 // 异步执行 }, 1000 } console.log('before' foo.subscribe(function (x) { console.log(x } console.log('after'

输出:

"before" "Hello" 42 100 200 "after" 300

结论:

  • func.call() 意思是 "同步地给我一个值"

Observable 剖析

Observables 是使用 Rx.Observable.create 或创建操作符创建的,并使用观察者来订阅它,然后执行它并发送 next / error / complete 通知给观察者,而且执行可能会被清理。这四个方面全部编码在 Observables 实例中,但某些方面是与其他类型相关的,像 Observer (观察者) 和 Subscription (订阅)。

Observable 的核心关注点:

  • 创建 Observables

创建 Observables

Rx.Observable.createObservable 构造函数的别名,它接收一个参数:subscribe 函数。

下面的示例创建了一个 Observable,它每隔一秒会向观察者发送字符串 'hi'

var observable = Rx.Observable.create(function subscribe(observer) { var id = setInterval(() => { observer.next('hi') }, 1000 }

Observables 可以使用 create 来创建, 但通常我们使用所谓的创建操作符, 像 offrominterval、等等。

在上面的示例中,subscribe 函数是用来描述 Observable 最重要的一块。我们来看下订阅是什么意思。

订阅 Observables

示例中的 Observable 对象 observable 可以订阅,像这样:

observable.subscribe(x => console.log(x)

observable.subscribeObservable.create(function subscribe(observer) {...}) 中的 subscribe 有着同样的名字,这并不是一个巧合。在库中,它们是不同的,但从实际出发,你可以认为在概念上它们是等同的。

这表明 subscribe 调用在同一 Observable 的多个观察者之间是不共享的。当使用一个观察者调用 observable.subscribe 时,Observable.create(function subscribe(observer) {...}) 中的 subscribe 函数只服务于给定的观察者。对 observable.subscribe 的每次调用都会触发针对给定观察者的独立设置。

订阅 Observable 像是调用函数, 并提供接收数据的回调函数。

这与像 addEventListener / removeEventListener 这样的事件处理方法 API 是完全不同的。使用 observable.subscribe,在 Observable 中不会将给定的观察者注册为监听器。Observable 甚至不会去维护一个附加的观察者列表。

subscribe 调用是启动 “Observable 执行”的一种简单方式, 并将值或事件传递给本次执行的观察者。

执行 Observables

Observable.create(function subscribe(observer) {...})...的代码表示 “Observable 执行”,它是惰性运算,只有在每个观察者订阅后才会执行。随着时间的推移,执行会以同步或异步的方式产生多个值。

Observable 执行可以传递三种类型的值:

  • "Next" 通知: 发送一个值,比如数字、字符串、对象,等等。

"Next" 通知是最重要,也是最常见的类型:它们表示传递给观察者的实际数据。"Error" 和 "Complete" 通知可能只会在 Observable 执行期间发生一次,并且只会执行其中的一个。

这些约束用所谓的* Observable 语法合约*表达最好,写为正则表达式是这样的:

next*(error|complete)?

在 Observable 执行中, 可能会发送零个到无穷多个 "Next" 通知。如果发送的是 "Error" 或 "Complete" 通知的话,那么之后不会再发送任何通知了。

下面是 Observable 执行的示例,它发送了三个 "Next" 通知,然后是 "Complete" 通知:

var observable = Rx.Observable.create(function subscribe(observer) { observer.next(1 observer.next(2 observer.next(3 observer.complete( }

Observable 严格遵守自身的规约,所以下面的代码不会发送 "Next" 通知 4

var observable = Rx.Observable.create(function subscribe(observer) { observer.next(1 observer.next(2 observer.next(3 observer.complete( observer.next(4 // 因为违反规约,所以不会发送 }

subscribe 中用 try/catch 代码块来包裹任意代码是个不错的主意,如果捕获到异常的话,会发送 "Error" 通知:

var observable = Rx.Observable.create(function subscribe(observer) { try { observer.next(1 observer.next(2 observer.next(3 observer.complete( } catch (err) { observer.error(err // 如果捕获到异常会发送一个错误 } }

清理 Observable 执行

因为 Observable 执行可能会是无限的,并且观察者通常希望能在有限的时间内中止执行,所以我们需要一个 API 来取消执行。因为每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力或内存资源。

当调用了 observable.subscribe ,观察者会被附加到新创建的 Observable 执行中。这个调用还返回一个对象,即 Subscription (订阅):

var subscription = observable.subscribe(x => console.log(x)

Subscription 表示进行中的执行,它有最小化的 API 以允许你取消执行。想了解更多订阅相关的内容,请参见 Subscription 类型。使用 subscription.unsubscribe() 你可以取消进行中的执行:

var observable = Rx.Observable.from([10, 20, 30] var subscription = observable.subscribe(x => console.log(x) // 稍后: subscription.unsubscribe(

当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。只要调用 unsubscribe() 方法就可以取消执行。

当我们使用 create() 方法创建 Observable 时,Observable 必须定义如何清理执行的资源。你可以通过在 function subscribe() 中返回一个自定义的 unsubscribe 函数。

举例来说,这是我们如何清理使用了 setInterval 的 interval 执行集合:

var observable = Rx.Observable.create(function subscribe(observer) { // 追踪 interval 资源 var intervalID = setInterval(() => { observer.next('hi' }, 1000 // 提供取消和清理 interval 资源的方法 return function unsubscribe() { clearInterval(intervalID }; }

正如 observable.subscribe 类似于 Observable.create(function subscribe() {...}),从 subscribe 返回的 unsubscribe 在概念上也等同于 subscription.unsubscribe。事实上,如果我们抛开围绕这些概念的 ReactiveX 类型,保留下来的只是相当简单的 JavaScript 。

function subscribe(observer) { var intervalID = setInterval(() => { observer.next('hi' }, 1000 return function unsubscribe() { clearInterval(intervalID }; } var unsubscribe = subscribe{next: (x) => console.log(x)} // 稍后: unsubscribe( // 清理资源

为什么我们要使用像 Observable、Observer 和 Subscription 这样的 Rx 类型?原因是保证代码的安全性(比如 Observable 规约)和操作符的可组合性。