gRPC

gRPC 是一种现代开源的高性能 RPC 框架,可在任何环境中运行。它能够高效地连接数据中心内及跨数据中心的服务,并支持可插拔的负载均衡、链路追踪、健康检查和身份验证功能。

与许多 RPC 系统类似,gRPC 的核心思想是通过可远程调用的函数(方法)来定义服务。您需要为每个方法定义参数和返回类型。这些服务、参数和返回类型都通过 Google 开源的与语言无关的 protocol buffers 机制,在 .proto 文件中进行定义。

通过 gRPC 传输器,Nest 使用 .proto 文件动态绑定客户端和服务器,从而轻松实现远程过程调用,并自动对结构化数据进行序列化和反序列化。

安装

要开始构建基于 gRPC 的微服务,首先需要安装所需的软件包:

$ npm i --save @grpc/grpc-js @grpc/proto-loader

概述

与其他 Nest 微服务传输层实现类似,您可以通过传递给 createMicroservice() 方法的选项对象中的 transport 属性来选择 gRPC 传输机制。在以下示例中,我们将设置一个英雄服务。options 属性提供了有关该服务的元数据;其属性描述见下文

@@filename(main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.GRPC,
  options: {
    package: 'hero',
    protoPath: join(__dirname, 'hero/hero.proto'),
  },
});

info 提示 join() 函数是从 path 包导入的;Transport 枚举是从 @nestjs/microservices 包导入的。

nest-cli.json 文件中,我们添加了 assets 属性以允许分发非 TypeScript 文件,以及 watchAssets 属性用于开启对所有非 TypeScript 资源的监视。在本例中,我们希望 .proto 文件能自动复制到 dist 文件夹。

{
  "compilerOptions": {
    "assets": ["**/*.proto"],
    "watchAssets": true
  }
}

选项

gRPC 传输器选项对象公开了以下描述的属性。

示例 gRPC 服务

让我们定义名为 HeroesService 的示例 gRPC 服务。在上述 options 对象中,protoPath 属性设置了指向协议定义文件 hero.proto 的路径。该 hero.proto 文件采用 protocol buffers 格式编写,其内容如下:

// hero/hero.proto
syntax = "proto3";

package hero;

service HeroesService {
  rpc FindOne (HeroById) returns (Hero) {}
}

message HeroById {
  int32 id = 1;
}

message Hero {
  int32 id = 1;
  string name = 2;
}

我们的 HeroesService 公开了一个 FindOne() 方法。该方法期望接收一个 HeroById 类型的输入参数,并返回一个 Hero 消息(协议缓冲区使用 message 元素来定义参数类型和返回类型)。

接下来我们需要实现该服务。为了定义一个满足此要求的处理程序,我们在控制器中使用 @GrpcMethod() 装饰器,如下所示。该装饰器提供了将方法声明为 gRPC 服务方法所需的元数据。

info 提示 在前面的微服务章节中介绍的 @MessagePattern() 装饰器( 了解更多 )不适用于基于 gRPC 的微服务。对于基于 gRPC 的微服务,@GrpcMethod() 装饰器有效地取代了它的位置。

@@filename(heroes.controller)
@Controller()
export class HeroesController {
  @GrpcMethod('HeroesService', 'FindOne')
  findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ];
    return items.find(({ id }) => id === data.id);
  }
}

info 提示 @GrpcMethod() 装饰器是从 @nestjs/microservices 包导入的,而 MetadataServerUnaryCall 则来自 grpc 包。

上述装饰器接收两个参数。第一个是服务名称(例如 'HeroesService'),对应 hero.proto 文件中的 HeroesService 服务定义。第二个参数(字符串 'FindOne')对应 hero.proto 文件里 HeroesService 服务中定义的 FindOne() rpc 方法。

findOne() 处理方法接收三个参数:调用者传递的 data 数据、存储 gRPC 请求元数据的 metadata,以及用于获取 GrpcCall 对象属性(如向客户端发送元数据的 sendMetadata)的 call 参数。

@GrpcMethod() 装饰器的两个参数都是可选的。如果调用时不传第二个参数(例如 'FindOne'),Nest 会根据处理方法名自动将其转换为大驼峰命名(例如将 findOne 处理方法关联到 FindOne rpc 调用定义)来关联 .proto 文件中的 rpc 方法。如下所示。

