logologo
文档仓库
文档仓库
logologo
开始

概述

第一步
控制器
提供者
模块
中间件
异常过滤器
管道
守卫
拦截器
自定义装饰器

基础

自定义提供程序
异步提供者
动态模块
注入作用域
循环依赖
模块引用
懒加载模块
执行上下文
生命周期事件
发现服务
平台无关
单元测试

技术

配置
SQL
Mongo
验证
缓存
序列化
版本控制
任务调度
队列
日志
Cookies
事件
压缩
文件上传
文件流
HTTP 模块
Session
MVC
性能(Fastify)
SSE

安全

认证
授权
加密与哈希
Helmet
CORS
CSRF
速率限制

GraphQL

快速开始
解析器
变更
订阅
标量
指令
接口
联合与枚举
字段中间件
类型映射
插件
复杂度
扩展
CLI 插件
生成SDL
共享模型
其他功能
联邦

WebSocket

网关
异常过滤器
管道
守卫
拦截器
适配器

微服务

基础
Redis
MQTT
NATS
RabbitMQ
Kafka
gRPC
自定义传输
异常过滤器
管道
守卫
拦截器
部署
独立应用程序

CLI

概述
工作区
库
用法
脚本

OpenAPI

介绍
装饰器
类型映射
操作
其他特性
安全
类型与参数
CLI 插件

实用示例

REPL
CRUD生成器
SWC
Passport(认证)
热重载
MikroORM
TypeORM
Mongoose
Sequelize
路由模块
Swagger
健康检查
CQRS
Compodoc
Prisma
Sentry
静态资源
Commander
异步本地存储
Necord
套件(原Automock)

常见问题

Serverless
HTTP 适配器
长连接
全局前缀
原始请求体
混合应用
HTTPS & 多服务器
请求生命周期
错误

开发工具

概述
CI/CD
迁移指南
API参考(官方)

生态与案例

谁在用
精彩资源

支持

支持

社区

贡献者

最后更新于: 2025/11/18 02:11:37

上一页联邦
下一页异常过滤器

#网关

本文档其他部分讨论的大多数概念,如依赖注入、装饰器、异常过滤器、管道、守卫和拦截器,同样适用于网关。只要可能,Nest 都会抽象实现细节,使得相同的组件可以跨基于 HTTP 的平台、WebSocket 和微服务运行。本节将介绍 Nest 中特定于 WebSocket 的方面。

在 Nest 中,网关只是一个用 @WebSocketGateway() 装饰器注解的类。从技术上讲,网关是与平台无关的,这使得它们在创建适配器后可以与任何 WebSocket 库兼容。目前内置支持两种 WS 平台:socket.io 和 ws。您可以选择最适合您需求的平台。此外,您也可以按照这个指南构建自己的适配器。

提示

网关可以被视为提供者 ;这意味着它们可以通过类构造函数注入依赖项。同时,网关也可以被其他类(提供者和控制器)注入。

#安装

要开始构建基于 WebSocket 的应用程序,首先需要安装所需的包:

$ npm i --save @nestjs/websockets @nestjs/platform-socket.io

#概述

通常情况下,每个网关都会监听与 HTTP 服务器相同的端口,除非您的应用程序不是 Web 应用,或者您已手动更改了端口。可以通过向 @WebSocketGateway(80) 装饰器传递参数来修改此默认行为,其中 80 是选定的端口号。您还可以使用以下结构设置网关使用的命名空间 :

@WebSocketGateway(80, { namespace: 'events' })
注意

网关在被现有模块的 providers 数组引用之前不会被实例化。

您可以通过 @WebSocketGateway() 装饰器的第二个参数,向 socket 构造函数传递任何受支持的选项 ,如下所示:

@WebSocketGateway(81, { transports: ['websocket'] })

网关已开始监听,但尚未订阅任何传入消息。让我们创建一个处理器来订阅 events 消息,并用完全相同的数据向用户响应。

@SubscribeMessage('events')
handleEvent(@MessageBody() data: string): string {
  return data;
}
提示

@SubscribeMessage() 和 @MessageBody() 装饰器是从 @nestjs/websockets 包导入的。

网关创建完成后,我们可以在模块中注册它。

import { Module } from '@nestjs/common';
import { EventsGateway } from './events.gateway';

@Module({
  providers: [EventsGateway]
})
export class EventsModule {}

您还可以在装饰器中传入属性键来从消息正文中提取它:

