概述
RxJS(Reactive Extensions for JavaScript)是一个使用可观察序列编写异步和基于事件程序的库。它被 NestJS 核心模块广泛使用,也是后续实现 Prisma 断线重连等功能的基础。本节为扩展内容,系统介绍 RxJS 的核心概念。
如果已经熟悉 RxJS,可以跳过本节。
学习资源
| 资源 | 地址 | 说明 |
|---|---|---|
| 官方文档 | rxjs.dev | 英文原版,最权威 |
| 中文站 1 | rxjs.org | 官方文档的中文翻译 |
| 中文站 2 | rxjs.tech | 社区翻译,质量较好 |
| 中文站 3 | rxnodejs.cn | 侧重 Node.js 场景 |
在官方文档中,通过 References 入口可查找具体 API。部分 API 上有删除线标记,表示已废弃或即将移除,使用时需注意。
核心定义
官方描述:RxJS 是一个使用可观察序列编写异步和基于事件程序的库。
这句话的关键词拆解:
- 可观察序列 -- Observable 对象,可以被订阅和消费
- 异步 -- 类似 Promise 的异步处理能力
- 基于事件 -- 处理点击、定时器、WebSocket 等事件流
最通俗的理解:RxJS 是处理事件的 Lodash。
Lodash 封装了常用的数组/对象操作工具,RxJS 封装了常用的事件/异步操作工具。
核心概念
Observable(可观察对象)
Observable 是 RxJS 的核心类型,类似 Promise 但更强大:
| 特性 | Promise | Observable |
|---|---|---|
| 数据数量 | 单个值 | 多个值(流) |
| 取消 | 不可取消 | 可取消(unsubscribe) |
| 惰性 | 立即执行 | 订阅时才执行 |
| 操作符 | 无 | map、filter、retry 等 |
Observer(观察者)
Observer 是订阅 Observable 时提供的回调对象,包含三个方法:
const observer = {
next: (value) => console.log('收到数据:', value), // 类似 try
error: (err) => console.error('出错:', err), // 类似 catch
complete: () => console.log('完成'), // 类似 finally
};
typescript
Subscription(订阅)
调用 observable.subscribe() 返回的订阅对象,可用于取消订阅:
const subscription = observable.subscribe(observer);
subscription.unsubscribe(); // 取消订阅
typescript
Subject(主体)
Subject 既是 Observable 又是 Observer,相当于事件总线(EventEmitter),支持多播:
const subject = new Subject();
subject.subscribe(v => console.log('观察者 A:', v));
subject.subscribe(v => console.log('观察者 B:', v));
subject.next('hello'); // 两个观察者都会收到
typescript
Operator(操作符)
操作符是 RxJS 最强大的特性,分为以下类别:
- 创建操作符 --
fromEvent、timer、defer、throwError - 转换操作符 --
map、scan、mergeMap - 过滤操作符 --
filter、take、first、last、skip - 错误处理 --
catchError、retry、retryWhen - 工具操作符 --
tap、delay、timeout
Scheduler(调度器)
进阶概念,控制并发和执行时机:
- 同步执行 vs 异步执行
- 通过
setTimeout、requestAnimationFrame等控制调度策略
Pipe 管道机制
pipe 是 RxJS 中串联操作符的核心方法,将嵌套调用变为链式调用:
无 pipe(嵌套写法,不可读):
op4(op3(op2(op1(source)))
typescript
有 pipe(链式写法,清晰可读):
source.pipe(
op1(),
op2(),
op3(),
op4(),
)
typescript
管道中上一个操作符的返回值会自动传递给下一个操作符。
基础示例
事件监听对比
原生 JavaScript:
document.addEventListener('click', (event) => {
console.log(event);
});
javascript
RxJS:
import { fromEvent } from 'rxjs';
const clicks$ = fromEvent(document, 'click');
clicks$.subscribe((event) => {
console.log(event);
});
javascript
计数器(scan 操作符)
import { fromEvent } from 'rxjs';
import { scan } from 'rxjs/operators';
fromEvent(document, 'click').pipe(
scan((count) => count + 1, 0) // 类似 Array.reduce
).subscribe((count) => {
console.log(`点击次数: ${count}`);
});
javascript
scan 类似 Array.prototype.reduce,但会发射每次累加的中间值。
节流(throttleTime 操作符)
import { fromEvent } from 'rxjs';
import { throttleTime, scan } from 'rxjs/operators';
fromEvent(document, 'click').pipe(
throttleTime(1000), // 每秒最多触发一次
scan((count) => count + 1, 0)
).subscribe((count) => {
console.log(`节流后的点击计数: ${count}`);
});
javascript
值转换(map 操作符)
import { fromEvent } from 'rxjs';
import { throttleTime, map } from 'rxjs/operators';
fromEvent(document, 'click').pipe(
throttleTime(1000),
map((event) => event.clientX) // 从事件对象中提取 x 坐标
).subscribe((x) => {
console.log(`点击位置 x: ${x}`);
});
javascript
常用操作符速查
| 操作符 | 类别 | 作用 |
|---|---|---|
fromEvent | 创建 | 将 DOM 事件转为 Observable |
timer | 创建 | 定时发射值 |
defer | 创建 | 延迟创建 Observable(订阅时才创建) |
map | 转换 | 转换每个发射的值 |
scan | 转换 | 累加器(类似 reduce) |
filter | 过滤 | 按条件过滤值 |
take | 过滤 | 只取前 N 个值 |
first / last | 过滤 | 取第一个/最后一个值 |
skip | 过滤 | 跳过前 N 个值 |
throttleTime | 过滤 | 节流 |
debounceTime | 过滤 | 防抖 |
distinct | 过滤 | 去重 |
retry | 错误处理 | 出错时重试 |
catchError | 错误处理 | 捕获错误 |
与 NestJS 的关系
NestJS 内部大量使用 RxJS,可以在 node_modules/rxjs 中找到自动安装的依赖。后续在实现 PrismaModule 的断线重连时,将使用以下 RxJS API:
lastValueFrom-- 将 Observable 转为 Promise,获取最后一个值defer-- 延迟创建 Observableretry-- 出错时自动重试catchError-- 捕获并处理错误
这些 API 在 @nestjs/typeorm 和 @nestjs/mongoose 的源码中都有使用。
↑