可观察对象 | observable

可观察对象(Observable)

可观察对象支持在应用中的发布者和订阅者之间传递消息。 在需要进行事件处理、异步编程和处理多个值的时候,可观察对象相对其它技术有着显著的优点。

可观察对象是声明式的 —— 也就是说,虽然你定义了一个用于发布值的函数,但是在有消费者订阅它之前,这个函数并不会实际执行。 订阅之后,当这个函数执行完或取消订阅时,订阅者就会收到通知。

可观察对象可以发送多个任意类型的值 —— 字面量、消息、事件。无论这些值是同步发送的还是异步发送的,接收这些值的 API 都是一样的。 由于准备(setup)和清场(teardown)的逻辑都是由可观察对象自己处理的,因此你的应用代码只管订阅并消费这些值就可以了,做完之后,取消订阅。无论这个流是击键流、HTTP 响应流还是定时器,对这些值进行监听和停止监听的接口都是一样的。

由于这些优点,可观察对象在 Angular 中得到广泛使用,也同样建议应用开发者好好使用它。

基本用法和词汇

作为发布者,你创建一个 Observable 的实例,其中定义了一个订阅者(subscriber)函数。 当有消费者调用 subscribe() 方法时,这个函数就会执行。 订阅者函数用于定义“如何获取或生成那些要发布的值或消息”。

要执行所创建的可观察对象,并开始从中接收通知,你就要调用它的 subscribe() 方法,并传入一个观察者(observer)。 这是一个 JavaScript 对象,它定义了你收到的这些消息的处理器(handler)。subscribe() 调用会返回一个 Subscription 对象,该对象具有一个 unsubscribe() 方法。 当调用该方法时,你就会停止接收通知。

下面这个例子中示范了这种基本用法,它展示了如何使用可观察对象来对当前地理位置进行更新。

Observe geolocation updates

content_copy// Create an Observable that will start listening to geolocation updates// when a consumer subscribes.const locations = new Observable((observer) => { // Get the next and error callbacks. These will be passed in when // the consumer subscribes. const {next, error} = observer; let watchId;  // Simple geolocation API check provides values to publish if ('geolocation' in navigator) { watchId = navigator.geolocation.watchPosition(next, error } else { error('Geolocation not available' }  // When the consumer unsubscribes, clean up data ready for next subscription. return {unsubscribe() { navigator.geolocation.clearWatch(watchId }};} // Call subscribe() to start listening for updates.const locationsSubscription = locations.subscribe{ next(position) { console.log('Current Position: ', position }, error(msg) { console.log('Error Getting Location: ', msg }} // Stop listening for location after 10 secondssetTimeout(() => { locationsSubscription.unsubscribe( }, 10000

定义观察者

用于接收可观察对象通知的处理器要实现 Observer 接口。这个对象定义了一些回调函数来处理可观察对象可能会发来的三种通知:

通知类型说明
next必要。用来处理每个送达值。在开始执行后可能执行零次或多次。
error可选。用来处理错误通知。错误会中断这个可观察对象实例的执行过程。
complete可选。用来处理执行完毕(complete)通知。当执行完毕后,这些值就会继续传给下一个处理器。

观察者对象可以定义这三种处理器的任意组合。如果你不为某种通知类型提供处理器,这个观察者就会忽略相应类型的通知。

订阅

只有当有人订阅 Observable 的实例时,它才会开始发布值。 订阅时要先调用该实例的 subscribe() 方法,并把一个观察者对象传给它,用来接收通知。

为了展示订阅的原理,我们需要创建新的可观察对象。它有一个构造函数可以用来创建新实例,但是为了更简明,也可以使用 Observable 上定义的一些静态方法来创建一些常用的简单可观察对象:

  • Observable.of(...items) —— 返回一个 Observable实例,它用同步的方式把参数中提供的这些值发送出来。

下面的例子会创建并订阅一个简单的可观察对象,它的观察者会把接收到的消息记录到控制台中:

Subscribe using observer

content_copy// Create simple observable that emits three valuesconst myObservable = Observable.of(1, 2, 3 // Create observer objectconst myObserver = { next: x => console.log('Observer got a next value: ' + x), error: err => console.error('Observer got an error: ' + err), complete: () => console.log('Observer got a complete notification'),}; // Execute with the observer objectmyObservable.subscribe(myObserver// Logs:// Observer got a next value: 1// Observer got a next value: 2// Observer got a next value: 3// Observer got a complete notification

