概述
本节通过三个实操示例深入 RxJS 的核心 API,这些 API 将在后续 PrismaModule 的断线重连逻辑中直接使用。示例均在 NestJS 项目中运行,无需额外安装 RxJS(已作为 NestJS 的依赖自动安装)。
示例 1:timer 延迟执行
timer 是一个定时器 API,在指定毫秒数后发射一个值(从 0 开始):
// src/index.ts
import { timer } from 'rxjs';
const exampleTimer = timer(2000); // 2 秒后发射
exampleTimer.subscribe({
next: (value) => console.log(`timer emitted: ${value}`),
complete: () => console.log('timer complete'),
});
typescript
执行结果:
(等待 2 秒)
timer emitted: 0
timer complete
text
timer 是一个冷的 Observable,订阅时开始计时,只发射一个值后自动完成。
示例 2:错误处理 -- retry 与 catchError
模拟数据库连接场景中的错误重试机制:
import { Observable, retry, catchError, throwError } from 'rxjs';
let count = 0;
const errorObservable = new Observable((subscriber) => {
console.log(`retry ${count++}`);
subscriber.error(new Error('This is an error'));
});
const handled$ = errorObservable.pipe(
retry(3), // 出错后重试 3 次
catchError((err) => {
console.error('catchError:', err.message);
return throwError(() => new Error('Error after retried'));
}),
);
handled$.subscribe({
error: (err) =>
console.log(
`subscribe error: ${err.message || err}`,
),
});
typescript
执行结果:
retry 0 (第 1 次执行,失败)
retry 1 (retry 第 1 次)
retry 2 (retry 第 2 次)
retry 3 (retry 第 3 次)
catchError: This is an error
subscribe error: Error after retried
text
执行流程分析
初始执行 -> 失败
-> retry 第 1 次 -> 失败
-> retry 第 2 次 -> 失败
-> retry 第 3 次 -> 失败
-> 共 4 次执行(1 次初始 + 3 次重试)
-> 传递给 catchError 处理
-> 最终由 subscribe 的 error 回调接收
text
关键点: retry(3) 表示重试 3 次,加上初始执行总共会执行 4 次。catchError 位于 retry 之后,只有重试次数用尽后才会触发。
实际应用场景
- 数据库连接失败时自动重试
- HTTP 请求失败时自动重试
- 任何可能因临时故障而失败的操作
示例 3:defer 与 lastValueFrom
defer -- 延迟创建 Observable
defer 在被订阅时才创建 Observable,类似懒加载:
import { defer, Observable, timer } from 'rxjs';
import { lastValueFrom } from 'rxjs';
const deferObservable = defer(() => {
console.log('observable created');
return new Observable((subscriber) => {
subscriber.next('hello 1');
subscriber.next('hello 2');
subscriber.next('hello 3');
subscriber.complete();
});
});
typescript
lastValueFrom -- 获取最后一个值
lastValueFrom 将 Observable 转为 Promise,自动订阅并返回最后一个发射的值:
async function getDeferValue() {
const result = await lastValueFrom(deferObservable);
console.log(`defer value: ${result}`);
}
getDeferValue();
typescript
执行结果:
observable created
defer value: hello 3
text
lastValueFrom 的行为特点:
- 自动订阅 Observable
- 等待 Observable 完成(
complete) - 返回最后一个
next发射的值
complete 的作用
subscriber.complete() 的位置决定了 Observable 何时结束:
// complete 在 next 之前 -> lastValueFrom 只能拿到 complete 之前的值
new Observable((subscriber) => {
subscriber.complete(); // 先完成
subscriber.next('hello'); // 后续的 next 不会生效
});
// complete 在所有 next 之后 -> 正常拿到最后一个值
new Observable((subscriber) => {
subscriber.next('hello 1');
subscriber.next('hello 2');
subscriber.next('hello 3');
subscriber.complete(); // 最后完成
});
// lastValueFrom -> 'hello 3'
typescript
综合练习:timer + defer + lastValueFrom
目标: 让 hello 3 在 2 秒后延迟输出:
import { defer, Observable, timer, lastValueFrom } from 'rxjs';
const deferObservable = defer(() => {
console.log('observable created');
return new Observable((subscriber) => {
// 不立即 complete,等待 2 秒后执行
timer(2000).subscribe(() => {
subscriber.next('hello 3');
subscriber.complete();
});
});
});
async function getDeferValue() {
const result = await lastValueFrom(deferObservable);
console.log(`defer value: ${result}`);
}
getDeferValue();
typescript
执行结果:
observable created (立即输出)
(等待 2 秒)
defer value: hello 3 (延迟输出)
text
API 与 PrismaModule 的对应关系
| RxJS API | 在 PrismaModule 中的作用 |
|---|---|
defer | 延迟创建数据库连接,订阅时才执行 client.connect() |
lastValueFrom | 将连接 Observable 转为 Promise,获取连接成功的 Client |
retry | 连接失败时自动重试 |
catchError | 重试耗尽后处理错误 |
timer | 控制重试间隔(如每 3 秒重试一次) |
这些 API 的组合使用方式可以在 @nestjs/mongoose 的 mongoose-core.module.ts 源码中找到完整的参考实现。
↑