常规管道与 WebSocket 管道之间没有本质区别。唯一的区别在于,你应该使用 WsException
而不是抛出 HttpException
。此外,所有管道将仅应用于 data
参数(因为验证或转换 client
实例没有意义)。
info 提示
WsException
类是从@nestjs/websockets
包中导出的。
以下示例使用了一个手动实例化的方法作用域管道。与基于 HTTP 的应用程序一样,你也可以使用网关作用域管道(即在网关类前添加 @UsePipes()
装饰器)。
@UsePipes(new ValidationPipe({ exceptionFactory: (errors) => new WsException(errors) }))
@SubscribeMessage('events')
handleEvent(client: Client, data: unknown): WsResponse<unknown> {
const event = 'events';
return { event, data };
}
return object;
}
private toValidate(metatype: any): boolean { const types = [String, Boolean, Number, Array, Object]; return !types.find(type => metatype === type); } }
#### 数据传输对象(DTO)
定义 WebSocket 消息的 DTO:
```typescript
import { IsString, IsNotEmpty, IsOptional, IsNumber, IsEnum } from 'class-validator';
export class ChatMessageDto {
@IsString()
@IsNotEmpty()
message: string;
@IsString()
@IsOptional()
roomId?: string;
@IsEnum(['text', 'image', 'file'])
type: string;
}
export class JoinRoomDto {
@IsString()
@IsNotEmpty()
roomId: string;
@IsString()
@IsOptional()
password?: string;
}
export class UserStatusDto {
@IsEnum(['online', 'offline', 'away', 'busy'])
status: string;
@IsString()
@IsOptional()
message?: string;
}
// 使用示例
@WebSocketGateway()
export class ChatGateway {
@UsePipes(new WsValidationPipe())
@SubscribeMessage('send-message')
handleMessage(@MessageBody() messageDto: ChatMessageDto) {
// messageDto 已经被验证和转换
return {
status: 'success',
message: `收到消息: ${messageDto.message}`,
};
}
@UsePipes(new WsValidationPipe())
@SubscribeMessage('join-room')
handleJoinRoom(@MessageBody() joinRoomDto: JoinRoomDto) {
// 验证房间ID和密码
return {
status: 'success',
room: joinRoomDto.roomId,
};
}
}
创建数据转换管道:
import { ArgumentMetadata, Injectable, PipeTransform } from '@nestjs/common';
import { WsException } from '@nestjs/websockets';
@Injectable()
export class WsParseIntPipe implements PipeTransform<string, number> {
transform(value: string, metadata: ArgumentMetadata): number {
const val = parseInt(value, 10);
if (isNaN(val)) {
throw new WsException(`${value} 不是一个有效的数字`);
}
return val;
}
}
@Injectable()
export class WsParseObjectPipe implements PipeTransform {
transform(value: any, metadata: ArgumentMetadata): any {
if (typeof value === 'string') {
try {
return JSON.parse(value);
} catch (error) {
throw new WsException('无效的 JSON 格式');
}
}
return value;
}
}
// 使用示例
@SubscribeMessage('update-score')
handleUpdateScore(
@MessageBody(new WsParseIntPipe()) score: number,
@ConnectedSocket() client: Socket,
) {
return { newScore: score, clientId: client.id };
}
确保消息格式的一致性:
import { ArgumentMetadata, Injectable, PipeTransform } from '@nestjs/common';
import { WsException } from '@nestjs/websockets';
export interface StandardMessage {
action: string;
payload: any;
timestamp?: number;
id?: string;
}
@Injectable()
export class WsMessageStandardizationPipe implements PipeTransform {
transform(value: any, metadata: ArgumentMetadata): StandardMessage {
// 如果已经是标准格式,直接返回
if (this.isStandardMessage(value)) {
return {
...value,
timestamp: value.timestamp || Date.now(),
id: value.id || this.generateId(),
};
}
// 尝试转换为标准格式
if (typeof value === 'string') {
return {
action: 'text_message',
payload: { text: value },
timestamp: Date.now(),
id: this.generateId(),
};
}
if (value && typeof value === 'object') {
return {
action: value.type || 'generic_message',
payload: value,
timestamp: Date.now(),
id: this.generateId(),
};
}
throw new WsException('无法解析消息格式');
}
private isStandardMessage(value: any): boolean {
return value &&
typeof value === 'object' &&
typeof value.action === 'string' &&
value.payload !== undefined;
}
private generateId(): string {
return `msg_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
}
清理输入数据,防止恶意内容:
import { ArgumentMetadata, Injectable, PipeTransform } from '@nestjs/common';
import { WsException } from '@nestjs/websockets';
@Injectable()
export class WsSanitizationPipe implements PipeTransform {
private readonly bannedWords = ['spam', 'abuse', 'harmful'];
private readonly maxLength = 1000;
transform(value: any, metadata: ArgumentMetadata): any {
if (typeof value === 'string') {
return this.sanitizeString(value);
}
if (value && typeof value === 'object') {
return this.sanitizeObject(value);
}
return value;
}
private sanitizeString(str: string): string {
// 检查长度
if (str.length > this.maxLength) {
throw new WsException(`消息长度不能超过 ${this.maxLength} 字符`);
}
// 过滤敏感词
let sanitized = str;
this.bannedWords.forEach(word => {
const regex = new RegExp(word, 'gi');
sanitized = sanitized.replace(regex, '*'.repeat(word.length));
});
// 移除潜在的 HTML 标签
sanitized = sanitized.replace(/<[^>]*>/g, '');
// 去除首尾空格
return sanitized.trim();
}
private sanitizeObject(obj: any): any {
const sanitized = {};
for (const [key, value] of Object.entries(obj)) {
if (typeof value === 'string') {
sanitized[key] = this.sanitizeString(value);
} else if (value && typeof value === 'object') {
sanitized[key] = this.sanitizeObject(value);
} else {
sanitized[key] = value;
}
}
return sanitized;
}
}
将多个管道组合使用:
@WebSocketGateway()
export class SecureChatGateway {
@UsePipes(
new WsMessageStandardizationPipe(),
new WsSanitizationPipe(),
new WsValidationPipe(),
)
@SubscribeMessage('secure-message')
handleSecureMessage(@MessageBody() message: StandardMessage) {
// 消息已经过标准化、清理和验证
return {
status: 'processed',
messageId: message.id,
processedAt: new Date(),
};
}
@UsePipes(new WsSanitizationPipe(), new WsValidationPipe())
@SubscribeMessage('user-input')
handleUserInput(@MessageBody() data: ChatMessageDto) {
return { received: data };
}
}
根据条件应用不同的管道:
import { ArgumentMetadata, Injectable, PipeTransform } from '@nestjs/common';
@Injectable()
export class WsConditionalPipe implements PipeTransform {
constructor(
private condition: (value: any) => boolean,
private truePipe: PipeTransform,
private falsePipe?: PipeTransform,
) {}
async transform(value: any, metadata: ArgumentMetadata) {
if (this.condition(value)) {
return this.truePipe.transform(value, metadata);
} else if (this.falsePipe) {
return this.falsePipe.transform(value, metadata);
}
return value;
}
}
// 使用示例
@SubscribeMessage('flexible-message')
@UsePipes(
new WsConditionalPipe(
(value) => typeof value === 'string',
new WsSanitizationPipe(),
new WsValidationPipe(),
)
)
handleFlexibleMessage(@MessageBody() data: any) {
return { processed: data };
}
设置全局 WebSocket 管道:
// main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { WsValidationPipe } from './pipes/ws-validation.pipe';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
// 设置全局 WebSocket 管道
app.useWebSocketAdapter(new IoAdapter(app));
app.useGlobalPipes(new WsValidationPipe());
await app.listen(3000);
}
bootstrap();
在管道中正确处理错误:
@Injectable()
export class WsRobustValidationPipe implements PipeTransform {
async transform(value: any, metadata: ArgumentMetadata) {
try {
// 执行验证逻辑
return await this.validateAndTransform(value, metadata);
} catch (error) {
// 记录错误
console.error('WebSocket 管道验证失败:', error);
// 返回友好的错误消息
throw new WsException({
status: 'validation_error',
message: '数据格式不正确',
timestamp: new Date().toISOString(),
});
}
}
private async validateAndTransform(value: any, metadata: ArgumentMetadata) {
// 实际的验证和转换逻辑
return value;
}
}
通过这些管道,你可以确保 WebSocket 消息的数据完整性、安全性和格式一致性。
WebSocket 应用中可以使用所有内置管道,包括:
ValidationPipe
- 验证管道ParseIntPipe
- 整数解析管道ParseFloatPipe
- 浮点数解析管道ParseBoolPipe
- 布尔值解析管道ParseArrayPipe
- 数组解析管道ParseUUIDPipe
- UUID 解析管道ParseEnumPipe
- 枚举解析管道DefaultValuePipe
- 默认值管道ParseFilePipe
- 文件解析管道ParseDatePipe
- 日期解析管道使用转换管道来处理传入的数据:
import { PipeTransform, Injectable, ArgumentMetadata } from '@nestjs/common';
import { WsException } from '@nestjs/websockets';
@Injectable()
export class ParseIntPipe implements PipeTransform<string, number> {
transform(value: string, metadata: ArgumentMetadata): number {
const val = parseInt(value, 10);
if (isNaN(val)) {
throw new WsException('Validation failed');
}
return val;
}
}
将此管道绑定到消息处理器:
@SubscribeMessage('getUserById')
async findOne(@MessageBody('id', ParseIntPipe) id: number) {
return this.userService.findOne(id);
}
使用验证管道来确保传入数据的有效性:
import { IsString, IsInt, IsPositive } from 'class-validator';
export class CreateUserDto {
@IsString()
name: string;
@IsInt()
@IsPositive()
age: number;
@IsString()
email: string;
}
在网关中使用验证管道:
@WebSocketGateway()
@UsePipes(new ValidationPipe({ exceptionFactory: (errors) => new WsException(errors) }))
export class ChatGateway {
@SubscribeMessage('createUser')
async createUser(@MessageBody() createUserDto: CreateUserDto) {
return this.userService.create(createUserDto);
}
}
创建专门的 WebSocket 验证管道:
import { ArgumentMetadata, Injectable, PipeTransform } from '@nestjs/common';
import { WsException } from '@nestjs/websockets';
import { plainToClass } from 'class-transformer';
import { validate } from 'class-validator';
@Injectable()
export class WsValidationPipe implements PipeTransform<any> {
async transform(value: any, { metatype }: ArgumentMetadata) {
if (!metatype || !this.toValidate(metatype)) {
return value;
}
const object = plainToClass(metatype, value);
const errors = await validate(object);
if (errors.length > 0) {
const errorMessages = errors.map(error =>
Object.values(error.constraints).join(', ')
);
throw new WsException(`Validation failed: ${errorMessages.join('; ')}`);
}
return value;
}
private toValidate(metatype: Function): boolean {
const types: Function[] = [String, Boolean, Number, Array, Object];
return !types.includes(metatype);
}
}
你可以在特定的消息处理器上使用管道:
@SubscribeMessage('message')
@UsePipes(new WsValidationPipe())
handleMessage(@MessageBody() data: MessageDto): WsResponse<unknown> {
return { event: 'message', data: this.processMessage(data) };
}
在整个网关上应用管道:
@WebSocketGateway()
@UsePipes(new ValidationPipe({
exceptionFactory: (errors) => new WsException(errors),
transform: true
}))
export class ChatGateway {
@SubscribeMessage('joinRoom')
joinRoom(@MessageBody() joinRoomDto: JoinRoomDto) {
// 数据将被自动验证和转换
return this.chatService.joinRoom(joinRoomDto);
}
@SubscribeMessage('sendMessage')
sendMessage(@MessageBody() messageDto: MessageDto) {
// 数据将被自动验证和转换
return this.chatService.sendMessage(messageDto);
}
}
WebSocket 管道中的错误处理:
@Injectable()
export class SafeValidationPipe implements PipeTransform {
async transform(value: any, metadata: ArgumentMetadata) {
try {
// 执行验证逻辑
return await this.validate(value, metadata);
} catch (error) {
// 将 HTTP 异常转换为 WebSocket 异常
if (error instanceof HttpException) {
throw new WsException(error.message);
}
throw new WsException('Validation failed');
}
}
private async validate(value: any, metadata: ArgumentMetadata) {
// 验证逻辑
return value;
}
}
虽然可以设置全局管道,但需要注意它们对 WebSocket 的影响:
// main.ts
async function bootstrap() {
const app = await NestFactory.create(AppModule);
// 注意:全局管道对 WebSocket 的影响
app.useGlobalPipes(new ValidationPipe({
exceptionFactory: (errors) => {
// 检查当前上下文是否为 WebSocket
return new WsException(errors);
}
}));
await app.listen(3000);
}
warning 注意 在混合应用中,
useGlobalPipes()
方法不会为网关和微服务设置管道。对于"标准"(非混合)微服务应用,useGlobalPipes()
会全局挂载管道。
WsException
而不是 HttpException
class-validator
进行 DTO 验证class-transformer
进行数据转换以下是一个完整的聊天应用示例:
// dto/chat.dto.ts
import { IsString, IsNotEmpty, IsOptional, IsEnum } from 'class-validator';
export enum MessageType {
TEXT = 'text',
IMAGE = 'image',
FILE = 'file'
}
export class SendMessageDto {
@IsString()
@IsNotEmpty()
content: string;
@IsString()
@IsNotEmpty()
roomId: string;
@IsEnum(MessageType)
@IsOptional()
type?: MessageType = MessageType.TEXT;
}
export class JoinRoomDto {
@IsString()
@IsNotEmpty()
roomId: string;
@IsString()
@IsNotEmpty()
userId: string;
}
// chat.gateway.ts
@WebSocketGateway({
cors: {
origin: '*',
},
})
@UsePipes(new ValidationPipe({
exceptionFactory: (errors) => new WsException(errors),
transform: true,
whitelist: true
}))
export class ChatGateway {
@SubscribeMessage('joinRoom')
async joinRoom(@MessageBody() joinRoomDto: JoinRoomDto) {
// 数据已经被验证和转换
return this.chatService.joinRoom(joinRoomDto);
}
@SubscribeMessage('sendMessage')
async sendMessage(@MessageBody() messageDto: SendMessageDto) {
// 数据已经被验证和转换
return this.chatService.sendMessage(messageDto);
}
@SubscribeMessage('uploadFile')
@UsePipes(new ParseFilePipe({
validators: [
new MaxFileSizeValidator({ maxSize: 5 * 1024 * 1024 }), // 5MB
new FileTypeValidator({ fileType: /\.(jpg|jpeg|png|gif)$/i })
],
exceptionFactory: (error) => new WsException(error)
}))
async uploadFile(@MessageBody() file: Express.Multer.File) {
return this.chatService.uploadFile(file);
}
}
这个示例展示了如何在 WebSocket 应用中使用各种管道来确保数据的有效性和安全性。