@SubscribeMessage('events')
handleEvent(@MessageBody('id') id: number): number {
  // id === messageBody.id
  return id;
}

如果您不想使用装饰器,以下代码在功能上是等效的:

@SubscribeMessage('events')
handleEvent(client: Socket, data: string): string {
  return data;
}

在上面的示例中,handleEvent() 函数有两个参数。第一个是平台特定的套接字实例 ,第二个是从客户端接收的数据。不过,不建议使用这种方法,因为它需要在每个单元测试中模拟 socket 实例。

一旦收到 events 消息,处理器就会发送一个确认,其中包含通过网络发送的相同数据。此外,还可以使用特定于库的方法发出消息,例如,使用 client.emit() 方法。为了访问连接的套接字实例,请使用 @ConnectedSocket() 装饰器。

@SubscribeMessage('events')
handleEvent(
  @MessageBody() data: string,
  @ConnectedSocket() client: Socket,
): string {
  return data;
}
提示

@ConnectedSocket() 装饰器是从 @nestjs/websockets 包导入的。

但是,在这种情况下,您将无法利用拦截器。如果您不想向用户回应,可以简单地跳过 return 语句(或明确返回"虚假"值,例如 undefined)。

现在当客户端发出如下消息时:

socket.emit('events', { name: 'Nest' });

handleEvent() 方法将被执行。为了监听上述处理程序内部发出的消息,客户端必须附加相应的确认监听器:

socket.emit('events', { name: 'Nest' }, (data) => console.log(data));

#多重响应

确认通知仅会发送一次。此外,原生 WebSockets 实现并不支持此功能。为解决这一限制,您可以返回一个包含两个属性的对象:event 表示触发事件的名称,data 则是需要转发给客户端的数据。

@SubscribeMessage('events')
handleEvent(@MessageBody() data: unknown): WsResponse<unknown> {
  const event = 'events';
  return { event, data };
}
提示

WsResponse 接口是从 @nestjs/websockets 包中导入的。

注意

如果您的 data 字段依赖于 ClassSerializerInterceptor,则应返回实现 WsResponse 的类实例,因为该拦截器会忽略普通的 JavaScript 对象响应。

客户端需要添加另一个事件监听器才能接收传入的响应。

socket.on('events', (data) => console.log(data));

#异步响应

消息处理器能够以同步或异步方式响应。因此,支持 async 方法。消息处理器还能返回一个 Observable,在这种情况下,结果值将持续发射直到流完成。

@SubscribeMessage('events')
onEvent(@MessageBody() data: unknown): Observable<WsResponse<number>> {
  const event = 'events';
  const response = [1, 2, 3];

  return from(response).pipe(
    map(data => ({ event, data })),
  );
}

在上例中,消息处理器将响应 3 次 (对应数组中的每个元素)。

#生命周期钩子

提供了 3 个实用的生命周期钩子。它们都有对应的接口,如下表所述:

OnGatewayInit 强制实现 afterInit() 方法。接收特定库的服务器实例作为参数(如果需要会展开其余部分)
OnGatewayConnection 强制实现 handleConnection() 方法。接收特定库的客户端套接字实例作为参数
OnGatewayDisconnect 强制实现 handleDisconnect() 方法。接收特定库的客户端套接字实例作为参数
提示

每个生命周期接口都从 @nestjs/websockets 包中导出。

#服务器与命名空间

有时您可能需要直接访问原生的平台特定服务器实例。该对象的引用会作为参数传递给 afterInit() 方法(OnGatewayInit 接口)。另一种方式是使用 @WebSocketServer() 装饰器。

@WebSocketServer()
server: Server;

您还可以通过 namespace 属性获取对应的命名空间,如下所示:

@WebSocketServer({ namespace: 'my-namespace' })
namespace: Namespace;
注意

@WebSocketServer() 装饰器需要从 @nestjs/websockets 包中导入。

一旦服务器实例准备就绪,Nest 会自动将其分配给该属性。

#示例

一个可用的示例在此处查看。 import { Module } from '@nestjs/common'; import { EventsGateway } from './events.gateway';

events.module.ts
@Module({
 providers: [EventsGateway]
})
export class EventsModule {}

你也可以向装饰器传入属性键,以便从传入消息体中提取特定属性:

events.gateway.ts
@SubscribeMessage('events')
handleEvent(@MessageBody('id') id: number): number {
 // id === messageBody.id
 return id;
}

如果您不想使用装饰器,以下代码在功能上是等效的:

events.gateway.ts
@SubscribeMessage('events')
handleEvent(client: Socket, data: string): string {
 return data;
}

