2-4 高可用实践:使用RxJS优化gRPC客户端更新机制
为什么需要发布订阅模式
之前的实现中,ConsulService 在发现服务变更后会直接更新内部的 client 属性。但在 NestJS 的依赖注入体系下,其他 Service 或 Controller 引用 ConsulService 时,直接读写 client 属性存在以下问题:
| 问题 | 说明 |
|---|---|
| 状态不一致 | 多个消费者可能在不同时刻读取到不同的 client |
| 缺乏通知机制 | Client 变更时,消费者无法感知 |
| 难以测试 | 直接读写属性不利于单元测试 |
RxJS 的 BehaviorSubject 天然适合解决这类"状态变更通知"问题。
BehaviorSubject 核心概念
BehaviorSubject 是 RxJS 中的一种特殊 Subject:
| 特性 | 说明 |
|---|---|
| 初始值 | 创建时必须提供初始值(如 null) |
| 最新值缓存 | 新订阅者立即收到当前最新值 |
| 多播 | 一个更新,所有订阅者都会收到通知 |
在微服务场景中的应用
ConsulService(生产者)
└── BehaviorSubject(client)
├── AuthService(订阅者1)→ 自动获取最新 Client
├── UserController(订阅者2)→ 自动获取最新 Client
└── HealthCheck(订阅者3)→ 监控 Client 状态
text
实现方案
ConsulService 改造
import { BehaviorSubject } from 'rxjs';
@Injectable()
export class ConsulService implements OnModuleInit {
// 使用 BehaviorSubject 替代普通属性
private clientSubject = new BehaviorSubject<any>(null);
// 对外暴露 Observable(防止外部调用 next)
getClient(): Observable<any> {
return this.clientSubject.asObservable();
}
// 内部更新 Client 时通过 next 发布
private updateClient(newClient: any) {
this.clientSubject.next(newClient);
}
}
typescript
消费者订阅方式
在需要使用 gRPC Client 的 Service 中,订阅 Client 变更:
@Injectable()
export class AuthService {
private client: any;
constructor(private readonly consulService: ConsulService) {
this.consulService.getClient().subscribe((client) => {
if (client) {
this.client = client;
}
});
}
async findOne(id: string) {
return new Promise((resolve, reject) => {
this.client.findOne({ id }, (err, response) => {
if (err) reject(err);
else resolve(response);
});
});
}
}
typescript
健康服务过滤优化
从 Consul 获取的健康服务数据结构需要正确解析:
Consul.health.service() 返回结构:
{
Service: {
Address: '192.168.1.100',
Port: 40001
}
}
text
关键过滤逻辑
async updateService() {
const healthServices = await this.consul.health.service({
service: this.options.serviceName,
passing: true // 只获取健康(passing)状态的实例
});
if (healthServices.length === 0) {
// 无健康实例,触发重试
this.scheduleRetry();
return;
}
// 随机选择一个健康实例
const index = Math.floor(Math.random() * healthServices.length);
const service = healthServices[index].Service;
// 使用 service.Address 和 service.Port 创建 Client
this.initClient(service);
}
typescript
服务地址提取注意点
| 字段 | 说明 | 使用方式 |
|---|---|---|
healthServices[i].Service.Address | 服务实例 IP | 用于 gRPC 连接地址 |
healthServices[i].Service.Port | 服务实例端口 | 用于 gRPC 连接端口 |
healthServices[i].Service.ID | 服务实例 ID | 用于标识具体实例 |
Client 更新完整流程
定时健康检查触发
└── healthCheck()
├── 当前 Client 可用 → 跳过
└── 当前 Client 不可用
└── updateService()
├── Consul.health.service({ passing: true })
├── 过滤出健康实例列表
├── 随机选取一个
└── initClient(service)
├── 创建新的 gRPC Client
└── this.clientSubject.next(newClient)
├── AuthService 收到新 Client
├── 其他 Service 收到新 Client
└── 后续请求使用新 Client
text
BehaviorSubject vs 普通属性
| 维度 | 普通属性 | BehaviorSubject |
|---|---|---|
| 状态同步 | 需要手动轮询 | 自动推送最新值 |
| 多消费者 | 各消费者独立读取 | 所有消费者同步收到通知 |
| 初始值 | 可能读到 undefined | 必须有初始值(null) |
| 可测试性 | 较差 | 容易 mock 和断言 |
| 内存开销 | 极小 | 略大(维护订阅者列表) |
参考资源
- RxJS BehaviorSubject - 官方文档
- NestJS RxJS - NestJS 中的 RxJS 使用
- Consul Health Service API - 健康服务 API
↑