WebSocket插件 Downloads

基于Netty的高性能WebSocket/MQTT通信框架

来源:超级管理员 ¥ ? 登录

📋 项目概述

jeelowcode-plugin-socket 是 JeeLowCode 低代码平台的 Socket 通信插件,提供了基于 Netty 的高性能 WebSocket 服务和 MQTT 消息路由功能。该插件采用责任链模式处理消息流程,支持多种消息类型(P2P、租户广播、自定义),并提供了灵活的扩展机制。

✨ 核心特性

  • 高性能通信:基于 Netty NIO 框架,支持高并发连接
  • 责任链处理:认证 → 解密 → 参数增强 → 路由 → 订阅,流程清晰可扩展
  • 双路由模式:支持本地路由(单机)和 MQTT 路由(分布式)
  • 多消息类型:点对点消息、租户广播、自定义主题
  • 安全机制:Token 认证、消息加密(AES/Base64)、限流保护
  • 注解驱动:通过 @MqttSubscribeTopic 注解声明主题监听

🏗️ 技术架构

🔧 核心技术

  • ☑️ Java + Spring Framework
  • ☑️ Netty(高性能网络框架)
  • ☑️ Spring Integration MQTT
  • ☑️ MyBatis(数据持久化)
  • ☑️ Maven(依赖管理)

🎨 设计模式

  • ☑️ 责任链模式(消息处理)
  • ☑️ 策略模式(加密/参数增强)
  • ☑️ 工厂模式(策略创建)
  • ☑️ 单例模式(管理器)
  • ☑️ 观察者模式(事件发布)

⚙️ 核心功能模块

1️⃣ WebSocket 服务器(MqttNettyServer)

功能:基于 Netty 的 WebSocket 服务器,支持高并发连接

配置参数:

  • jeelowcode.socket.ip - 服务IP地址
  • jeelowcode.socket.port - 服务端口
  • jeelowcode.socket.bossGroupNum - 主线程数
  • jeelowcode.socket.workGroupNum - 工作线程数
  • jeelowcode.socket.wsPath - WebSocket路径

特性:

  • 自动启动与优雅停机
  • TCP 长连接保持(SO_KEEPALIVE)
  • 请求队列管理(SO_BACKLOG=1024)

2️⃣ 责任链消息处理流程

消息按顺序经过以下处理链:

1
DecryptMessageChain - 解密链

支持多种解密策略:AES、Base64、自定义加密算法

2
ParamMessageChain - 参数增强链

对消息进行参数增强处理,支持自定义参数策略扩展

3
AuthMessageChain - 认证链

验证 Token,建立会话(MqttSession),支持用户身份和租户信息绑定

4
SubscribeMessageChain - 订阅链

处理主题订阅/取消订阅,管理客户端订阅关系

5
RouterMessageChain - 路由链

消息路由分发,支持本地路由(LocalRouterStrategy)和 MQTT 路由(MqttRouterStrategy)

3️⃣ 消息类型(MessageTypeEnum)

消息类型 类型值 说明
MESSAGE message 普通消息
AUTH auth 认证消息(建立会话)
SUBSCRIBE sucribe 订阅主题
UNSUBSCRIBE unSucribe 取消订阅主题

4️⃣ 主题消息策略

📨 P2P 点对点消息(P2pMessage)

主题格式: p2p@@{tenantId}

功能:实现用户之间的 1v1 私聊

消息体结构:

{
  "toUserId": 123,  // 接收者用户ID
  "msg": "Hello!"   // 消息内容
}

特性:

  • 需要登录认证(requireLogin=true)
  • 支持消息持久化
  • 精准点对点推送

🏢 租户广播消息(MessageTenant)

主题格式: tenant@@{tenantId}

功能:向同一租户下的所有在线用户广播消息

特性:

  • 需要登录认证(requireLogin=true)
  • 租户隔离(通过 TenantParamStrategy 校验)
  • 群发消息能力

🎯 自定义主题消息(SelfMessage)

主题格式: 自定义

功能:支持业务自定义主题和消息处理逻辑

特性:

  • 灵活的主题定义
  • 可扩展的消息处理策略
  • 支持自定义参数增强

🚀 高级特性

🛡️ 限流保护(RateLimitHandler)

基于令牌桶算法的限流机制:

  • 默认速率:100 条/秒
  • 桶容量:300 条(容忍 3 秒突发流量)
  • 特性:每个 Channel 独立限流,连接关闭自动清理

💓 心跳检测(HeartbeatHandler)

基于 Netty IdleStateHandler 的心跳机制:

  • 读空闲:2 秒无数据读取触发
  • 写空闲:4 秒无数据写入触发
  • 全空闲:10 秒无任何活动触发
  • Pong 检测:配置最大丢失次数自动断开连接