@@filename(heroes.controller)
@Controller()
export class HeroesController {
  @GrpcMethod('HeroesService')
  findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ];
    return items.find(({ id }) => id === data.id);
  }
}

你也可以省略第一个 @GrpcMethod() 参数。在这种情况下,Nest 会根据定义处理程序的 class 名称,自动将该处理程序与 proto 定义文件中的服务定义关联起来。例如,在以下代码中,类 HeroesService 会基于名称 'HeroesService' 的匹配,将其处理程序方法与 hero.proto 文件中的 HeroesService 服务定义相关联。

@@filename(heroes.controller)
@Controller()
export class HeroesService {
  @GrpcMethod()
  findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ];
    return items.find(({ id }) => id === data.id);
  }
}

客户端

Nest 应用程序可以作为 gRPC 客户端,使用定义在 .proto 文件中的服务。你可以通过 ClientGrpc 对象访问远程服务。获取 ClientGrpc 对象有多种方式。

首选技术是导入 ClientsModule 模块。使用 register() 方法将.proto 文件中定义的服务包绑定到注入令牌,并进行服务配置。name 属性即为注入令牌。对于 gRPC 服务,需使用 transport: Transport.GRPC 配置。options 属性是一个对象,其包含的属性与前文所述相同。

imports: [
  ClientsModule.register([
    {
      name: 'HERO_PACKAGE',
      transport: Transport.GRPC,
      options: {
        package: 'hero',
        protoPath: join(__dirname, 'hero/hero.proto'),
      },
    },
  ]),
];

提示 register() 方法接收对象数组作为参数。通过提供以逗号分隔的注册对象列表,可同时注册多个服务包。

注册完成后,我们可以通过 @Inject() 注入配置好的 ClientGrpc 对象。然后使用该对象的 getService() 方法获取服务实例,如下所示。

@Injectable()
export class AppService implements OnModuleInit {
  private heroesService: HeroesService;

  constructor(@Inject('HERO_PACKAGE') private client: ClientGrpc) {}

  onModuleInit() {
    this.heroesService = this.client.getService<HeroesService>('HeroesService');
  }

  getHero(): Observable<string> {
    return this.heroesService.findOne({ id: 1 });
  }
}

error 警告 :除非在 proto 加载器配置中将 keepCase 选项设置为 true(即微服务传输器配置中的 options.loader.keepcase),否则 gRPC 客户端不会发送名称中包含下划线 _ 的字段。

请注意,与其他微服务传输方法相比存在细微差异。我们不再使用 ClientProxy 类,而是改用提供 getService() 方法的 ClientGrpc 类。这个 getService() 泛型方法接收服务名称作为参数,并返回其实例(如果可用)。

或者,您也可以使用 @Client() 装饰器来实例化 ClientGrpc 对象,如下所示:

@Injectable()
export class AppService implements OnModuleInit {
  @Client({
    transport: Transport.GRPC,
    options: {
      package: 'hero',
      protoPath: join(__dirname, 'hero/hero.proto'),
    },
  })
  client: ClientGrpc;

  private heroesService: HeroesService;

  onModuleInit() {
    this.heroesService = this.client.getService<HeroesService>('HeroesService');
  }

  getHero(): Observable<string> {
    return this.heroesService.findOne({ id: 1 });
  }
}

最后,对于更复杂的场景,我们可以使用 ClientProxyFactory 类注入动态配置的客户端,具体方法如此处所述。

无论是哪种情况,我们最终都会获得一个指向 HeroesService 代理对象的引用,该对象暴露了与 .proto 文件内定义的相同方法集。当我们访问这个代理对象(即 heroesService)时,gRPC 系统会自动序列化请求、将其转发至远程系统、返回响应并反序列化响应结果。由于 gRPC 为我们屏蔽了这些网络通信细节,heroesService 的表现就如同本地服务提供者一般。

请注意,所有服务方法都采用小驼峰命名法 (遵循语言的自然约定)。例如,虽然我们的 .proto 文件中 HeroesService 定义包含 FindOne() 函数,但 heroesService 实例将提供 findOne() 方法。

interface HeroesService {
  findOne(data: { id: number }): Observable<any>;
}

消息处理器也可以返回一个 Observable,在这种情况下,结果值将持续发射直到流完成。

@@filename(heroes.controller)
@Get()
call(): Observable<any> {
  return this.heroesService.findOne({ id: 1 });
}

