为什么需要超时控制
在微服务架构中,所有的服务调用都是远程调用。即使服务部署在同一台机器上,也可能因为以下原因导致请求无法正常响应:
- 服务卡死 -- 目标服务因内存溢出或死循环而无响应
- 资源受限 -- 数据库连接池耗尽、文件描述符用完
- 外部依赖故障 -- 第三方接口超时、网络不通
- 请求排队 -- 目标服务正在处理大量请求,新请求排队等待
如果没有超时控制,客户端会无限等待,导致线程/连接资源被耗尽,最终引发级联故障。
设置超时时间
NestJS 微服务使用 RxJS 的 timeout 操作符来实现超时控制:
import { Injectable, Inject } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { timeout, catchError, of } from 'rxjs';
@Injectable()
export class AppService {
constructor(
@Inject('MATH_SERVICE') private client: ClientProxy,
) {}
getSum(numbers: number[]) {
return this.client
.send<number>({ cmd: 'sum' }, numbers)
.pipe(
timeout(5000), // 5 秒超时
);
}
}
typescript
当请求在 5 秒内没有收到响应时,RxJS 会抛出 TimeoutError。
捕获和处理异常
基本的错误捕获
import { Injectable } from '@nestjs/common';
import { timeout, catchError, throwError } from 'rxjs';
@Injectable()
export class AppService {
getSum(numbers: number[]) {
return this.client
.send<number>({ cmd: 'sum' }, numbers)
.pipe(
timeout(5000),
catchError(err => {
if (err.name === 'TimeoutError') {
console.error('微服务调用超时');
return of(-1); // 返回降级数据
}
return throwError(() => err);
})
);
}
}
typescript
使用 NestJS 异常过滤器
NestJS 提供了异常过滤器来统一处理微服务的异常:
import { Catch, RpcExceptionFilter, ArgumentsHost } from '@nestjs/common';
import { Observable, throwError } from 'rxjs';
import { RpcException } from '@nestjs/microservices';
@Catch()
export class MicroserviceExceptionFilter implements RpcExceptionFilter {
catch(exception: any, host: ArgumentsHost): Observable<any> {
console.error('微服务异常:', exception.message);
// 根据异常类型返回不同的错误响应
if (exception.name === 'TimeoutError') {
return throwError(() => new RpcException('服务调用超时'));
}
return throwError(() => new RpcException(exception.message));
}
}
typescript
在 Controller 上使用:
@Controller()
@UseFilters(new MicroserviceExceptionFilter())
export class AppController {
// ...
}
typescript
完整的超时和异常处理示例
import { Injectable, Inject, Logger } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { timeout, catchError, retry, of } from 'rxjs';
@Injectable()
export class OrderService {
private readonly logger = new Logger(OrderService.name);
constructor(
@Inject('PAYMENT_SERVICE') private paymentClient: ClientProxy,
) {}
// 带超时、重试和降级的完整示例
processPayment(orderId: string, amount: number) {
return this.paymentClient
.send('process_payment', { orderId, amount })
.pipe(
timeout(5000), // 5 秒超时
retry(2), // 失败后最多重试 2 次
catchError(err => {
this.logger.error(`支付服务调用失败: ${err.message}`);
if (err.name === 'TimeoutError') {
this.logger.warn('支付服务超时,返回降级结果');
return of({ success: false, reason: 'timeout' });
}
return of({ success: false, reason: 'service_error' });
})
);
}
}
typescript
常见的 RxJS 错误处理操作符
| 操作符 | 作用 | 使用场景 |
|---|---|---|
timeout(ms) | 设置超时时间 | 防止无限等待 |
catchError() | 捕获错误并返回替代数据 | 降级处理 |
retry(n) | 失败后自动重试 n 次 | 临时性故障恢复 |
retryWhen() | 自定义重试策略 | 指数退避重试 |
throwError() | 抛出新错误 | 错误转换 |
of(value) | 返回降级数据 | 服务降级 |
优雅的错误处理策略
请求 → 超时控制(5s)→ 重试(2次)→ 捕获异常
↓
┌── 成功:返回数据
└── 失败:降级响应
text
降级响应不是简单的报错,而是返回一个可以接受的替代结果,保证系统的可用性。例如支付服务超时时,可以返回"处理中"状态,而不是直接报错给用户。
↑