🌐 MQTT 分布式路由(MqttAutoConfiguration)

支持分布式场景下的消息路由:

  • 配置启用: jeelowcode.mqtt.senderType=mqtt
  • 发布/订阅:通过 MqttGateway 发送消息到 MQTT Broker
  • QoS 支持:0=最多一次、1=至少一次、2=仅一次
  • 事件驱动:通过 ApplicationEventPublisher 发布 MqttMessageEvent
  • 跨节点通信:多实例之间通过 MQTT Broker 路由消息

配置参数:

  • jeelowcode.mqtt.host - MQTT Broker 地址
  • jeelowcode.mqtt.username - MQTT 用户名
  • jeelowcode.mqtt.password - MQTT 密码

📌 注解驱动订阅(@MqttSubscribeTopic)

通过注解声明式配置主题监听:

@Component
@MqttSubscribeTopic(
    topics = "myTopic@@1",              // 订阅的主题列表
    requireLogin = true,                // 是否需要登录
    kickoutAfter = false,               // 是否踢出之前的订阅者
    enableSysEncrypt = false,           // 是否启用系统加密
    selfParamStrategies = {...},       // 自定义参数策略
    selfCryptoStrategies = {...}       // 自定义加密策略
)
public class MyMessage implements IMessageStrategy {
    @Override
    public void handleMessage(String topic, MqttDataVo vo) {
        // 处理消息逻辑
    }
}

🔐 会话与连接管理

📡 MqttChannelManager

管理 Channel 与 SessionId 的映射关系

  • ✓ 添加/移除 Channel
  • ✓ 通过 SessionId 查找 Channel
  • ✓ 线程安全的 ConcurrentHashMap

👤 MqttSessionManager

管理用户会话信息(MqttSession)

  • ✓ 存储用户ID、租户ID
  • ✓ Token 认证信息
  • ✓ 会话生命周期管理

📢 MqttSubscribeManager

管理主题订阅关系

  • ✓ 主题 → SessionId 映射
  • ✓ 订阅/取消订阅管理
  • ✓ 支持一对多订阅

📊 MqttConnectCouManager

管理连接计数

  • ✓ 实时统计在线连接数
  • ✓ 连接数监控
  • ✓ 原子性计数操作

📦 消息数据结构

MqttDataVo 消息对象

{
  "header": {
    "sessionId": "channel-session-id",   // 会话ID
    "token": "user-auth-token",          // 认证Token
    "topic": "p2p@@1",                    // 主题
    "messageType": "message"              // 消息类型
  },
  "body": "{...}",                        // 消息体(JSON字符串)
  "enhanceParam": {                       // 增强参数(由参数链添加)
    "key": "value"
  }
}

📝 典型使用流程

1

建立 WebSocket 连接

客户端连接到 ws://host:port/ws

2

发送认证消息

{"header": {"messageType": "auth", "token": "xxx"}, "body": "{}"}
3

订阅主题

{"header": {"messageType": "sucribe", "topic": "p2p@@1"}, "body": "{}"}
4

发送/接收消息

{"header": {"messageType": "message", "topic": "p2p@@1"}, "body": "{\"toUserId\": 123, \"msg\": \"Hello\"}"}
5

取消订阅/断开连接

发送取消订阅消息或直接关闭 WebSocket 连接

🔧 扩展机制

🔐 自定义加密策略

实现 ICryptoStrategy 接口:

  • AesStrategy:AES 对称加密
  • Base64Strategy:Base64 编解码
  • SystemStrategy:系统默认加密
  • 自定义策略:通过 @MqttSubscribeTopic 配置

📊 自定义参数增强策略

实现 IParamStrategy 接口:

  • 对消息进行参数验证
  • 添加额外的业务参数到 enhanceParam
  • 支持多个策略链式执行

🎯 自定义消息处理

实现 IMessageStrategy 接口并使用 @MqttSubscribeTopic 注解:

  • 定义自己的主题名称
  • 实现自定义的消息处理逻辑
  • 配置认证、加密、参数增强策略

💡 应用场景

💬 即时通讯

用户之间的实时聊天、群组消息、客服系统

📢 消息推送

系统通知、业务提醒、实时告警

📊 数据同步

实时数据更新、状态同步、协同编辑

🎮 在线游戏

游戏状态同步、玩家交互、实时对战

📈 实时监控

设备状态监控、性能指标推送、日志流

🏢 企业协同

多租户隔离、企业内部通信、工作流通知

JeeLowCode Socket Plugin - 构建高性能实时通信应用

基于 Netty + MQTT 的企业级 WebSocket 解决方案

添加客服
图片未加载,请刷新后再试
联系我们