在上面的示例中,handleEvent() 函数接受两个参数。第一个是平台特定的 socket 实例 ,第二个是从客户端接收的数据。不过不建议采用这种方法,因为它需要在每个单元测试中模拟 socket 实例。

当接收到 events 消息时,处理程序会发送一个包含网络传输数据的确认响应。此外,还可以使用库特定的方式发送消息,例如利用 client.emit() 方法。要访问已连接的 socket 实例,请使用 @ConnectedSocket() 装饰器。

events.gateway.ts
@SubscribeMessage('events')
handleEvent(
 @MessageBody() data: string,
 @ConnectedSocket() client: Socket,
): string {
 return data;
}
提示

@ConnectedSocket() 装饰器是从 @nestjs/websockets 包中导入的。

然而,在这种情况下,您将无法利用拦截器。若不想对用户作出响应,可直接跳过 return 语句(或显式返回"falsy"值,例如 undefined)。

现在当客户端发出如下消息时:

socket.emit('events', { name: 'Nest' });

handleEvent() 方法将被执行。为了监听上述处理程序内部发出的消息,客户端必须附加相应的确认监听器:

socket.emit('events', { name: 'Nest' }, (data) => console.log(data));

#多重响应

确认通知仅会发送一次。此外,原生 WebSockets 实现并不支持此功能。为解决这一限制,您可以返回一个包含两个属性的对象:event 表示触发事件的名称,data 则是需要转发给客户端的数据。

events.gateway.ts
@SubscribeMessage('events')
handleEvent(@MessageBody() data: unknown): WsResponse<unknown> {
 const event = 'events';
 return { event, data };
}
提示

WsResponse 接口是从 @nestjs/websockets 包中导入的。

注意

如果您的 data 字段依赖于 ClassSerializerInterceptor,则应返回实现 WsResponse 的类实例,因为该拦截器会忽略普通的 JavaScript 对象响应。

客户端需要添加另一个事件监听器才能接收传入的响应。

socket.on('events', (data) => console.log(data));

#异步响应

消息处理器能够以同步或异步方式响应。因此,支持 async 方法。消息处理器还能返回一个 Observable,在这种情况下,结果值将持续发射直到流完成。

events.gateway.ts
@SubscribeMessage('events')
onEvent(@MessageBody() data: unknown): Observable<WsResponse<number>> {
 const event = 'events';
 const response = [1, 2, 3];

 return from(response).pipe(
   map(data => ({ event, data })),
 );
}

在上例中,消息处理器将响应 3 次 (对应数组中的每个元素)。

#生命周期钩子

提供了 3 个实用的生命周期钩子。它们都有对应的接口,如下表所述:

钩子描述
OnGatewayInit强制实现 afterInit() 方法。接收特定库的服务器实例作为参数(如果需要会展开其余部分)
OnGatewayConnection强制实现 handleConnection() 方法。接收特定库的客户端套接字实例作为参数
OnGatewayDisconnect强制实现 handleDisconnect() 方法。接收特定库的客户端套接字实例作为参数
提示

每个生命周期接口都从 @nestjs/websockets 包中导出。

以下是实现这些生命周期钩子的完整示例:

import {
  OnGatewayInit,
  OnGatewayConnection,
  OnGatewayDisconnect,
  WebSocketGateway,
  WebSocketServer,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger } from '@nestjs/common';

@WebSocketGateway()
export class EventsGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
  private logger: Logger = new Logger('EventsGateway');

  @WebSocketServer()
  server: Server;

  afterInit(server: Server) {
    this.logger.log('WebSocket 服务器初始化完成');
  }

  handleConnection(client: Socket, ...args: any[]) {
    this.logger.log(`客户端连接: ${client.id}`);
    // 可以在这里执行连接时的逻辑,如用户认证
  }

  handleDisconnect(client: Socket) {
    this.logger.log(`客户端断开连接: ${client.id}`);
    // 可以在这里执行断开连接时的清理逻辑
  }
}

#服务器与命名空间

有时您可能需要直接访问原生的平台特定服务器实例。该对象的引用会作为参数传递给 afterInit() 方法(OnGatewayInit 接口)。另一种方式是使用 @WebSocketServer() 装饰器。

@WebSocketServer()
server: Server;

您还可以通过 namespace 属性获取对应的命名空间,如下所示:

@WebSocketServer({ namespace: 'my-namespace' })
namespace: Namespace;
注意

