logologo
文档仓库
文档仓库
logologo
开始

概述

第一步
控制器
提供者
模块
中间件
异常过滤器
管道
守卫
拦截器
自定义装饰器

基础

自定义提供程序
异步提供者
动态模块
注入作用域
循环依赖
模块引用
懒加载模块
执行上下文
生命周期事件
发现服务
平台无关
单元测试

技术

配置
SQL
Mongo
验证
缓存
序列化
版本控制
任务调度
队列
日志
Cookies
事件
压缩
文件上传
文件流
HTTP 模块
Session
MVC
性能(Fastify)
SSE

安全

认证
授权
加密与哈希
Helmet
CORS
CSRF
速率限制

GraphQL

快速开始
解析器
变更
订阅
标量
指令
接口
联合与枚举
字段中间件
类型映射
插件
复杂度
扩展
CLI 插件
生成SDL
共享模型
其他功能
联邦

WebSocket

网关
异常过滤器
管道
守卫
拦截器
适配器

微服务

基础
Redis
MQTT
NATS
RabbitMQ
Kafka
gRPC
自定义传输
异常过滤器
管道
守卫
拦截器
部署
独立应用程序

CLI

概述
工作区
库
用法
脚本

OpenAPI

介绍
装饰器
类型映射
操作
其他特性
安全
类型与参数
CLI 插件

实用示例

REPL
CRUD生成器
SWC
Passport(认证)
热重载
MikroORM
TypeORM
Mongoose
Sequelize
路由模块
Swagger
健康检查
CQRS
Compodoc
Prisma
Sentry
静态资源
Commander
异步本地存储
Necord
套件(原Automock)

常见问题

Serverless
HTTP 适配器
长连接
全局前缀
原始请求体
混合应用
HTTPS & 多服务器
请求生命周期
错误

开发工具

概述
CI/CD
迁移指南
API参考(官方)

生态与案例

谁在用
精彩资源

支持

支持

社区

贡献者

最后更新于: 2025/11/18 02:11:37

上一页Redis
下一页NATS

#MQTT

MQTT(消息队列遥测传输)是一种开源轻量级消息协议,专为低延迟场景优化。该协议通过发布/订阅模式,为设备互联提供了可扩展且经济高效的解决方案。基于 MQTT 的通信系统由发布服务器、代理服务器及一个或多个客户端组成,特别适用于资源受限设备及低带宽、高延迟或不稳定网络环境。

#安装

要开始构建基于 MQTT 的微服务,首先需安装所需软件包:

$ npm i --save mqtt

#概述

使用 MQTT 传输器时,请将以下配置对象传入 createMicroservice() 方法:

main.ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
 transport: Transport.MQTT,
 options: {
   url: 'mqtt://localhost:1883',
 },
});
注意

Transport 枚举是从 @nestjs/microservices 包中导入的。

#选项

options 对象特定于所选的传输器。MQTT 传输器公开的属性描述见此处 。

#客户端

与其他微服务传输器类似,创建 MQTT ClientProxy 实例有多种方式 。

一种创建实例的方法是使用 ClientsModule。要通过 ClientsModule 创建客户端实例,需导入该模块并使用 register() 方法传递选项对象,该对象包含与上述 createMicroservice() 方法相同的属性,以及一个用作注入令牌的 name 属性。更多关于 ClientsModule 的信息请参阅此处 。

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.MQTT,
        options: {
          url: 'mqtt://localhost:1883',
        }
      },
    ]),
  ]
  ...
})

其他创建客户端的方式(使用 ClientProxyFactory 或 @Client())也同样适用。您可以在此了解更多相关信息。

#上下文

在更复杂的场景中,您可能需要访问有关传入请求的额外信息。当使用 MQTT 传输器时,您可以访问 MqttContext 对象。

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
  console.log(`Topic: ${context.getTopic()}`);
}
提示

@Payload()、@Ctx() 和 MqttContext 均从 @nestjs/microservices 包导入。

要访问原始的 MQTT 数据包 ,请使用 MqttContext 对象的 getPacket() 方法,如下所示:

