0%

RxSwift(1)

基础概念

The equivalence of observer pattern (Observable<Element> sequence) and normal sequences (Sequence) is the most important thing to understand about Rx.

Every Observable sequence is just a sequence. The key advantage for an Observable vs Swift’s Sequence is that it can also receive elements asynchronously. This is the kernel of the RxSwift, documentation from here is about ways that we expand on that idea.

  • Observable(ObservableType) is equivalent to Sequence
  • ObservableType.subscribe method is equivalent to Sequence.makeIterator method.
  • Observer (callback) needs to be passed to ObservableType.subscribe method to receive sequence elements instead of calling next() on the returned iterator.

这是 RxSwift 文档最开始的部分,我们可以看到,ObservableSequence 等同。ObservableType.subscribe 类似于 Sequence.makeIterator 方法。向 ObservableType.subscribe 传递的参数是 Observer,用来接收 Observable 产生的 event。

从字面意思来说,就是 Observer 订阅了 Observale,而事实也确实如此。Observable 产生序列 event,通过 ObservableType.subscribe 将序列event产生通知 Observer 。 调用 ObservableType.subscribe 会返回一个 Disposable 对象,用来随时取消订阅(subscription)。

在 Rx 中,一个 sequences 可以由 0 个或多个 event,一旦 error 或者 completed 时间发生,这个 sequences 就不能再生成新的 event。

这是 RxSwift 中定义的一些基本接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
enum Event<Element>  {
case next(Element) // next element of a sequence
case error(Swift.Error) // sequence failed with error
case completed // sequence terminated successfully
}

public protocol ObservableType : ObservableConvertibleType {
func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
}

protocol ObserverType {
func on(_ event: Event<Element>)
}

源码分析

在 RxSwift 中,Observable 是遵守 ObservableType 协议的一个具体 Class。

1
2
3
4
5
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
}

常用的 create 方法是定义在 ObservableType 协议的 extension 中,并有默认的实现,返回一个 AnonymousObservable 对象。

AnonymousObservable 是一个 继承 Producer,而 Producer 继承自 Observable

看如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
let sequence = Observable<String>.create({ (observer) -> Disposable in
observer.on(.next("hello world!"))
observer.on(.completed)

return Disposables.create()
})

// 方案1
sequence.subscribe({ (event) in
print(event)
})

// 方案2
let observer = AnyObserver<String>.init(eventHandler: { (event) in
print(event)
})
sequence.subscribe(observer)

上面两个方案是等价的,因为方案1传入的闭包最终会封装成为一个 Observer。并且只要调用 Observable 的 subscribe 方法,最终都会回到方案2的方式,这就对应了 ObservableType protocol。

当然这里 Observable 的 create 方法传入的闭包中的 observer 并不是我们调用 subscribe 方法传入的 observer。RxSwift 具体实现中,会对我们传入的 observer 再进行一些封装,一般是由 Sink 的子类作为容器。这样就能方便对传入的 event 进行一些处理及操作。

数据(Event)流动

我们调用 Observable 的 subscribe 方法,就打开了数据的闸门,数据就自顶向下开始流动,中间我们可以调用例如 map、flatMap、concat 等操作符来处理数据。

这个有点像军训的时候我们报数,我们从一端开始报数,直到末端才算完成。而在 RxSwift 我们调用 subscribe 方法报数,直到最顶上。当到达了最顶上,数据开始生成(仅针对于 Cold Observabels,Hot Observabels 并不是如此,具体请参考 Hot and Cold Observables),又从最顶上通过各种操作符,依次向下传递,直到末端。

所以我们看到,数据流动是需要依赖中间每个“人”的配合才行的,如果中间某个“人”使坏,把数据给吞掉,不再向下传递,那么最终 observer 就不会收到数据。因此在 RxSwift 中,每个操作符都是被精心设计的。

整个数据流动的流程:

  1. 底部发出启动
  2. 向顶部传递启动命令
  3. 顶部收到启动命令
  4. 生成 event
  5. 自顶向下传递(包括可能的 event 处理)event
  6. 底部接收 event

需要特别说明的是第5步,一个 event 并不总是从最顶部的 Observabel 生成。例如使用了 flatMap 操作符,那么 event 可能是直接从 flatMap 方法返回的 Observabel 生成的,但是第一个 event 必定是从最顶部生成。下面是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
let disposeBag = DisposeBag()

struct Player {
var score: Variable<Int>
}

let 👦🏻 = Player(score: Variable(80))
let 👧🏼 = Player(score: Variable(90))

let player = Variable(👦🏻)

// 注意 flatMap 方法
player.asObservable()
.flatMap { $0.score.asObservable() } // Change flatMap to flatMapLatest and observe change in printed output
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)

👦🏻.score.value = 85

player.value = 👧🏼

👦🏻.score.value = 95 // Will be printed when using flatMap, but will not be printed when using flatMapLatest

👧🏼.score.value = 100

输出:

1
2
3
4
5
80
85
90
95
100