@WebSocketServer() 装饰器需要从 @nestjs/websockets 包中导入。

一旦服务器实例准备就绪,Nest 会自动将其分配给该属性。

#认证和授权

在 WebSocket 应用中实现认证通常需要在连接建立时验证用户身份:

import { UseGuards } from '@nestjs/common';
import { AuthGuard } from '@nestjs/passport';

@UseGuards(AuthGuard('jwt'))
@WebSocketGateway()
export class AuthenticatedGateway implements OnGatewayConnection {
  handleConnection(client: Socket) {
    // 从 JWT 令牌中获取用户信息
    const user = client.request.user;
    console.log('用户已连接:', user.username);
  }

  @SubscribeMessage('private-message')
  handlePrivateMessage(
    @MessageBody() data: string,
    @ConnectedSocket() client: Socket,
  ) {
    const user = client.request.user;
    return `私人消息来自 ${user.username}: ${data}`;
  }
}

#自定义中间件

可以在网关中使用自定义中间件来处理连接:

import { IoAdapter } from '@nestjs/platform-socket.io';
import { Socket } from 'socket.io';

export class AuthenticatedSocketIoAdapter extends IoAdapter {
  createIOServer(port: number, options?: any): any {
    const server = super.createIOServer(port, options);
    
    server.use((socket: Socket, next) => {
      // 验证令牌
      const token = socket.handshake.auth.token;
      if (this.validateToken(token)) {
        next();
      } else {
        next(new Error('Authentication error'));
      }
    });

    return server;
  }

  private validateToken(token: string): boolean {
    // 实现令牌验证逻辑
    return token === 'valid-token';
  }
}

#错误处理

正确处理 WebSocket 错误:

@WebSocketGateway()
export class ErrorHandlingGateway {
  @SubscribeMessage('risky-operation')
  async handleRiskyOperation(@MessageBody() data: any) {
    try {
      // 可能抛出异常的操作
      const result = await this.performRiskyOperation(data);
      return result;
    } catch (error) {
      throw new WsException({
        status: 'error',
        message: error.message,
        code: 'RISKY_OPERATION_FAILED'
      });
    }
  }

  private async performRiskyOperation(data: any) {
    // 模拟可能失败的操作
    if (Math.random() > 0.5) {
      throw new Error('操作失败');
    }
    return { success: true, data };
  }
}

#房间和命名空间管理

高级房间管理功能:

@WebSocketGateway({ namespace: 'chat' })
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;

  private userRooms = new Map<string, Set<string>>();

  handleConnection(client: Socket) {
    console.log(`客户端 ${client.id} 已连接`);
  }

  handleDisconnect(client: Socket) {
    // 清理用户的房间信息
    this.leaveAllRooms(client.id);
    console.log(`客户端 ${client.id} 已断开连接`);
  }

  @SubscribeMessage('join-room')
  handleJoinRoom(
    @MessageBody() room: string,
    @ConnectedSocket() client: Socket,
  ) {
    client.join(room);
    
    // 记录用户的房间
    if (!this.userRooms.has(client.id)) {
      this.userRooms.set(client.id, new Set());
    }
    this.userRooms.get(client.id).add(room);

    client.to(room).emit('user-joined', {
      userId: client.id,
      message: `用户 ${client.id} 加入了房间 ${room}`
    });

    return { status: 'success', room };
  }

  @SubscribeMessage('leave-room')
  handleLeaveRoom(
    @MessageBody() room: string,
    @ConnectedSocket() client: Socket,
  ) {
    client.leave(room);
    
    // 更新用户房间记录
    if (this.userRooms.has(client.id)) {
      this.userRooms.get(client.id).delete(room);
    }

    client.to(room).emit('user-left', {
      userId: client.id,
      message: `用户 ${client.id} 离开了房间 ${room}`
    });

    return { status: 'success', room };
  }

  @SubscribeMessage('broadcast-to-room')
  handleBroadcastToRoom(
    @MessageBody() data: { room: string; message: string },
    @ConnectedSocket() client: Socket,
  ) {
    this.server.to(data.room).emit('room-message', {
      from: client.id,
      message: data.message,
      timestamp: new Date(),
    });
  }

  private leaveAllRooms(clientId: string) {
    if (this.userRooms.has(clientId)) {
      const rooms = this.userRooms.get(clientId);
      rooms.forEach(room => {
        this.server.to(room).emit('user-left', {
          userId: clientId,
          message: `用户 ${clientId} 离开了房间 ${room}`
        });
      });
      this.userRooms.delete(clientId);
    }
  }

  // 获取房间信息
  @SubscribeMessage('get-room-info')
  async handleGetRoomInfo(@MessageBody() room: string) {
    const sockets = await this.server.in(room).fetchSockets();
    return {
      room,
      userCount: sockets.length,
      users: sockets.map(socket => socket.id),
    };
  }
}

