重试逻辑的设计思考
在微服务架构中,服务间调用失败是常态。原先的拦截器存在几个设计缺陷:
- 重试过于即时:请求失败后立刻重试,没有给 Consul 健康检查留出状态更新时间
- 重试无上限:当微服务不可用时,如果大量用户同时请求,拦截器会疯狂重试,导致流量雪崩
- 耦合过紧:拦截器需要知道具体更新哪个微服务实例,引入了复杂的元数据传递机制
设计原则:大道至简
核心思路是砍掉不需要的复杂传递机制——拦截器不需要知道具体是哪个微服务出了问题,它的职责只有一个:发现错误 → 延迟重试 → 达到上限后返回错误。
改造后的拦截器实现
完整代码
// src/common/interceptors/grpc-retry.interceptor.ts
import {
Injectable,
NestInterceptor,
ExecutionContext,
CallHandler,
} from '@nestjs/common';
import { Observable, timer, of, throwError } from 'rxjs';
import { switchMap, retry, catchError } from 'rxjs/operators';
@Injectable()
export class GrpcRetryInterceptor implements NestInterceptor {
intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
return next.handle().pipe(
catchError((error) => {
const isConnectionIssue = this.checkIfConnectionIssue(error);
if (isConnectionIssue) {
// 微服务连接异常 → 进入重试流程
return this.handleConnectionIssue(context, next);
}
// 非微服务异常 → 直接抛出
console.error('Non-gRPC error:', of(error));
return throwError(() => error);
}),
);
}
/**
* 判断错误是否来自 gRPC 连接问题
*/
private checkIfConnectionIssue(error: any): boolean {
const connectionErrorCodes = [
'UNAVAILABLE',
'DEADLINE_EXCEEDED',
'INTERNAL',
'UNKNOWN',
];
return (
error?.code &&
connectionErrorCodes.some((code) => error.code.toString().includes(code))
);
}
/**
* 处理连接异常的重试逻辑
* - 延迟 1 秒后重试
* - 最多重试 3 次
* - 每次重试的延迟时间指数增长(2^n 秒)
*/
private handleConnectionIssue(
_context: ExecutionContext,
next: CallHandler,
): Observable<any> {
return timer(1000).pipe(
// 延迟 1 秒,给 Consul 健康检查留出时间
switchMap(() => next.handle()),
// 重试机制
retry({
count: 3, // 最多重试 3 次
delay: (error, retryCount) => {
// 指数退避:2^retryCount 秒
const delayDuration = Math.pow(2, retryCount) * 1000;
console.log(
`Retry attempt ${retryCount}, delay ${delayDuration}ms`,
);
return timer(delayDuration);
},
}),
// 重试耗尽后返回最终错误
catchError(() =>
throwError(() => new Error('Service update timeout')),
),
);
}
}
typescript
指数退避策略详解
重试延迟采用指数退避(Exponential Backoff)算法,每次重试的等待时间是前一次的 2 倍:
| 重试次数 | 延迟时间 | 累计等待 |
|---|---|---|
| 第 1 次 | 2 秒 | 3 秒(含初始 1 秒) |
| 第 2 次 | 4 秒 | 7 秒 |
| 第 3 次 | 8 秒 | 15 秒 |
为什么用指数退避而不是固定间隔?
- 给 Consul Server 留出足够的健康检查和状态同步时间
- 避免在服务不可用时产生大量并发重试请求
- 第 1 次重试间隔较短(服务可能只是临时抖动),后续间隔逐渐加长
RxJS 操作符说明
整个重试流程使用了 RxJS 的流式操作,比传统的 try/catch + setTimeout 更加简洁:
timer(1000) // 等待 1 秒
→ switchMap() // 切换到实际请求
→ retry({count, delay}) // 带延迟的重试
→ catchError() // 兜底错误处理
text
| 操作符 | 作用 |
|---|---|
timer(1000) | 延迟 1 秒执行,给系统缓冲时间 |
switchMap() | 切换到新的 Observable(实际的服务调用) |
retry({count, delay}) | 重试控制,支持自定义延迟函数 |
catchError() | 捕获最终错误,返回错误响应 |
of() | 将普通值包装为 Observable |
执行流程
延迟时间参数调优
初始延迟时间可以根据实际网络环境调整:
| 场景 | 初始延迟 | 建议值 |
|---|---|---|
| 局域网内微服务 | 短延迟 | 200-500ms |
| 跨机房微服务 | 中等延迟 | 500-1000ms |
| 跨区域公网 | 长延迟 | 1000-2000ms |
全局注册拦截器
// app.module.ts
import { Module } from '@nestjs/common';
import { APP_INTERCEPTOR } from '@nestjs/core';
import { GrpcRetryInterceptor } from './common/interceptors/grpc-retry.interceptor';
@Module({
providers: [
{
provide: APP_INTERCEPTOR,
useClass: GrpcRetryInterceptor,
},
],
})
export class AppModule {}
typescript
与旧版拦截器的对比
| 对比项 | 旧版实现 | 新版实现 |
|---|---|---|
| 重试机制 | try/catch + setTimeout | RxJS 流式操作 |
| 重试延迟 | 无延迟,立即重试 | 指数退避,递增延迟 |
| 重试上限 | 无限制 | 3 次 |
| 需要知道具体服务 | 是(通过元数据传递) | 否(通用拦截器) |
| 流量保护 | 无 | 有(指数退避防止雪崩) |
↑