另外,subscribe() 方法还可以接收定义在同一行中的回调函数,无论 nexterror 还是 complete 处理器。比如,下面的 subscribe() 调用和前面指定预定义观察者的例子是等价的。

Subscribe with positional arguments

content_copymyObservable.subscribe( x => console.log('Observer got a next value: ' + x), err => console.error('Observer got an error: ' + err), () => console.log('Observer got a complete notification')

无论哪种情况,next 处理器都是必要的,而 errorcomplete 处理器是可选的。

注意,next() 函数可以接受消息字符串、事件对象、数字值或各种结构,具体类型取决于上下文。 为了更通用一点,我们把由可观察对象发布出来的数据统称为。任何类型的值都可以表示为可观察对象,而这些值会被发布为一个

创建可观察对象

使用 Observable 构造函数可以创建任何类型的可观察流。 当执行可观察对象的 subscribe() 方法时,这个构造函数就会把它接收到的参数作为订阅函数来运行。 订阅函数会接收一个 Observer 对象,并把值发布给观察者的 next() 方法。

比如,要创建一个与前面的 Observable.of(1, 2, 3) 等价的可观察对象,你可以这样做:

Create observable with constructor

content_copy// This function runs when subscribe() is calledfunction sequenceSubscriber(observer) { // synchronously deliver 1, 2, and 3, then complete observer.next(1 observer.next(2 observer.next(3 observer.complete(  // unsubscribe function doesn't need to do anything in this // because values are delivered synchronously return {unsubscribe() {}};} // Create a new Observable that will deliver the above sequenceconst sequence = new Observable(sequenceSubscriber // execute the Observable and print the result of each notificationsequence.subscribe{ next(num) { console.log(num }, complete() { console.log('Finished sequence' }} // Logs:// 1// 2// 3// Finished sequence

如果要略微加强这个例子,我们可以创建一个用来发布事件的可观察对象。在这个例子中,订阅函数是用内联方式定义的。

Create with custom fromEvent function

content_copyfunction fromEvent(target, eventName) { return new Observable((observer) => { const handler = (e) => observer.next(e  // Add the event handler to the target target.addEventListener(eventName, handler  return () => { // Detach the event handler from the target target.removeEventListener(eventName, handler }; }}

现在,你就可以使用这个函数来创建可发布 keydown 事件的可观察对象了:

Use custom fromEvent function

content_copyconst ESC_KEY = 27; const nameInput = document.getElementById('name') as HTMLInputElement; const subscription = fromEvent(nameInput, 'keydown') .subscribe((e: KeyboardEvent) => { if (e.keyCode === ESC_KEY) { nameInput.value = ''; } }

多播

典型的可观察对象会为每一个观察者创建一次新的、独立的执行。 当观察者进行订阅时,该可观察对象会连上一个事件处理器,并且向那个观察者发送一些值。当第二个观察者订阅时,这个可观察对象就会连上一个新的事件处理器,并独立执行一次,把这些值发送给第二个可观察对象。

有时候,不应该对每一个订阅者都独立执行一次,你可能会希望每次订阅都得到同一批值 —— 即使是那些你已经发送过的。这在某些情况下有用,比如用来发送 document 上的点击事件的可观察对象。

多播用来让可观察对象在一次执行中同时广播给多个订阅者。借助支持多播的可观察对象,你不必注册多个监听器,而是复用第一个(next)监听器,并且把值发送给各个订阅者。

当创建可观察对象时,你要决定你希望别人怎么用这个对象以及是否对它的值进行多播。

来看一个从 1 到 3 进行计数的例子,它每发出一个数字就会等待 1 秒。

Create a delayed sequence

content_copyfunction sequenceSubscriber(observer) { const seq = [1, 2, 3]; let timeoutId;  // Will run through an array of numbers, emitting one value // per second until it gets to the end of the array. function doSequence(arr, idx) { timeoutId = setTimeout(() => { observer.next(arr[idx] if (idx === arr.length - 1) { observer.complete( } else { doSequence(arr, ++idx } }, 1000 }  doSequence(seq, 0  // Unsubscribe should clear the timeout to stop execution return {unsubscribe() { clearTimeout(timeoutId }};} // Create a new Observable that will deliver the above sequenceconst sequence = new Observable(sequenceSubscriber sequence.subscribe{ next(num) { console.log(num }, complete() { console.log('Finished sequence' }} // Logs:// (at 1 second): 1// (at 2 seconds): 2// (at 3 seconds): 3// (at 3 seconds): Finished sequence

注意,如果你订阅了两次,就会有两个独立的流,每个流都会每秒发出一个数字。代码如下:

Two subscriptions

content_copy// Subscribe starts the clock, and will emit after 1 secondsequence.subscribe{ next(num) { console.log('1st subscribe: ' + num }, complete() { console.log('1st sequence finished.' }} // After 1/2 second, subscribe again.setTimeout(() => { sequence.subscribe{ next(num) { console.log('2nd subscribe: ' + num }, complete() { console.log('2nd sequence finished.' } }}, 500 // Logs:// (at 1 second): 1st subscribe: 1// (at 1.5 seconds): 2nd subscribe: 1// (at 2 seconds): 1st subscribe: 2// (at 2.5 seconds): 2nd subscribe: 2// (at 3 seconds): 1st subscribe: 3// (at 3 seconds): 1st sequence finished// (at 3.5 seconds): 2nd subscribe: 3// (at 3.5 seconds): 2nd sequence finished

修改这个可观察对象以支持多播,代码如下:

Create a multicast subscriber

content_copyfunction multicastSequenceSubscriber() { const seq = [1, 2, 3]; // Keep track of each observer (one for every active subscription) const observers = []; // Still a single timeoutId because there will only ever be one // set of values being generated, multicasted to each subscriber let timeoutId;  // Return the subscriber function (runs when subscribe() // function is invoked) return (observer) => { observers.push(observer // When this is the first subscription, start the sequence if (observers.length === 1) { timeoutId = doSequence{ next(val) { // Iterate through observers and notify all subscriptions observers.forEach(obs => obs.next(val) }, complete() { // Notify all complete callbacks observers.slice(0).forEach(obs => obs.complete() } }, seq, 0 }  return { unsubscribe() { // Remove from the observers array so it's no longer notified observers.splice(observers.indexOf(observer), 1 // If there's no more listeners, do cleanup if (observers.length === 0) { clearTimeout(timeoutId } } }; };} // Run through an array of numbers, emitting one value// per second until it gets to the end of the array.function doSequence(observer, arr, idx) { return setTimeout(() => { observer.next(arr[idx] if (idx === arr.length - 1) { observer.complete( } else { doSequence(observer, arr, ++idx } }, 1000} // Create a new Observable that will deliver the above sequenceconst multicastSequence = new Observable(multicastSequenceSubscriber() // Subscribe starts the clock, and begins to emit after 1 secondmulticastSequence.subscribe{ next(num) { console.log('1st subscribe: ' + num }, complete() { console.log('1st sequence finished.' }} // After 1 1/2 seconds, subscribe again (should "miss" the first value).setTimeout(() => { multicastSequence.subscribe{ next(num) { console.log('2nd subscribe: ' + num }, complete() { console.log('2nd sequence finished.' } }}, 1500 // Logs:// (at 1 second): 1st subscribe: 1// (at 2 seconds): 1st subscribe: 2// (at 2 seconds): 2nd subscribe: 2// (at 3 seconds): 1st subscribe: 3// (at 3 seconds): 1st sequence finished// (at 3 seconds): 2nd subscribe: 3// (at 3 seconds): 2nd sequence finished

Multicasting observables take a bit more setup, but they can be useful for certain applications. Later we will look at tools that simplify the process of multicasting, allowing you to take any observable and make it multicasting.

虽然支持多播的可观察对象需要做更多的准备工作,但对某些应用来说,这非常有用。稍后我们会介绍一些简化多播的工具,它们让你能接收任何可观察对象,并把它变成支持多播的。

错误处理

由于可观察对象会异步生成值,所以用 try/catch 是无法捕获错误的。你应该在观察者中指定一个 error 回调来处理错误。发生错误时还会导致可观察对象清理现有的订阅,并且停止生成值。可观察对象可以生成值(调用 next 回调),也可以调用 completeerror 回调来主动结束。

content_copymyObservable.subscribe{ next(num) { console.log('Next num: ' + num)}, error(err) { console.log('Received an errror: ' + err)} }

在稍后的小节中会对错误处理(特别是从错误中的恢复)做更详细的讲解。