《深入浅出RxJS》读书笔记

rxjs的引入 // 如果以这种方式导入rxjs,那么整个库都会导入,我们一般不可能在项目中运用到rxjs的所有功能 const Rx = require('rxjs'); 解决这个问题,可以使用深链deep link的方式,只导入用的上的功能 import {Observable} from 'rxjs/Observable'; 这样可以减少不必要的依赖,不光可以优化打包文件的大小,还有利于代码的稳定性 另外目前最新的一种解决方案就是Tree Shaking, Tree Shaking只对import语句导入产生作用,对require不起作用。因为tree shaking的工作方式是对代码静态分析,import只能出现在代码的第一层,不能出现在if分支中。而require可以出现在if分支中,参数也是可以动态产生的字符串,所以只能动态执行时才知道require函数式如何执行的,这里Tree Shaking就不起作用了。 rxjs中Tree Shaking不起作用 实际项目中,如果不会使用很多RxJS的功能,建议还是避免导入全部RxJS的做法,使用npm导入然后通过打包工具来组合 Observer的完结 为了让Observable有机会告诉Observer已经没有更多数据了,需要有另外一种通信机制。在Rxjs中,实现这种通信机制的就是Observer的complete函数 如果你没法预测你的程序会不会出现异常,那么就需要使用error参数,如果不需要可以直接给个Null作为第二个参数 const theObserver = { next: item => console.log(item), null, complete: () => console.log('No More Data') }; 何时完结这个Observable对象需要Observable主动调用complete() 在Observable发生error之后,不再会调用后面的complete().因为在一个Observable对象中,要么是完结状态,要么是出错状态。一旦进入出错那么就终结了。 Observable 可观察的对象 Observer 观察者 联系两者的桥梁就是subscribe 在Rxjs中,发布者就是Observable,观察者就是subscribe函数,这样就可以吧观察者和发布者联系起来 如何取消订阅 const Observable = Rx.Observable; const onSubscribe = observer => { let number = 1; const handle = setInterval(() => { observer.next(number++); }, 1000); return { unsubscribe: () => { clearInterval(handle); } }; }; const source$ = new Observable(onSubscribe); const subscription = source$.subscribe(item => console.log(item)); setTimeout(() => { subscription.unsubscribe(); }, 3500); Observable产生的事件,只有Observer通过subscribe订阅之后才会收到,在unsubscribe之后就不再收到 Hot Observable和Cold Observable 如果一个Observable对象同时有多个Observer订阅,如果A在B之前订阅,那么B该不该订阅到错过的那些数据流。 如果错过就错过了那么这样的Observable成为Hot,但是如果B仍然从头开始订阅这个Observable那么这样的成为Cold 如果每次订阅的时候, 已经有⼀个热的“⽣产者”准备好了, 那就是Hot Observable, 相反,如果每次订阅都要产⽣⼀个新的⽣产者, 新的⽣产者就像汽车引擎⼀样刚启动时肯定是冷的, 所以叫Cold Observable 复杂的问题可以被分解为三个小问题 如何产生事件 如何响应事件 什么样的发布者关联什么样的观察者,也就是何时调用subscribe Observable产生的事件,只有Observer通过subscribe订阅之后才会收到,在unsubscribe之后就不会收到 Observable.create()用来创建一个Observable对象 在RXJS中,和数组的map一样,作为操作符的map也接受一个函数参数,不同之处是,对于数组的map,是把每个数组元素都映射为一个新的值,组合成一个新的数组 操作符分类 创建类(creation) 转化类(transformation) 过滤类(filtering) 合并类(combination) 多播类(multicasting) 错误处理类(error Handling) 辅助⼯具类(utility) 条件分⽀类(conditional&boolean) 数学和合计类(mathmatical&aggregate) 静态操作符的导入路径rxjs/add/observable/ 实例操作符的导入路径rxjs/add/operator/ 在链式调用中,静态操作符只能出现在首位,实例操作符则可以出现在任何位置。 Tree Shaking帮不上Rxjs什么忙,因为Tree Shaking只能做静态代码检查,并不是程序运行时去检测一个函数是否真的被调用、只有一个函数在任何代码中都没有引用过,才会认为这个函数不会被引用。但是RxJS任何一个操作符都是挂在Observable类或者Observable.prototype上的, 赋值给Observable或者Observable.prototype上某个属性在Tree Shaking看来就是被引⽤, 所以, 所 有的操作符, 不管真实运⾏时是否被调⽤, 都会被Tree Shaking认为是会⽤到的代码, 也就不会当做死代码删除。 退订资源的基本原则:当不再需要某个Observable对象获取数据的时候,就要退订这个Observable对象 在对上游的数据处理中,利用try...catch...的组合捕获project调用的可能的错误,如果真的有错误,那就调用下游的error函数 const sub = this.subscribe({ next: value => { try{ observer.next(project(value)) }catch(err) { observer.error(err); } }, error: err => observer.error(err), complete: () => observer.complete() }); 关联Observable 给Observable打补丁 这种方式比较简单,可以直接绑定在prototype上,如果是静态属性直接绑定在类上面 使用bind绑定特定的Observable对象 // 比如我们自己创建了一个map方法 function map(project) { return new Observable(observer => { const sub = this.subscribe({ next: value => observer.next(project(value)), error: err => observer.next(error), complete: () => observer.complete() }); return { unsubscribe: () => { sub.unsubscribe(); } }; }); } // 这个时候我们就可以主动使用bind改变this的指向 const result$ = map.bind(source$)(x => x * 2); // 或者直接使用call const result$ = map.call(source$, x => x * 2); 使用lift lift是Observable的实例函数,它会返回一个新的Observable对象,通过传递给lift的函数参数可以赋予这个新的Observable对象特殊的功能 function map(project) { return this.lift(function(source$) { return source$.subscribe({ next: value => { try{ this.next(project(value)); }catch(err) { this.error(err); } }, error: err => this.error(error), complete: () => this.complete() }); }); } Observable.prototype.map = map; create Observable.create() 其实就是简单的调用了Observable的构造函数 Observable.create = function(subscribe) { return new Observable(subscribe); } of range range(1, 10) 从1开始吐出10个数据 range(1.5, 3) 从1.5开始吐出3个数据,每次加1 generate generate类似一个for循环,设定一个初始值,每次递增这个值,知道满足某个条件为止 使用generate实现range功能 const range = (start, count) => { const max = start + count; return Observable.generate( start, value => value < max, value => value + 1, value => value ); }; 所有能够使用for循环完成的操作,都可以使用generate来实现 repeat 重复数据的数据流 const source$ = Observable.of(1,2,3); const repeated$ = source$.repeat(10); // 将source$中的数据流重复10遍 empty() 产生一个直接完结的Observable对象 throw() 产生的Observable对象什么都不做,直接抛出错误 never() 产生的Observable对象什么也不做,既不吐出数据,也不产生错误 interval() 接受一个数值类型的参数,代表产生数据的间隔毫秒数 timer() 第一个参数可以是一个数值,表示多少毫秒之后吐出第一个数值0 如果存在第二个参数,那就会产生一个持续吐出数据的Observable对象,第二个参数就是时间间隔 // 2s后。每隔1s产生一个数值,该数值从0开始递增 const source$ = Observable.timer(2000, 1000); from() 可以将一切转化为Observable fromPromise() 可以将Promise对象转化为Observable对象,Promise如果成功则调用正常的成功回调,如果失败则调用失败的回调 fromEvent() 将DOM事件转化为Observable对象中的数据 // 将点击事件转化为Observable const source$ = Observble.fromEvent(document.querySelector('#id'), 'click'); ajax() 用来将ajax的返回转化为Observable对象 repeatWhen() 接受一个函数作为参数,这个函数在上游第一次产生异常是被调用,这个函数应该返回一个Observable对象 const notifier = () => { return Observable.interval(1000); }; const source$ = Observable.of(1,2,3); const repeat$ = source$.repeatWhen(notifier); defer() 当defer产生的Observable对象呗订阅的时候,defer的函数参数就会被调用,逾期这个函数返回另外一个Observable const observableFactory = () => Observable.of(1,2,3); const source$ = Observable.defer(observableFacatory); 合并类操作符 不少合并类操作符都有两种形式,既提供静态操作符,又提供实例操作符。 concat concat可以将多个Observable的数据内容一次合并 const source1$ = Observable.of(1,2,3); const source2$ = Observable.of(4,5,6); const concated$ = source1$.concat(source2$); // 或者静态操作符 const concated$ = Observable.concat(source1$, source2$); concat开始从下一个Observable抽取数据是发生在前一个Observable对象完结之后,所以参与到这个concat之中的Observable对象应该都能完结。如果一个Observable对象不完结,那排在后面的Observable对象永远没有上场的机会 // source1$不完结,永远轮不到source2$上场 const source1$ = Observable.interval(1000); const source2$ = Observable.of(1); const concated$ = source1$.concat(source2$); merge 先到先得快速通过 merge同样支持静态和实例形式的操作符 const Observable = Rx.Observable; const source1$ = Observable.timer(0, 1000).map(x => x + 'A'); const source2$ = Observable.timer(500, 1000).map(x => x + 'B'); const merged$ = Observable.merge(source1$, source2$); merged$.subscribe(console.log, null, () => console.log('complete')); merge第一时间会subscribe上游所有的Observable,然后才去先到先得的策略,任何一个Observable只要有数据下来,就会传给下游的Observable对象 image merge的第一个Observable如果产生的是同步数据流,那会等第一个同步数据流产生完毕之后,再回合并下一个Observable对象,因此merge的主要适用场景仍然是异步数据流。一个比较常用的场景就是用于合并DOM事件 merge还有一个可选的参数concurrent,用于指定同时合并的Observable对象的个数 const source1$ = Observable.timer(0, 1000).map(x => x+'A'); const source2$ = Observable.timer(500, 1000).map(x => x+'B'); const source3$ = Observable.timer(1000, 1000).map(x => x+'C'); const merged$ = source1$.merge(source2$, source3$, 2); merged$.subscribe(console.log, null, () => console.log('complete')); // 0A 0B 1A 1B 2A 2B... 这里就限定了优先合并2个Observable对象。而第一二个又不会完结,所以source3$没有出头之日。 zip zip将上游的两个Obserable合并,并且将他们中的数据一一对应。 // 基本用法 const source1$ = Observable.of(1,2,3); const source2$ = Observable.of(4,5,6); const zipped$ = Observable.zip(source1$, source2$); zipped$.subscribe(console.log, null, () => console.log('completed')); // [1,4] [2,5] [3,6] completed 当使用zip的时候,它会立刻订阅上游Observable,然后开始合并数据。对于zip而言上游任何一个Observable完结,zip只要给这个完结的Observable对象吐出所有的数据找到配对的数据,那么zip就会给下游一个complete信号 const source1$ = Observable.interval(1000); const source2$ = Observable.of('a', 'b', 'c'); // [0, 'a'] [1, 'b'] [2, 'c'] complete 但是这里也会有一个问题,如果某个上游的source1$吐出的数据很快,但是source$2吐出的数据慢,那么zip就不得不先存储source1$的数据 如果使用zip组合超过两个Observable对象,游戏规则依然一样,组合而成的Observable吐出的数据依然是数组 combineLatest 合并最后一个数据,从所有输入Observable对象中那最后一个产生的数据(最新数据),然后把这些数据组合起来传给下游。 const source1$ = Observable.timer(500, 1000); const source2$ = Observable.timer(1000, 1000); const result$ = source1$.combineLatest(source2$); image image 我们也可以自由的定制下游数据 const source1$ = Observable.timer(500, 1000); const source2$ = Observable.timer(1000, 1000); const project = (a, b) => `${a} and ${b}`; const result$ = source1$.combineLatest(source2$, project); 多重依赖的问题: const original$ = Observable.timer(0, 1000); const source1$ = original$.map(x => x + 'a'); const source2$ = original$.map(x => x + 'b'); const result$ = source1$.combineLatest(source2$); withLatestFrom 功能类似于combineLatest,但是给下游推送数据只能由一个 const source1$ = Observable.timer(0, 2000).map(x => 1000 * x); const source2$ = Observable.timer(500, 1000); const result$ = source1$.withLatestFrom(source2$, (a,b) => a + b); // 101 203 305 407... race 第一个吐出数据的Observable对象就是胜者,race产生的Observable就会完全采用Observable对象的数据,其余的输入Observable对象则会被退订而抛弃。 const source1$ = Observable.timer(0, 2000).map(x => x + 'a'); const source2$ = Observable.timer(500, 2000).map(y => y + 'b'); const winner$ = source1$.race(source2$); winner$.subscribe(console.log); // 1a 2a 3a... startWith 让一个Observable对象在被订阅的时候,总是先吐出指定的若干数据 const origin$ = Observable.timer(0, 1000); const result$ = origin$.startWith('start'); // start // 0 // 1 startWith的操作符就是为了满足链式调用的需求 original$.map(x => x * 2).startWith('start').map(x => x + 'ok'); forkJoin 只有当所有的Observable对象都完结,确定不会有新的数据产生的时候,forkJoin就会把所有输入的Observable对象产生的最后一个数据合并成给下游唯一的数据 const source1$ = Observable.interval(1000).map(x => x + 'a').take(1); const source2$ = Observable.interval(1000).map(x => x + 'b').take(3); const concated$ = Observable.forkJoin(source1$, source2$); concated$.subscribe(console.log); // ["0a", "2b"] 高阶Observable 所谓高阶Observable,指的就是产生数据依然是Observable的Observable // 高阶Observable示例 const ho$ = Observable.interval(1000).take(2) .map(x => Observable.interval(1500).map(y => x + ':' + y)); concatAll 会对其内部的Observable对象做concat操作 const ho$ = Observable.interval(1000) .take(2) .map(x => Observable.interval(1500).map(y => x+':'+y).take(2)); const concated$ = ho$.concatAll(); // 0:0 0:1 1:0 1:1 concatAll首先会订阅上游产生的第一个内部的Observable对象,抽取其中的数据,然后只有当第一个Observable完结的时候才回去订阅第二个Observable。这样很容易产生数据积压 mergeAll 和concatAll()功能类似,但是只要上游产生了数据,mergeAll就会立即订阅 switch switch的含义就是切换,总是切换到最新的内部Observable对象获取数据。每当switch的上游高阶Observable产生一个内部Observable对象,witch都会⽴刻订阅最新的内部Observable对象上, 如果已经订阅了之前的内部Observable对象, 就会退订那个过时的内部Observable对象, 这个“⽤上新的, 舍弃旧的”动作, 就是切换。 const ho$ = Observable.interval(1000) .take(2) .map(x => Observable.interval(1500).map(y => x+':'+y).take(2)); const result$ = ho$.switch(); exhaust exhaust在耗尽当前内部Observable数据之前不会切换到下一个内部Observable对象。和switch一样,exhaust产生的Observable对象完结前提是最新的内部Observable对象完结而且上游高阶Observable对象完结 count 统计上游Observable对象吐出所有数据的个数 const source$ = Observable.of(1,2,3).concat(Observable.of(4,5,6)); const count$ = source$.count(); // 6 max, min 取的最小值和最大值 reduce 规约统计 const source$ = Observable.range(1, 100); const reduced$ = source$.reduce((acc, current) => acc + current, 0); // 参数基本和js中的一致 find和findIndex 在某些情况下,我们希望可以将find和findIndex结合在一起,我们可以这样做 const source$ = Observable.of(3,1,4,1,5,9); const isEven = x => x % 2 === 0; const find$ = source$.find(isEven); const findIndex$ = source$.findIndex(isEven); const zipped$ =find$.zip(findIndex$); defaultIfEmpty defaultIfEmpty()除了检测上游Observable对象是否为空之外,还要接受一个默认值作为参数,如果上游Observable对象是空的,那就把默认值吐出来 const new$ = source$.defaultIfEmpty('this is default'); filter 过滤 first 如果first不接受参数,那么就是获取的上游的第一个数据 如果first接受函数作为参数,那么就会获取上游数据中满足函数条件的第一个数据 last 工作方式与first刚好相反,从上游数据的末尾开始寻找符合条件的元素 takeWhile 接受一个判定函数作为参数 const source$ = Observable.range(1, 100); const takeWhile$ = source$.takeWhile( value => value % 2 === 0 ); takeUtil takeUtil是一个里程碑式的过滤类操作符,因为takeUtil让我们可以用Observable对象来控制另一个Observable对象的数据产生 在RxJS中,创建类操作符是数据流的源头,其余所有操作符最重要的三类就是合并类、过滤类和转化类。 map map用来改变数据流中的数据,具有一一对应的映射功能 const source$ = Rx.Observable.of(1,2,3); // 注意这里只能使用普通函数,箭头函数中的this是绑定在执行环境上的,无法获取context中的值 const mapFunc = function(value, index) { return `${value} ${this.separator} ${index}`; } const context = {separator: ':'}; const result$ = source$.map(mapFunc, context); result$.subscribe( console.log, null, () => console.log('complete') ); mapTo 无论上游产生什么数据,传给下游的都是同样的数据 // 将result$中的数据都映射成A const result$ = source$.mapTo('A'); pluck pluck就是把上游数据中特定字段的值拔出来 const source$ = Rx.Observable.of( {name: 'RxJS', version: 'v4'}, {name: 'React', version: 'v15'}, {name: 'React', version: 'v16'}, {name: 'RxJS', version: 'v5'} ); const result$ = source$.pluck('name'); result$.subscribe( console.log, null, () => console.log('complete') ); // RxJS // React // React // RxJS // complete 上面的代码中,pluck方法将对象中的键对应的值获取出来 获取DOM事件中的值 const click$ = Rx.Observable.fromEvent(document, 'click'); const result$ = click$.pluck('target', 'tagName'); // HTML 将上游数据放在数组中传给下游操作符 bufferTime 用一个参数来指定产生缓冲窗口的间隔 const source$ = Rx.Observable.timer(0, 100); // 参数400,就会把时间划分为连续的400毫秒长度区块,上游传来的数据不会直接传给下游,而是在该时间区块的开始就新建一个数组对象推送给下游 const result$ = source$.bufferTime(400); 如果上游在短时间内产生了大量的数据,那bufferTime就会有很大的内存压力,为了防止出现这种情况,bufferTime还支持第三个可选参数,用于指定每个时间区间内缓存的最多数据个数 const result$ = source$.bufferTime(400, 200, 2); bufferCount 根据个数来界定 bufferWhen 接受一个函数作为参数,这个参数名为closingSelector bufferToggle buffer 将上游数据放在Observable中传给下游的操作符
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信