#性能优化

WebSocket 性能优化建议:

@WebSocketGateway({
  cors: {
    origin: process.env.ALLOWED_ORIGINS?.split(',') || '*',
  },
  transports: ['websocket'], // 仅使用 WebSocket,避免轮询
  maxHttpBufferSize: 1e6, // 限制消息大小
  pingTimeout: 60000,
  pingInterval: 25000,
})
export class OptimizedGateway {
  @WebSocketServer()
  server: Server;

  // 使用节流来限制消息频率
  @SubscribeMessage('high-frequency-event')
  @Throttle(10, 60) // 每 60 秒最多 10 次
  handleHighFrequencyEvent(@MessageBody() data: any) {
    return { received: data, timestamp: Date.now() };
  }

  // 批量处理消息
  private messageQueue: any[] = [];
  private processingTimer: NodeJS.Timeout;

  @SubscribeMessage('bulk-message')
  handleBulkMessage(@MessageBody() message: any) {
    this.messageQueue.push(message);
    
    if (!this.processingTimer) {
      this.processingTimer = setTimeout(() => {
        this.processBulkMessages();
        this.processingTimer = null;
      }, 100); // 100ms 后批量处理
    }
  }

  private processBulkMessages() {
    if (this.messageQueue.length > 0) {
      const messages = [...this.messageQueue];
      this.messageQueue = [];
      
      // 批量处理并广播
      this.server.emit('bulk-processed', {
        count: messages.length,
        processedAt: new Date(),
      });
    }
  }
}

#测试 WebSocket 网关

测试 WebSocket 网关的示例:

import { Test } from '@nestjs/testing';
import { INestApplication } from '@nestjs/common';
import { Socket, io } from 'socket.io-client';
import { EventsGateway } from './events.gateway';

describe('EventsGateway', () => {
  let app: INestApplication;
  let gateway: EventsGateway;
  let clientSocket: Socket;

  beforeAll(async () => {
    const moduleRef = await Test.createTestingModule({
      providers: [EventsGateway],
    }).compile();

    app = moduleRef.createNestApplication();
    gateway = moduleRef.get<EventsGateway>(EventsGateway);
    
    await app.listen(3001);
    
    clientSocket = io('http://localhost:3001');
  });

  afterAll(async () => {
    clientSocket.close();
    await app.close();
  });

  it('应该处理消息事件', (done) => {
    clientSocket.emit('events', { test: 'data' });
    
    clientSocket.on('events', (data) => {
      expect(data).toEqual({ test: 'data' });
      done();
    });
  });

  it('应该处理连接', () => {
    expect(gateway.server).toBeDefined();
  });
});

#监控和调试

添加监控和调试功能:

@WebSocketGateway()
export class MonitoredGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;

  private connectionCount = 0;
  private messageCount = 0;

  afterInit(server: Server) {
    console.log('WebSocket 服务器已初始化');
    
    // 定期报告统计信息
    setInterval(() => {
      console.log(`连接数: ${this.connectionCount}, 消息数: ${this.messageCount}`);
    }, 30000);
  }

  handleConnection(client: Socket) {
    this.connectionCount++;
    console.log(`新连接: ${client.id}, 总连接数: ${this.connectionCount}`);
    
    // 发送连接统计
    this.server.emit('stats', {
      connections: this.connectionCount,
      messages: this.messageCount,
    });
  }

  handleDisconnect(client: Socket) {
    this.connectionCount--;
    console.log(`连接断开: ${client.id}, 剩余连接数: ${this.connectionCount}`);
    
    this.server.emit('stats', {
      connections: this.connectionCount,
      messages: this.messageCount,
    });
  }

  @SubscribeMessage('any-message')
  handleAnyMessage(@MessageBody() data: any) {
    this.messageCount++;
    return data;
  }

  // 健康检查端点
  @SubscribeMessage('health-check')
  handleHealthCheck() {
    return {
      status: 'healthy',
      uptime: process.uptime(),
      connections: this.connectionCount,
      messages: this.messageCount,
      memory: process.memoryUsage(),
    };
  }
}

#示例

一个可用的示例在此处查看。