@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
  console.log(context.getPacket());
}

#通配符

订阅可以是针对明确主题的,也可以包含通配符。有两种通配符可用:+ 和 #。+ 是单级通配符,而 # 是多级通配符,可覆盖多个主题层级。

@MessagePattern('sensors/+/temperature/+')
getTemperature(@Ctx() context: MqttContext) {
  console.log(`Topic: ${context.getTopic()}`);
}

#服务质量(QoS)

使用 @MessagePattern 或 @EventPattern 装饰器创建的任何订阅都将以 QoS 0 级别进行订阅。如需更高 QoS 级别,可在建立连接时通过 subscribeOptions 块全局设置,如下所示:

main.ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
 transport: Transport.MQTT,
 options: {
   url: 'mqtt://localhost:1883',
   subscribeOptions: {
     qos: 2
   },
 },
});

如需针对特定主题设置 QoS,可考虑创建自定义传输器 。

#记录构建器

要配置消息选项(调整 QoS 级别、设置 Retain 或 DUP 标志,或向有效负载添加额外属性),可使用 MqttRecordBuilder 类。例如,要将 QoS 设置为 2,可使用 setQoS 方法,如下所示:

const userProperties = { 'x-version': '1.0.0' };
const record = new MqttRecordBuilder(':cat:')
  .setProperties({ userProperties })
  .setQoS(1)
  .build();
client.send('replace-emoji', record).subscribe(...);
提示

MqttRecordBuilder 类是从 @nestjs/microservices 包导出的。

您也可以通过访问 MqttContext 在服务端读取这些选项。

@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: MqttContext): string {
  const { properties: { userProperties } } = context.getPacket();
  return userProperties['x-version'] === '1.0.0' ? '🐱' : '🐈';
}

在某些情况下,您可能想为多个请求配置用户属性,可以将这些选项传递给 ClientProxyFactory。

import { Module } from '@nestjs/common';
import { ClientProxyFactory, Transport } from '@nestjs/microservices';

@Module({
  providers: [
    {
      provide: 'API_v1',
      useFactory: () =>
        ClientProxyFactory.create({
          transport: Transport.MQTT,
          options: {
            url: 'mqtt://localhost:1833',
            userProperties: { 'x-version': '1.0.0' },
          },
        }),
    },
  ],
})
export class ApiModule {}

#实例状态更新

要获取连接状态及底层驱动实例的实时更新,您可以订阅 status 数据流。该数据流会提供所选驱动的特定状态更新。对于 MQTT 驱动,status 数据流会发出 connected(已连接)、disconnected(已断开)、reconnecting(重连中)和 closed(已关闭)事件。

this.client.status.subscribe((status: MqttStatus) => {
  console.log(status);
});
提示

MqttStatus 类型是从 @nestjs/microservices 包导入的。

同样地,您可以订阅服务器的 status 流来接收有关服务器状态的通知。

const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: MqttStatus) => {
  console.log(status);
});

#监听 MQTT 事件

在某些情况下,您可能需要监听微服务发出的内部事件。例如,您可以监听 error 事件,以便在发生错误时触发其他操作。为此,请使用如下所示的 on() 方法:

this.client.on('error', (err) => {
  console.error(err);
});

同样地,您可以监听服务器的内部事件:

server.on<MqttEvents>('error', (err) => {
  console.error(err);
});
提示

MqttEvents 类型是从 @nestjs/microservices 包导入的。

#底层驱动访问

对于更高级的用例,您可能需要访问底层驱动实例。这在手动关闭连接或使用驱动特定方法等场景中非常有用。但请注意,在大多数情况下,您不需要直接访问驱动。

为此,您可以使用 unwrap() 方法,该方法会返回底层驱动实例。泛型类型参数应指定您预期的驱动实例类型。

const mqttClient = this.client.unwrap<import('mqtt').MqttClient>();

同样地,您可以访问服务器的底层驱动实例:

const mqttClient = server.unwrap<import('mqtt').MqttClient>();