要发送 gRPC 元数据(随请求一起),您可以传入第二个参数,如下所示:

call(): Observable<any> {
  const metadata = new Metadata();
  metadata.add('Set-Cookie', 'yummy_cookie=choco');

  return this.heroesService.findOne({ id: 1 }, metadata);
}

info 提示 Metadata 类是从 grpc 包中导入的。

请注意,这将需要我们更新之前几步中定义的 HeroesService 接口。

示例

一个可用的示例在此处查看。

gRPC 反射

gRPC 服务器反射规范是一项标准,允许 gRPC 客户端获取服务器暴露的 API 详细信息,类似于为 REST API 提供 OpenAPI 文档。这可以显著简化开发者使用调试工具(如 grpc-ui 或 postman)的工作流程。

要为您的服务器添加 gRPC 反射支持,首先需要安装所需的实现包:

$ npm i --save @grpc/reflection

然后可以通过 gRPC 服务器选项中的 onLoadPackageDefinition 钩子将其集成到 gRPC 服务器,如下所示:

@@filename(main)
import { ReflectionService } from '@grpc/reflection';

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  options: {
    onLoadPackageDefinition: (pkg, server) => {
      new ReflectionService(pkg).addToServer(server);
    },
  },
});

现在您的服务器将能够根据反射规范响应请求 API 详情的消息。

gRPC 流式传输

gRPC 本身支持长期实时连接,通常称为流(streams)。流式传输适用于聊天、观察或分块数据传输等场景。更多细节请参阅官方文档此处

Nest 支持两种 GRPC 流处理方式:

  • RxJS Subject + Observable 处理程序:可直接在控制器方法内编写响应,或传递给 Subject/Observable 消费者
  • 纯 GRPC 调用流处理器:可将其传递给某些执行器,该执行器将处理 Node 标准 Duplex 流处理器的其余分发工作。

流式传输示例

让我们定义一个名为 HelloService 的新示例 gRPC 服务。hello.proto 文件使用 protocol buffers 构建结构。其内容如下:

// hello/hello.proto
syntax = "proto3";

package hello;

service HelloService {
  rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
  rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
}

message HelloRequest {
  string greeting = 1;
}

message HelloResponse {
  string reply = 1;
}

info 提示由于返回的流可以发出多个值,LotsOfGreetings 方法可以简单地用 @GrpcMethod 装饰器实现(如上文示例所示)。

基于这个 .proto 文件,我们来定义 HelloService 接口:

interface HelloService {
  bidiHello(upstream: Observable<HelloRequest>): Observable<HelloResponse>;
  lotsOfGreetings(
    upstream: Observable<HelloRequest>
  ): Observable<HelloResponse>;
}

interface HelloRequest {
  greeting: string;
}

interface HelloResponse {
  reply: string;
}

info 提示 proto 接口可以通过 ts-proto 包自动生成,了解更多 here

主题策略

@GrpcStreamMethod() 装饰器将函数参数作为 RxJS 的 Observable 提供。因此,我们可以接收并处理多条消息。

@GrpcStreamMethod()
bidiHello(messages: Observable<any>, metadata: Metadata, call: ServerDuplexStream<any, any>): Observable<any> {
  const subject = new Subject();

  const onNext = message => {
    console.log(message);
    subject.next({
      reply: 'Hello, world!'
    });
  };
  const onComplete = () => subject.complete();
  messages.subscribe({
    next: onNext,
    complete: onComplete,
  });


  return subject.asObservable();
}

warning 警告 为了支持与 @GrpcStreamMethod() 装饰器的全双工交互,控制器方法必须返回一个 RxJS 的 Observable 对象。

info 提示 MetadataServerUnaryCall 类/接口是从 grpc 包中导入的。

根据服务定义(在 .proto 文件中),BidiHello 方法应该向服务端流式传输请求。为了从客户端向流发送多个异步消息,我们利用了 RxJS 的 ReplaySubject 类。

const helloService = this.client.getService<HelloService>('HelloService');
const helloRequest$ = new ReplaySubject<HelloRequest>();

helloRequest$.next({ greeting: 'Hello (1)!' });
helloRequest$.next({ greeting: 'Hello (2)!' });
helloRequest$.complete();

return helloService.bidiHello(helloRequest$);

