重构目标
将分散在各处的 gRPC 客户端初始化逻辑统一收敛到 ConsulService 中,通过静态变量管理所有客户端实例。每个 gRPC 客户端需要存储丰富的元信息,为后续的故障转移和实例切换提供数据支撑。
GrpcClientInterface 设计
// src/consul/interfaces/consul.interface.ts
import { BehaviorSubject } from 'rxjs';
import { ClientGrpc } from '@nestjs/microservices';
import Consul from 'consul';
export interface GrpcClientInterface {
// gRPC 客户端(BehaviorSubject 实现响应式更新)
client: BehaviorSubject<ClientGrpc | null>;
// 所属 Consul 实例名称(用于区分数据中心)
consulName: string;
// Consul 客户端实例(用于健康检查和服务发现)
consulClient: Consul.Consul;
// 服务名称(在 Consul 中注册的 service name)
serviceName: string;
// ConsulService 实例(用于调用更新方法)
consulService: ConsulService;
}
typescript
这个接口的设计考虑了后续扩展需求:不仅存储 gRPC 客户端本身,还保留了 Consul 相关的上下文信息,使得在发生故障时能够快速定位和切换实例。
ConsulService 中的 gRPC 客户端管理
初始化多个客户端
// src/consul/consul.service.ts
export class ConsulService {
// 静态变量:存储所有 gRPC 客户端
private static grpcClients: GrpcClientInterface[] = [];
// 对外暴露的访问器
static get GrpcClients(): GrpcClientInterface[] {
return ConsulService.grpcClients;
}
private options: Consul.ConsulOptions;
private services: ConsulServiceOptions | ConsulServiceOptions[];
private name: string;
private consulClient: Consul.Consul;
constructor(private readonly consulModuleOptions: ConsulModuleOptions) {
this.options = consulModuleOptions.options;
this.services = consulModuleOptions.services;
this.name = consulModuleOptions.name;
this.initConsul();
this.initClients(); // 初始化 Consul 后立即初始化 gRPC 客户端
}
/**
* 初始化所有 gRPC 客户端
* 支持单个配置或数组配置
*/
private initClients(): void {
if (Array.isArray(this.services)) {
this.services.forEach((service) => this.initGrpcClient(service));
} else {
this.initGrpcClient(this.services);
}
}
}
typescript
单个 gRPC 客户端初始化
/**
* 初始化单个 gRPC 客户端
*/
private async initGrpcClient(serviceOptions: ConsulServiceOptions): Promise<void> {
const { serviceName, packageName, protoPath } = serviceOptions;
// 创建 GrpcClientInterface 对象
const item: GrpcClientInterface = {
client: new BehaviorSubject<ClientGrpc | null>(null),
consulName: this.name,
consulClient: this.consulClient,
serviceName,
consulService: this,
};
// 通过 Consul 获取健康的服务实例
const serviceInfo = await this.getServiceInfo(serviceName);
if (serviceInfo) {
// 初始化 gRPC 客户端
const client = this.createGrpcClient(serviceInfo, packageName, protoPath);
item.client.next(client);
} else {
// TODO: 当前 Consul 中没有该服务 → 需要切换到其他数据中心查找
console.warn(`No healthy instance found for ${serviceName}`);
}
// 推入静态数组
ConsulService.grpcClients.push(item);
}
typescript
使用 RxJS 重写服务发现逻辑
原先使用 try/catch 的同步方式改为 RxJS 流式操作,更加优雅地处理异步和重试:
/**
* 获取服务的健康实例信息
* 使用 RxJS 实现带重试的服务发现
*/
async getServiceInfo(serviceName: string): Promise<ServiceInfo | null> {
return firstValueFrom(
from(this.consulClient.health.service(serviceName)).pipe(
map((healthServices: any[]) => {
if (!healthServices || healthServices.length === 0) {
throw new Error(`No service found: ${serviceName}`);
}
// 随机选择一个健康实例(简单负载均衡)
const service = healthServices[
Math.floor(Math.random() * healthServices.length)
];
return {
address: service.Service.Address,
port: service.Service.Port,
};
}),
// 重试机制:指数退避,最多 3 次
retry({
count: 3,
delay: (error, retryCount) => {
const delayDuration = Math.pow(2, retryCount) * 1000;
return timer(delayDuration);
},
}),
catchError((error) => {
console.error('Error fetching service info:', error.message);
return of(null); // 重试耗尽后返回 null
}),
),
);
}
typescript
RxJS 操作符对比
| 操作符 | 作用 | 替代旧写法 |
|---|---|---|
from() | 将 Promise 转为 Observable | await + try/catch |
map() | 转换数据结构 | 手动赋值 |
retry() | 自动重试 | 手动 for 循环 |
catchError() | 兜底错误处理 | catch 块 |
firstValueFrom() | 取第一个值转为 Promise | 直接 await |
在 ConsulCoreModule 中暴露 gRPC 客户端
// src/consul/consul-core.module.ts
import { GrpcClientInterface } from './interfaces/consul.interface';
export const GRPC_CLIENTS = 'GRPC_CLIENTS';
@Global()
@Module({})
export class ConsulCoreModule {
static forRoot(options: ConsulModuleOptions): DynamicModule {
const consulService = new ConsulService(options);
// ... 其他 provider
// 暴露 gRPC 客户端集合
const grpcClientsProvider: Provider = {
provide: GRPC_CLIENTS,
useValue: ConsulService.GrpcClients,
};
return {
module: ConsulCoreModule,
providers: [consulServicesProvider, consulClientsProvider, grpcClientsProvider],
exports: [consulServicesProvider, consulClientsProvider, grpcClientsProvider],
};
}
}
typescript
服务发现的随机负载均衡
getServiceInfo 方法中使用了随机选择策略来分配请求到不同的健康实例:
const service = healthServices[
Math.floor(Math.random() * healthServices.length)
];
typescript
这是最简单的负载均衡策略,在实例数量较少时效果不错。生产环境中可以考虑更精细的策略(如加权随机、最少连接数等)。
调试要点
- options 传递问题:确保
ConsulModuleOptions中的options属性正确传递到 ConsulService 构造函数 - services 为空:检查
forRoot()调用时是否正确传递了services配置 - gRPC 客户端初始化失败:确认 Consul 中有对应名称的健康服务实例
- BehaviorSubject 初始值为 null:首次订阅时 client 可能为 null,需要判断后再使用
↑