13-4 作业Rxjs示例:计时器、常见操作符等
计时器操作详解
timer基础用法
核心代码解析
import { timer } from 'rxjs';
// 创建2秒后触发的单次计时器
const exampleTimer = timer(2000);
exampleTimer.subscribe({
next: value => console.log(`timer emitted: ${value}`),
complete: () => console.log('timer completed')
});
typescript
执行过程分析
- 初始化阶段:
- 调用
timer(2000)
创建Observable - 开始2000毫秒倒计时
- 调用
- 触发阶段(2000ms后):
- 发出值
0
(所有timer的初始值) - 立即触发complete通知
- 发出值
- 输出结果:
timer emitted: 0 timer completed
text
扩展应用
// 周期性计时器(延迟1秒后,每秒触发)
const intervalTimer = timer(1000, 1000).subscribe(
val => console.log(`Interval #${val}`)
);
// 带初始值的计时器
timer(0, 1000).pipe(
map(v => v + 1) // 从1开始计数
).subscribe(console.log);
typescript
💡 关键知识点:
- 第一个参数决定首次触发延迟
- 第二个参数决定后续触发间隔
- 不提供间隔参数时变为单次触发
- 发射的值是从0开始的递增整数
生命周期回调
完整示例代码
import { timer } from 'rxjs';
const periodicTimer = timer(0, 1000).subscribe({
next: val => console.log(`Tick #${val+1}`), // 从1开始计数
error: err => console.error('Error occurred:', err),
complete: () => console.log('Stream ended')
});
// 5秒后取消订阅
setTimeout(() => {
periodicTimer.unsubscribe();
console.log('Subscription closed');
}, 5000);
typescript
执行流程分析
内存管理实践
- 自动清理方案:
import { takeUntil, Subject } from 'rxjs';
const destroy$ = new Subject();
timer(0, 1000).pipe(
takeUntil(destroy$)
).subscribe(console.log);
// 组件销毁时
destroy$.next();
destroy$.complete();
typescript
- 框架集成示例(Angular):
@Component({...})
export class TimerComponent implements OnDestroy {
private destroy$ = new Subject();
constructor() {
timer(0, 1000).pipe(
takeUntil(this.destroy$)
).subscribe(console.log);
}
ngOnDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
}
typescript
💡 最佳实践:
- 始终处理complete回调以释放资源
- 在框架中使用
takeUntil
模式管理订阅 - 避免在回调中执行耗时操作(改用
subscribeOn
) - 浏览器环境优先使用
animationFrameScheduler
实现UI动画计时器
常见问题解答
Q:为什么timer从0开始计数?
A:这是RxJS的设计约定,所有递增类操作符(如interval)都从0开始,保持一致性
Q:如何实现倒计时功能?
const countdown = timer(0, 1000).pipe(
take(5),
map(v => 5 - v - 1)
);
// 输出:4,3,2,1,0
typescript
Q:timer与interval有何区别?
特性 | timer | interval |
---|---|---|
首次触发 | 可配置延迟 | 立即触发 |
参数 | (delay, period?) | (period) |
适用场景 | 延迟执行/周期执行 | 严格周期执行 |
Q:如何测试timer相关代码?
import { TestScheduler } from 'rxjs/testing';
// 使用虚拟时间测试
new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
}).run(({ expectObservable }) => {
const source = timer(1000, 1000);
expectObservable(source.pipe(take(3))).toBe(
'1s a 999ms b 999ms (c|)',
{ a: 0, b: 1, c: 2 }
);
});
typescript
延伸学习资源
通过掌握这些计时器操作,您已经能够处理以下实际场景:
- 页面倒计时功能
- 轮询请求接口数据
- 用户操作防抖/节流
- 动画帧率控制
错误处理机制深度解析
重试与捕获模式详解
核心代码增强版
import { throwError, retry, catchError, timer } from 'rxjs';
const errorObservable = new Observable(subscriber => {
console.log('Executing at:', new Date().toISOString());
subscriber.error('Network timeout');
});
errorObservable.pipe(
retry({
count: 3,
delay: (error, retryCount) => {
console.log(`Retry #${retryCount} after 1s`);
return timer(1000); // 指数退避可改为:timer(1000 * Math.pow(2, retryCount))
}
}),
catchError(err => {
console.error(`Critical error after retries: ${err}`);
return throwError(() => ({
status: 'FAILED',
originalError: err,
timestamp: new Date()
}));
})
).subscribe({
error: err => console.log('Final error:', err),
complete: () => console.log('This will never execute')
});
typescript
执行流程分析
关键改进点
- 增强的重试配置:
- 支持延迟重试(网络请求场景必备)
- 可自定义重试条件(如仅重试5xx错误)
- 错误信息增强:
- 保留原始错误信息
- 添加时间戳等元数据
- 实际应用场景:
// HTTP请求重试逻辑 this.http.get('/api/data').pipe( retryWhen(errors => errors.pipe( mergeMap((error, i) => i > 2 ? throwError(error) : timer(1000 * i) ) )) )
typescript
重试次数跟踪高级用法
带状态管理的重试
class RetryTracker {
private attempts = 0;
track(observable$: Observable<any>) {
return new Observable(subscriber => {
this.attempts++;
console.log(`Attempt ${this.attempts} started`);
const subscription = observable$.subscribe({
next: val => subscriber.next(val),
error: err => {
console.log(`Attempt ${this.attempts} failed`);
subscriber.error(err);
},
complete: () => subscriber.complete()
});
return () => subscription.unsubscribe();
});
}
}
// 使用示例
const tracker = new RetryTracker();
const apiCall$ = /* 模拟API请求 */;
tracker.track(apiCall$).pipe(
retry(2)
).subscribe();
typescript
重试策略对比表
策略 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
固定次数重试 | 实现简单 | 可能加重服务器负担 | 临时性网络抖动 |
指数退避重试 | 减轻服务器压力 | 实现复杂 | 高并发系统 |
基于错误类型重试 | 精准控制 | 需要错误分类 | 混合错误类型系统 |
无限重试 | 保证最终成功 | 可能无限循环 | 关键任务处理 |
错误处理最佳实践
1. 复合错误处理管道
import { of } from 'rxjs';
source$.pipe(
catchError(err => {
if (err.status === 404) {
return of(defaultData); // 降级处理
} else if (err.status === 401) {
return authService.refreshToken(); // 令牌刷新
} else {
return throwError(() => err); // 继续抛出
}
}),
retry(2),
finalize(() => console.log('Cleanup resources'))
);
typescript
2. 错误边界处理(React集成示例)
// ErrorBoundary组件中
componentDidCatch(error, info) {
this.error$ = throwError(() => ({
error,
componentStack: info.componentStack
}));
}
// 子组件中
this.props.data$.pipe(
catchError(err => {
this.context.reportError(err);
return EMPTY;
})
).subscribe();
typescript
常见问题解决方案
Q:如何避免重复登录弹窗?
let isShowingModal = false;
catchError(err => {
if (err.status === 401 && !isShowingModal) {
isShowingModal = true;
return showLoginModal().pipe(
finalize(() => isShowingModal = false)
);
}
return throwError(() => err);
})
typescript
Q:如何实现请求超时控制?
import { timeout, catchError } from 'rxjs';
httpRequest$.pipe(
timeout(5000),
catchError(err =>
err.name === 'TimeoutError'
? fallbackAction()
: throwError(() => err)
)
)
typescript
Q:如何测试错误处理逻辑?
// 使用jasmine-marbles
it('should retry 3 times', () => {
const source = cold('--#--#--#--#', null, new Error());
const expected = '------------(c|)';
expect(source.pipe(
retry(3),
catchError(() => of('complete'))
)).toBeObservable(expected);
});
typescript
延伸学习资源
通过掌握这些错误处理技术,您可以构建健壮的应用程序,有效处理以下场景:
- 网络请求失败自动恢复
- 第三方服务不可用降级处理
- 用户令牌过期自动刷新
- 关键业务操作最终一致性保证
延迟创建模式深度解析
defer基础应用增强版
核心机制图解
高级应用场景
import { defer, of, throwError } from 'rxjs';
// 动态选择数据源
const config = { useMock: true };
const dynamicSource$ = defer(() =>
config.useMock
? of('mock data')
: throwError(() => new Error('No real data available'))
);
// 条件性AJAX请求
const conditionalRequest$ = defer(() =>
user.isLoggedIn
? ajax('/api/protected')
: of({ status: 'unauthorized' })
);
typescript
性能优化技巧
// 惰性加载heavy模块
const lazyHeavy$ = defer(() =>
import('./heavy-calculation').then(
({ calculate }) => calculate(input)
)
);
// 配合Promise使用
const promiseBased$ = defer(() =>
fetch('/api/data').then(res => res.json())
);
typescript
异步值更新高级模式
带取消能力的实现
const cancellableDefer$ = defer(() => {
console.log('Factory executed');
return new Observable(subscriber => {
const timerId = setTimeout(() => {
subscriber.next('Delayed value');
subscriber.complete();
}, 2000);
return () => {
clearTimeout(timerId);
console.log('Cleanup timer');
};
});
});
const subscription = cancellableDefer$.subscribe();
setTimeout(() => subscription.unsubscribe(), 1000); // 输出"Cleanup timer"
typescript
多阶段异步操作
const multiStage$ = defer(() => {
let stage = 0;
return new Observable(subscriber => {
const intervalId = setInterval(() => {
subscriber.next(`Stage ${++stage}`);
if (stage === 3) {
subscriber.complete();
clearInterval(intervalId);
}
}, 1000);
});
});
lastValueFrom(multiStage$).then(console.log); // 最终输出"Stage 3"
typescript
实际应用案例
1. 用户权限检查
const secureResource$ = defer(() => {
return authService.checkPermission()
? fetchResource()
: of({ error: 'Forbidden' });
});
typescript
2. 动态主题加载
const themeLoader$ = defer(() => {
const themeName = localStorage.getItem('theme') || 'light';
return import(`./themes/${themeName}.css`);
});
typescript
3. 竞态条件处理
const safeRequest$ = defer(() => {
let active = true;
const controller = new AbortController();
return new Observable(subscriber => {
fetch('/api', { signal: controller.signal })
.then(res => active && subscriber.next(res))
.catch(err => active && subscriber.error(err));
return () => {
active = false;
controller.abort();
};
});
});
typescript
对比其他创建方式
特性 | defer | from | new Observable |
---|---|---|---|
执行时机 | 订阅时 | 创建时 | 创建时 |
工厂函数 | 必须 | 无 | 可选 |
Promise转换 | 支持 | 直接支持 | 需手动实现 |
内存效率 | 高 | 中等 | 取决于实现 |
典型应用场景 | 条件创建 | 已知数据转换 | 完全自定义流 |
常见问题解答
Q:defer和from有什么本质区别?
A:from
在创建时就确定数据源,而defer
推迟到订阅时才确定,这对动态数据源至关重要
Q:如何避免defer重复执行?
// 使用shareReplay缓存
const cached$ = defer(() => expensiveCall()).pipe(
shareReplay(1)
);
typescript
Q:defer适合处理同步操作吗?
A:虽然可以,但同步操作更推荐of
或from
,defer的真正价值在于处理异步或有条件的操作
Q:lastValueFrom会丢失中间值吗?
// 收集所有值的方案
defer(() => of(1, 2, 3)).pipe(
toArray()
).subscribe(console.log); // 输出[1, 2, 3]
typescript
性能优化指南
- 避免嵌套defer:
// 反模式 defer(() => defer(() => of(1))); // 正确做法 defer(() => of(1));
typescript - 配合调度器使用:
defer(() => of('data').pipe( observeOn(animationFrameScheduler) ));
typescript - 内存泄漏检查:
const leakCheck$ = defer(() => { console.log('Factory called'); return new Observable(); }); // 应该看到unsubscribe日志 const sub = leakCheck$.subscribe(); setTimeout(() => sub.unsubscribe(), 1000);
typescript
延伸学习资源
通过掌握这些延迟创建技术,您可以实现:
- 按需加载减少初始包体积
- 动态配置系统参数
- 安全地处理用户权限相关操作
- 优化高频操作的内存使用
综合应用场景深度解析
1. 带重试的API请求(增强版)
完整实现方案
import { fromFetch } from 'rxjs/fetch';
import { retry, catchError, switchMap, timer } from 'rxjs/operators';
const smartApi$ = (url: string) => fromFetch(url).pipe(
switchMap(response =>
response.ok
? response.json()
: throwError(() => new Error(`HTTP ${response.status}`))
),
retry({
count: 3,
delay: (error, retryCount) => {
console.warn(`Retry ${retryCount} for ${url}`);
return timer(1000 * retryCount); // 指数退避
}
}),
catchError(err => {
console.error('Final failure:', err);
return of({
status: 'fallback',
data: null,
timestamp: Date.now()
});
})
);
// 使用示例
smartApi$('https://api.example.com/data').subscribe({
next: data => updateUI(data),
error: err => showErrorAlert(err)
});
typescript
关键改进点
- 智能重试:仅重试服务器错误(5xx)
- 响应验证:检查HTTP状态码
- 错误分类:区分网络错误和业务错误
- 回退数据:提供优雅降级方案
实际应用场景
2. 用户操作防抖(专业版)
增强实现
import { fromEvent, Subject } from 'rxjs';
import { debounceTime, switchMap, takeUntil, distinctUntilChanged } from 'rxjs/operators';
class SearchService {
private destroy$ = new Subject();
initSearch(inputEl: HTMLInputElement) {
fromEvent(inputEl, 'input').pipe(
map(e => (e.target as HTMLInputElement).value.trim()),
distinctUntilChanged(),
debounceTime(300),
switchMap(query =>
query ? this.fetchResults(query) : of([])
),
takeUntil(this.destroy$)
).subscribe(results => {
this.renderResults(results);
});
}
destroy() {
this.destroy$.next();
this.destroy$.complete();
}
private fetchResults(query: string) {
return fromFetch(`/api/search?q=${encodeURIComponent(query)}`).pipe(
switchMap(res => res.json())
);
}
}
typescript
优化特性
- 输入过滤:忽略空白和重复查询
- 内存管理:组件销毁时自动取消订阅
- 空查询处理:立即返回空结果
- TypeScript强类型:完整类型推断
性能对比
方案 | 内存占用 | 请求次数 | 响应速度 |
---|---|---|---|
原始实现 | 高 | 多 | 快 |
增强版 | 低 | 少 | 适中 |
无防抖方案 | 极高 | 极多 | 最快 |
3. 组合计时器(工业级实现)
完整计时器系统
import { timer, Subject } from 'rxjs';
import { takeUntil, map, tap, finalize, startWith } from 'rxjs/operators';
class CountdownSystem {
private remaining = 5;
private stop$ = new Subject();
start() {
return timer(0, 1000).pipe(
map(() => --this.remaining),
startWith(this.remaining),
tap(count => {
if (count <= 0) this.stop$.next();
}),
takeUntil(this.stop$),
finalize(() => this.cleanup())
);
}
private cleanup() {
console.log('释放计时器资源');
this.stop$.complete();
}
}
// 使用示例
const countdown = new CountdownSystem();
countdown.start().subscribe({
next: val => console.log(`剩余: ${val}秒`),
complete: () => console.log('倒计时结束')
});
// 可随时停止
setTimeout(() => countdown.stop$.next(), 3000);
typescript
高级功能
- 双向控制:支持外部停止
- 状态保持:显示剩余时间
- 资源安全:确保清理回调执行
- 可复用设计:支持多个独立计时器
计时器类型对比
类型 | 精度 | 内存 | 适用场景 |
---|---|---|---|
基本timer | 毫秒级 | 低 | 简单倒计时 |
动画计时器 | 帧级 | 中 | UI动画 |
Web Worker计时器 | 纳秒级 | 高 | 高精度科学计算 |
系统时钟 | 秒级 | 最低 | 长时间后台任务 |
前沿技术整合
1. 使用WebSocket实时更新
const liveData$ = defer(() => {
const ws = new WebSocket('wss://stream.example.com');
return new Observable(subscriber => {
ws.onmessage = e => subscriber.next(e.data);
ws.onerror = e => subscriber.error(e);
return () => ws.close();
});
}).pipe(
retryWhen(errors => errors.pipe(delay(1000)))
);
typescript
2. 配合GraphQL使用
import { request } from 'graphql-request';
const gqlQuery$ = defer(() =>
request('/graphql', `{ posts { title content } }`)
).pipe(
catchError(err => of({ posts: [] }))
);
typescript
3. Web Worker集成
const heavyCompute$ = defer(() => {
const worker = new Worker('./compute.worker');
return new Observable(subscriber => {
worker.onmessage = e => {
subscriber.next(e.data);
subscriber.complete();
};
worker.postMessage(inputData);
});
});
typescript
调试与优化技巧
1. 可视化调试工具
import { tap } from 'rxjs/operators';
source$.pipe(
tap({
next: val => console.log('值流:', val),
error: err => console.error('错误:', err),
complete: () => console.log('完成'),
subscribe: () => console.log('订阅开始'),
unsubscribe: () => console.log('订阅结束')
})
);
typescript
2. 性能检测方案
const timed$ = source$.pipe(
finalize(() =>
console.timeEnd('observable-lifecycle')
)
);
console.time('observable-lifecycle');
typescript
3. 内存泄漏检测
const leakCheck$ = source$.pipe(
finalize(() =>
console.log('Observable被清理')
)
);
// 应该看到清理日志
const sub = leakCheck$.subscribe();
setTimeout(() => sub.unsubscribe(), 1000);
typescript
延伸学习路径
通过这些综合应用,您可以构建:
- 弹性分布式系统
- 实时数据仪表盘
- 复杂用户交互界面
- 高性能计算任务
↑