在上面的示例中,我们向流写入了两条消息(next() 调用)并通知服务端已完成数据发送(complete() 调用)。

调用流处理器

当方法返回值定义为 stream 时,@GrpcStreamCall() 装饰器会将函数参数作为 grpc.ServerDuplexStream 提供,它支持标准方法如 .on('data', callback).write(message).cancel()。完整的方法文档可查阅此处

或者,当方法返回值不是 stream 时,@GrpcStreamCall() 装饰器会提供两个函数参数,分别是 grpc.ServerReadableStream(详见此处 )和 callback

让我们从实现支持全双工交互的 BidiHello 开始。

@GrpcStreamCall()
bidiHello(requestStream: any) {
  requestStream.on('data', message => {
    console.log(message);
    requestStream.write({
      reply: 'Hello, world!'
    });
  });
}

info 注意 该装饰器不需要提供任何特定的返回参数。预期该流将像处理其他标准流类型一样被处理。

在上面的示例中,我们使用了 write() 方法将对象写入响应流。作为第二个参数传入 .on() 方法的回调函数会在服务每次接收到新数据块时被调用。

让我们实现 LotsOfGreetings 方法。

@GrpcStreamCall()
lotsOfGreetings(requestStream: any, callback: (err: unknown, value: HelloResponse) => void) {
  requestStream.on('data', message => {
    console.log(message);
  });
  requestStream.on('end', () => callback(null, { reply: 'Hello, world!' }));
}

这里我们使用 callback 回调函数在 requestStream 处理完成后发送响应。

健康检查

在 Kubernetes 等编排器中运行 gRPC 应用时,可能需要确认其是否正常运行且状态健康。gRPC 健康检查规范作为标准协议,允许 gRPC 客户端暴露健康状态,使编排器能够据此采取相应措施。

要添加 gRPC 健康检查支持,首先安装 grpc-node 包:

$ npm i --save grpc-health-check

随后可通过 gRPC 服务器选项中的 onLoadPackageDefinition 钩子将其集成到 gRPC 服务中,如下所示。注意 protoPath 需同时包含健康检查与 hero 包的定义。

@@filename(main)
import { HealthImplementation, protoPath as healthCheckProtoPath } from 'grpc-health-check';

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  options: {
    protoPath: [
      healthCheckProtoPath,
      protoPath: join(__dirname, 'hero/hero.proto'),
    ],
    onLoadPackageDefinition: (pkg, server) => {
      const healthImpl = new HealthImplementation({
        '': 'UNKNOWN',
      });

      healthImpl.addToServer(server);
      healthImpl.setStatus('', 'SERVING');
    },
  },
});

提示 gRPC 健康探针是一个实用的 CLI 工具,可用于容器化环境中测试 gRPC 健康检查。

gRPC 元数据

元数据是以键值对列表形式存在的特定 RPC 调用相关信息,其中键为字符串,值通常是字符串但也可以是二进制数据。元数据对 gRPC 本身是不透明的——它允许客户端向服务器提供与调用相关的信息,反之亦然。元数据可能包含认证令牌、用于监控的请求标识符和标签,以及数据集记录数等数据信息。

要在 @GrpcMethod() 处理程序中读取元数据,请使用第二个参数(metadata),其类型为从 grpc 包导入的 Metadata

要从处理程序返回元数据,请使用 ServerUnaryCall#sendMetadata() 方法(处理程序的第三个参数)。

@@filename(heroes.controller)
@Controller()
export class HeroesService {
  @GrpcMethod()
  findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
    const serverMetadata = new Metadata();
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ];

    serverMetadata.add('Set-Cookie', 'yummy_cookie=choco');
    call.sendMetadata(serverMetadata);

    return items.find(({ id }) => id === data.id);
  }
}

同样地,要读取带有 @GrpcStreamMethod() 注解的处理程序( 主题策略 )中的元数据,需使用第二个参数(metadata),其类型为 Metadata(从 grpc 包导入)。

要从处理程序返回元数据,请使用 ServerDuplexStream#sendMetadata() 方法(第三个处理程序参数)。

要从调用流处理程序 (带有 @GrpcStreamCall() 装饰器注解的处理程序)内部读取元数据,需监听 requestStream 引用上的 metadata 事件,如下所示:

requestStream.on('metadata', (metadata: Metadata) => {
  const meta = metadata.get('X-Meta');
});