第一次提交

This commit is contained in:
wyw
2024-08-08 00:31:26 +08:00
commit c202e2b63d
1819 changed files with 221890 additions and 0 deletions

View File

@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>fastbee-server</artifactId>
<groupId>com.fastbee</groupId>
<version>3.8.5</version>
</parent>
<artifactId>mqtt-broker</artifactId>
<description>基于netty搭建的mqttBroker</description>
<dependencies>
<dependency>
<groupId>com.fastbee</groupId>
<artifactId>iot-server-core</artifactId>
</dependency>
<dependency>
<groupId>com.fastbee</groupId>
<artifactId>fastbee-mq</artifactId>
</dependency>
<dependency>
<groupId>com.fastbee</groupId>
<artifactId>sip-server</artifactId>
</dependency>
<dependency>
<groupId>com.fastbee</groupId>
<artifactId>fastbee-mqtt-client</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,24 @@
package com.fastbee.mqtt.annotation;
import io.netty.handler.codec.mqtt.MqttMessageType;
import org.springframework.stereotype.Component;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 标注mqtt消息处理
* @author bill
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
/*自动注入bean*/
@Component
public @interface Process {
/*消息类型*/
MqttMessageType type() default MqttMessageType.PUBLISH;
}

View File

@ -0,0 +1,112 @@
package com.fastbee.mqtt.auth;
import com.fastbee.common.constant.Constants;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.core.redis.RedisCache;
import com.fastbee.common.exception.ServiceException;
import com.fastbee.common.utils.StringUtils;
import com.fastbee.iot.domain.Device;
import com.fastbee.iot.model.MqttAuthenticationModel;
import com.fastbee.iot.service.IDeviceService;
import com.fastbee.iot.service.IToolService;
import com.fastbee.mqttclient.MqttClientConfig;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.Jwts;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
/**
* 客户端认证
*
* @author gsb
* @date 2022/9/15 13:40
*/
@Slf4j
@Component
public class AuthService {
@Resource
private IToolService toolService;
@Resource
private RedisCache redisCache;
@Resource
private MqttClientConfig mqttConfig;
@Resource
private IDeviceService deviceService;
// 令牌秘钥
@Value("${token.secret}")
private String secret;
/**
* MQTT客户端认证
*
* @param clientId 客户端id
* @param username 用户名
* @param password 密码
* @return 结果
*/
public boolean auth(String clientId, String username, String password,String serialNumber) {
/*认证次数*/
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_AUTH_TOTAL, -1L);
if (StringUtils.isEmpty(clientId) || StringUtils.isEmpty(username) || StringUtils.isEmpty(password)) {
log.error("=>客户端参数缺少,clientId:{},username:{},password:{}", clientId, username, password);
return false;
}
try {
if (clientId.startsWith("server")) {
// 服务端认证:配置的账号密码认证
if (mqttConfig.getUsername().equals(username) && mqttConfig.getPassword().equals(password)) {
log.info("-----------服务端mqtt认证成功,clientId:" + clientId + "---------------");
return true;
} else {
ResponseEntity response = toolService.returnUnauthorized(new MqttAuthenticationModel(clientId, username, password), "mqtt账号和密码与认证服务器配置不匹配");
log.warn("=>服务端认证失败[{}]", response.getBody());
throw new ServiceException("服务端认证失败:"+response.getBody());
}
} else if (clientId.startsWith("web") || clientId.startsWith("phone")) {
// web端和移动端认证token认证
String token = password;
if (StringUtils.isNotEmpty(token) && token.startsWith(Constants.TOKEN_PREFIX)) {
token = token.replace(Constants.TOKEN_PREFIX, "");
}
try {
Claims claims = Jwts.parser().setSigningKey(secret).parseClaimsJws(token).getBody();
log.info("-----------移动端/Web端mqtt认证成功,clientId:" + clientId + "---------------");
return true;
} catch (Exception ex) {
ResponseEntity response = toolService.returnUnauthorized(new MqttAuthenticationModel(clientId, username, password), ex.getMessage());
log.warn("=>移动端/Web端mqtt认证失败[{}]",response.getBody());
throw new ServiceException("移动端/Web端mqtt认证失败:"+response.getBody());
}
} else {
//获取设备是否是禁用状态
Device device = deviceService.selectDeviceBySerialNumber(serialNumber);
if (!Objects.isNull(device)) {
Integer status = device.getStatus();
if (!Objects.isNull(status) && status ==2) {
return false;
}
}
// 设备端认证
ResponseEntity response = toolService.clientAuth(clientId, username, password);
if (response.getStatusCodeValue() == HttpStatus.OK.value()) {
return true;
} else {
log.warn("=>设备端认证失败");
throw new ServiceException("设备端认证失败:"+response.getBody());
}
}
} catch (Exception e) {
log.error("=>客户端认证失败", e);
return false;
}
}
}

View File

@ -0,0 +1,30 @@
package com.fastbee.mqtt.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* ws协议与MQTT协议编码转换
* @author gsb
* @date 2022/9/15 14:35
*/
@ChannelHandler.Sharable
@Component
public class WebSocketMqttCodec extends MessageToMessageCodec<BinaryWebSocketFrame, ByteBuf> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
list.add(new BinaryWebSocketFrame(byteBuf.retain()));
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, BinaryWebSocketFrame binaryWebSocketFrame, List<Object> list) throws Exception {
list.add(binaryWebSocketFrame.retain().content());
}
}

View File

@ -0,0 +1,159 @@
package com.fastbee.mqtt.handler;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.core.redis.RedisCache;
import com.fastbee.common.enums.ServerType;
import com.fastbee.common.utils.DateUtils;
import com.fastbee.mqtt.annotation.Process;
import com.fastbee.mqtt.auth.AuthService;
import com.fastbee.mqtt.handler.adapter.MqttHandler;
import com.fastbee.mqtt.manager.ResponseManager;
import com.fastbee.mqtt.manager.SessionManger;
import com.fastbee.mqtt.manager.WillMessageManager;
import com.fastbee.mqtt.model.WillMessage;
import com.fastbee.base.session.Session;
import com.fastbee.base.util.AttributeUtils;
import com.fastbee.mqtt.utils.MqttMessageUtils;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
/**
* 客户端连接处理类
*
* @author gsb
* @date 2022/9/15 12:00
*/
@Slf4j
@Process(type = MqttMessageType.CONNECT)
public class MqttConnect implements MqttHandler {
@Autowired
private AuthService authService;
@Resource
private RedisCache redisCache;
@Override
public void handler(ChannelHandlerContext ctx, MqttMessage message) {
MqttConnectMessage connectMessage = (MqttConnectMessage) message;
/*获取客户端Id*/
String clientId = connectMessage.payload().clientIdentifier();
Channel channel = ctx.channel();
if (clientId.contains("&")){
clientId = clientId.split("&")[1];
}
log.debug("=>客户端:{} 连接:{}", clientId, message);
/*获取session*/
Session session = new Session();
/*mqtt版本*/
MqttVersion version = MqttVersion.fromProtocolNameAndLevel(connectMessage.variableHeader().name(), (byte) connectMessage.variableHeader().version());
/*是否清除客户端*/
boolean cleanSession = connectMessage.variableHeader().isCleanSession();
session.setHandlerContext(ctx);
session.setVersion(version);
session.setClientId(clientId);
session.setCleanSession(cleanSession);
session.setUsername(connectMessage.payload().userName());
session.setConnected_at(DateUtils.getNowDate());
InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
session.setIp(socketAddress.getAddress().getHostAddress());
session.setServerType(ServerType.MQTT);
/*设置客户端ping时间*/
MqttConnectVariableHeader header = connectMessage.variableHeader();
/*broker keepAlive时间和 客户端keepAlive时间对比设置*/
int keepAliveTimeSeconds = header.keepAliveTimeSeconds();
int expire = Math.round(keepAliveTimeSeconds * 1.5f);
if (keepAliveTimeSeconds > 0) {
if (channel.pipeline().names().contains("idle")) {
channel.pipeline().remove("idle");
}
channel.pipeline().addFirst("idle", new IdleStateHandler(0, 0, expire));
session.setKeepAlive(expire);
}
/*mqtt客户端登录校验*/
if (!check(session, connectMessage)) {
log.error("=>客户端:{},连接异常", clientId);
session.getHandlerContext().close();
return;
}
/*保存ClientId 和 session 到Attribute*/
AttributeUtils.setClientId(channel, clientId);
AttributeUtils.setSession(channel, session);
SessionManger.removeClient(clientId);
session.setConnected(true);
/*新建连接,推送上线消息*/
SessionManger.buildSession(clientId, session);
/*处理遗嘱消息*/
handleWill(connectMessage);
/*应答接受连接*/
//MqttConnAckMessage connAckMessage = MqttMessageUtils.buildConntAckMessage(MqttConnectReturnCode.CONNECTION_ACCEPTED, false);
ResponseManager.responseMessage(session,
MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0x02),
new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false),
null),
true
);
/*累计连接次数*/
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_CONNECT_TOTAL,-1L);
}
/**
* 遗嘱消息处理
*
* @param message 连接消息
*/
private void handleWill(MqttConnectMessage message) {
/*如果没有设置处理遗嘱消息,返回*/
if (!message.variableHeader().isWillFlag()) {
return;
}
/*生成客户端model*/
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, false,
MqttQoS.valueOf(message.variableHeader().willQos()),
message.variableHeader().isWillRetain(), 0),
new MqttPublishVariableHeader(message.payload().willTopic(), 0),
Unpooled.buffer().writeBytes(message.payload().willMessageInBytes()));
WillMessage msg = new WillMessage(message.payload().clientIdentifier(),
message.variableHeader().isCleanSession(), message.payload().willTopic(), publishMessage);
WillMessageManager.push(msg);
}
/**
* 客户端连接校验
*
* @param session 客户端
* @param message 连接消息
* @return 结果
*/
private boolean check(Session session, MqttConnectMessage message) {
/*获取客户端连接地址*/
InetSocketAddress address = (InetSocketAddress) session.getHandlerContext().channel().remoteAddress();
String host = address.getAddress().getHostAddress();
/*webSocket客户端 系统内部客户端不校验*/
String clientId = message.payload().clientIdentifier();
/*根据用户名,密码校验*/
String username = message.payload().userName();
String password = message.payload().passwordInBytes() == null ? null : new String(message.payload().passwordInBytes(), CharsetUtil.UTF_8);
/*验证失败,应答客户端*/
if (!authService.auth(clientId, username, password,session.getClientId())) {
MqttConnAckMessage connAckMessage = MqttMessageUtils.buildConntAckMessage(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false);
ResponseManager.responseMessage(session, connAckMessage, true);
return false;
}
return true;
}
}

View File

@ -0,0 +1,45 @@
package com.fastbee.mqtt.handler;
import com.fastbee.mqtt.annotation.Process;
import com.fastbee.mqtt.handler.adapter.MqttHandler;
import com.fastbee.mqtt.manager.ClientManager;
import com.fastbee.mqtt.manager.SessionManger;
import com.fastbee.base.session.Session;
import com.fastbee.base.util.AttributeUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import lombok.extern.slf4j.Slf4j;
/**
* 客户端主动断开连接(正常断开)
*
* @author bill
*/
@Process(type = MqttMessageType.DISCONNECT)
@Slf4j
public class MqttDisConnect implements MqttHandler {
@Override
public void handler(ChannelHandlerContext ctx, MqttMessage message) {
/*获取clientId*/
String clientId = AttributeUtils.getClientId(ctx.channel());
/*获取session*/
Session session = AttributeUtils.getSession(ctx.channel());
log.debug("=>客户端正常断开,clientId:[{}]", clientId);
try {
if (!session.getConnected()) {
session.getHandlerContext().close();
return;
}
/*处理断开客户端连接*/
SessionManger.pingTimeout(session.getClientId());
/*移除相关topic*/
ClientManager.remove(session.getClientId());
} catch (Exception e) {
log.error("=>客户端断开连接异常:{}", session);
}
}
}

View File

@ -0,0 +1,51 @@
package com.fastbee.mqtt.handler;
import com.fastbee.base.service.ISessionStore;
import com.fastbee.base.session.Session;
import com.fastbee.mqtt.annotation.Process;
import com.fastbee.mqtt.handler.adapter.MqttHandler;
import com.fastbee.mqtt.manager.ClientManager;
import com.fastbee.mqtt.manager.ResponseManager;
import com.fastbee.base.util.AttributeUtils;
import com.fastbee.mqtt.manager.SessionManger;
import com.fastbee.mqtt.utils.MqttMessageUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
/**
* 客户端Ping消息应答
*
* @author bill
*/
@Slf4j
@Process(type = MqttMessageType.PINGREQ)
public class MqttPingreq implements MqttHandler {
@Resource
private ISessionStore sessionStore;
@Override
public void handler(ChannelHandlerContext ctx, MqttMessage message) {
/*获取客户端id*/
String clientId = AttributeUtils.getClientId(ctx.channel());
//平台检测session是否同步
Session session = AttributeUtils.getSession(ctx.channel());
boolean containsKey = sessionStore.containsKey(clientId);
if (!containsKey){
SessionManger.buildSession(clientId,session);
}
try {
// log.debug("=>客户端:{},心跳信息", clientId);
/*更新客户端ping时间*/
ClientManager.updatePing(clientId);
/*响应设备的ping消息*/
MqttMessage pingResp = MqttMessageUtils.buildPingResp();
ResponseManager.sendMessage(pingResp, clientId, true);
} catch (Exception e) {
log.error("=>客户端:{},ping异常:{}", clientId, e);
}
}
}

View File

@ -0,0 +1,39 @@
package com.fastbee.mqtt.handler;
import com.fastbee.mqtt.annotation.Process;
import com.fastbee.mqtt.handler.adapter.MqttHandler;
import com.fastbee.mqtt.manager.ClientManager;
import com.fastbee.mqtt.service.IMessageStore;
import com.fastbee.base.session.Session;
import com.fastbee.base.util.AttributeUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
/**
* 消息等级=Qos1,收到发布消息确认
*
* @author bill
*/
@Process(type = MqttMessageType.PUBACK)
@Slf4j
public class MqttPubAck implements MqttHandler {
@Autowired
private IMessageStore messageStore;
@Override
public void handler(ChannelHandlerContext ctx, MqttMessage message) {
MqttPubAckMessage ackMessage = (MqttPubAckMessage) message;
// PacketId
int packetId = ackMessage.variableHeader().messageId();
Session session = AttributeUtils.getSession(ctx.channel());
// Qos1 的存储消息释放
messageStore.removePubMsg(packetId);
/*更新平台ping*/
ClientManager.updatePing(session.getClientId());
}
}

View File

@ -0,0 +1,47 @@
package com.fastbee.mqtt.handler;
import com.fastbee.mqtt.annotation.Process;
import com.fastbee.mqtt.handler.adapter.MqttHandler;
import com.fastbee.mqtt.manager.ClientManager;
import com.fastbee.mqtt.manager.ResponseManager;
import com.fastbee.mqtt.service.IMessageStore;
import com.fastbee.base.session.Session;
import com.fastbee.base.util.AttributeUtils;
import com.fastbee.mqtt.utils.MqttMessageUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
/**
* 消息等级=Qos2 发布消息收到 交付第一步
*
* @author bill
*/
@Process(type = MqttMessageType.PUBREC)
@Slf4j
public class MqttPubRec implements MqttHandler {
@Autowired
private IMessageStore messageStore;
@Override
public void handler(ChannelHandlerContext ctx, MqttMessage message) {
MqttMessageIdVariableHeader variableHeader = MqttMessageUtils.getIdVariableHeader(message);
//clientId
String clientId = AttributeUtils.getClientId(ctx.channel());
Session session = AttributeUtils.getSession(ctx.channel());
//获取packetId
int messageId = variableHeader.messageId();
/*释放消息*/
messageStore.removePubMsg(messageId);
messageStore.saveRelOutMsg(messageId);
// 回复REL 进入第二阶段
MqttMessage mqttMessage = MqttMessageUtils.buildPubRelMessage(message);
ResponseManager.responseMessage(session, mqttMessage, true);
/*更新平台ping*/
ClientManager.updatePing(session.getClientId());
}
}

View File

@ -0,0 +1,41 @@
package com.fastbee.mqtt.handler;
import com.fastbee.mqtt.annotation.Process;
import com.fastbee.mqtt.handler.adapter.MqttHandler;
import com.fastbee.mqtt.manager.ClientManager;
import com.fastbee.mqtt.manager.ResponseManager;
import com.fastbee.mqtt.service.IMessageStore;
import com.fastbee.base.session.Session;
import com.fastbee.base.util.AttributeUtils;
import com.fastbee.mqtt.utils.MqttMessageUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
/**
* 消息等级=Qos2 发布消息释放 PUBREL
* @author bill
*/
@Slf4j
@Process(type = MqttMessageType.PUBREL)
public class MqttPubRel implements MqttHandler {
@Autowired
private IMessageStore messageStore;
@Override
public void handler(ChannelHandlerContext ctx, MqttMessage message){
MqttMessageIdVariableHeader variableHeader = MqttMessageUtils.getIdVariableHeader(message);
Session session = AttributeUtils.getSession(ctx.channel());
//获取packetId
int messageId = variableHeader.messageId();
messageStore.removeRelInMsg(messageId);
MqttMessage mqttMessage = MqttMessageUtils.buildPubCompMessage(message);
ResponseManager.responseMessage(session,mqttMessage,true);
/*更新本地ping时间*/
ClientManager.updatePing(session.getClientId());
}
}

View File

@ -0,0 +1,38 @@
package com.fastbee.mqtt.handler;
import com.fastbee.mqtt.annotation.Process;
import com.fastbee.mqtt.handler.adapter.MqttHandler;
import com.fastbee.mqtt.manager.ClientManager;
import com.fastbee.mqtt.service.IMessageStore;
import com.fastbee.base.session.Session;
import com.fastbee.base.util.AttributeUtils;
import com.fastbee.mqtt.utils.MqttMessageUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
/**
* 消息等级=Qos2 发布消息完成
* @author bill
*/
@Process(type = MqttMessageType.PUBCOMP)
@Slf4j
public class MqttPubcomp implements MqttHandler {
@Autowired
private IMessageStore messageStore;
@Override
public void handler(ChannelHandlerContext ctx, MqttMessage message){
MqttMessageIdVariableHeader variableHeader = MqttMessageUtils.getIdVariableHeader(message);
Session session = AttributeUtils.getSession(ctx.channel());
//获取packetId
int messageId = variableHeader.messageId();
messageStore.removeRelOutMsg(messageId);
/*更新平台ping*/
ClientManager.updatePing(session.getClientId());
}
}

View File

@ -0,0 +1,231 @@
package com.fastbee.mqtt.handler;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.fastbee.base.service.ISessionStore;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.core.mq.DeviceReportBo;
import com.fastbee.common.core.mq.DeviceStatusBo;
import com.fastbee.common.core.redis.RedisCache;
import com.fastbee.common.enums.DeviceStatus;
import com.fastbee.common.enums.ServerType;
import com.fastbee.common.utils.DateUtils;
import com.fastbee.common.utils.StringUtils;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.iot.ruleEngine.MsgContext;
import com.fastbee.iot.ruleEngine.RuleProcess;
import com.fastbee.mq.redischannel.producer.MessageProducer;
import com.fastbee.mqtt.annotation.Process;
import com.fastbee.mqtt.handler.adapter.MqttHandler;
import com.fastbee.mqtt.manager.ClientManager;
import com.fastbee.mqtt.manager.ResponseManager;
import com.fastbee.mqtt.manager.RetainMsgManager;
import com.fastbee.mqtt.manager.SessionManger;
import com.fastbee.mqtt.model.ClientMessage;
import com.fastbee.mqtt.service.IMessageStore;
import com.fastbee.base.session.Session;
import com.fastbee.base.util.AttributeUtils;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.*;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
/**
* 客户端消息推送处理类
*
* @author bill
*/
@Slf4j
@Process(type = MqttMessageType.PUBLISH)
public class MqttPublish implements MqttHandler {
@Autowired
private IMessageStore messageStore;
@Resource
private TopicsUtils topicsUtils;
@Resource
private RedisCache redisCache;
@Resource
private RuleProcess ruleProcess;
@Resource
private ISessionStore sessionStore;
@Override
public void handler(ChannelHandlerContext ctx, MqttMessage message) {
MqttPublishMessage publishMessage = (MqttPublishMessage) message;
/*获取客户端id*/
String clientId = AttributeUtils.getClientId(ctx.channel());
String topicName = publishMessage.variableHeader().topicName();
log.debug("=>***客户端[{}],主题[{}],推送消息[{}]", clientId, topicName,
ByteBufUtil.hexDump(publishMessage.content()));
// 以get结尾是模拟客户端数据,只转发消息
if (topicName.endsWith(FastBeeConstant.MQTT.PROPERTY_GET_SIMULATE)) {
sendTestToMQ(publishMessage);
} else {
/*获取客户端session*/
Session session = AttributeUtils.getSession(ctx.channel());
//平台检测session是否同步
boolean containsKey = sessionStore.containsKey(clientId);
if (!containsKey) {
SessionManger.buildSession(clientId, session);
}
/*推送保留信息*/
pubRetain(publishMessage);
/*响应客户端消息到达Broker*/
callBack(session, publishMessage, clientId);
/*推送到订阅的客户端*/
sendMessageToClients(publishMessage);
/*推送到MQ处理*/
sendToMQ(publishMessage, clientId);
/*累计接收消息数*/
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_RECEIVE_TOTAL, -1L);
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_RECEIVE_TODAY, 60 * 60 * 24);
}
}
/**
* 消息推送
*
* @param message 推送消息
*/
@SneakyThrows
public void sendToMQ(MqttPublishMessage message, String clientId) {
/*获取topic*/
String topicName = message.variableHeader().topicName();
byte[] source = ByteBufUtil.getBytes(message.content());
DeviceReportBo reportBo = DeviceReportBo.builder()
.serialNumber(topicsUtils.parseSerialNumber(topicName)).topicName(topicName).packetId((long) message.variableHeader().packetId())
.platformDate(DateUtils.getNowDate()).data(source).serverType(ServerType.MQTT).build();
if (topicName.endsWith(FastBeeConstant.TOPIC.MSG_REPLY) ||
topicName.endsWith(FastBeeConstant.TOPIC.UPGRADE_REPLY)) {
/*设备应答服务器回调数据*/
reportBo.setReportType(2);
} else {
/*设备上报数据*/
reportBo.setReportType(1);
}
// 规则引擎脚本处理,完成后返回结果
MsgContext context = ruleProcess.processRuleScript(topicsUtils.parseSerialNumber(topicName), 1, topicName, new String(source));
if (!Objects.isNull(context) && StringUtils.isNotEmpty(context.getPayload())
&& StringUtils.isNotEmpty(context.getTopic())) {
reportBo.setTopicName(context.getTopic());
reportBo.setData(context.getPayload().getBytes(StandardCharsets.UTF_8));
}
if (reportBo.getTopicName().contains("property")) {
MessageProducer.sendPublishMsg(reportBo);
} else if (reportBo.getTopicName().contains("status")) {
String jsonString = new String(reportBo.getData(), StandardCharsets.UTF_8);
JSONObject jsonObject = JSON.parseObject(jsonString);
int status = jsonObject.getInteger("status");
DeviceStatusBo bo = DeviceStatusBo.builder()
.serialNumber(topicsUtils.parseSerialNumber(reportBo.getTopicName()))
.status(DeviceStatus.convert(status))
.build();
MessageProducer.sendStatusMsg(bo);
} else {
MessageProducer.sendOtherMsg(reportBo);
}
}
/**
* 发送模拟数据进行处理
*
* @param message
*/
public void sendTestToMQ(MqttPublishMessage message) {
/*获取topic*/
String topicName = message.variableHeader().topicName();
DeviceReportBo reportBo = DeviceReportBo.builder()
.serialNumber(topicsUtils.parseSerialNumber(topicName))
.topicName(topicName)
.packetId((long) message.variableHeader().packetId())
.platformDate(DateUtils.getNowDate())
.data(ByteBufUtil.getBytes(message.content()))
.build();
MessageProducer.sendOtherMsg(reportBo);
}
/**
* 推送消息到订阅客户端
*
* @param message 消息
*/
public void sendMessageToClients(MqttPublishMessage message) {
ClientManager.pubTopic(message);
}
/**
* 应答客户端消息到达Broker
*
* @param session 客户端
* @param message 消息
*/
private void callBack(Session session, MqttPublishMessage message, String clientId) {
/*获取消息等级*/
MqttQoS mqttQoS = message.fixedHeader().qosLevel();
int packetId = message.variableHeader().packetId();
MqttFixedHeader header;
switch (mqttQoS.value()) {
/*0,1消息等级直接回复*/
case 0:
case 1:
header = new MqttFixedHeader(MqttMessageType.PUBACK, false, mqttQoS, false, 0);
break;
case 2:
// 处理Qos2的消息确认
if (!messageStore.outRelContains(packetId)) {
messageStore.saveRelInMsg(packetId);
}
header = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
break;
default:
header = null;
}
/*处理消息等级*/
handleMqttQos(packetId, mqttQoS, true, clientId);
/*响应客户端*/
MqttMessageIdVariableHeader variableHeader = null;
if (packetId > 0) {
variableHeader = MqttMessageIdVariableHeader.from(packetId);
}
MqttPubAckMessage ackMessage = new MqttPubAckMessage(header, variableHeader);
if (mqttQoS.value() >= 1) {
ResponseManager.responseMessage(session, ackMessage, true);
}
/*更新客户端ping时间*/
ClientManager.updatePing(session.getClientId());
}
/**
* Qos不同消息处理
*/
private void handleMqttQos(int packetId, MqttQoS qoS, boolean clearSession, String clientId) {
if (qoS == MqttQoS.AT_LEAST_ONCE || qoS == MqttQoS.EXACTLY_ONCE) {
ClientMessage clientMessage = ClientMessage.of(clientId, qoS, null, false);
messageStore.savePubMsg(packetId, clientMessage);
}
}
/**
* 推送保留信息
*/
@SneakyThrows
private void pubRetain(MqttPublishMessage message) {
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_RETAIN_TOTAL, -1L);
/*根据message.fixedHeader().isRetain() 判断是否有保留信息*/
RetainMsgManager.pushMessage(message);
}
}

View File

@ -0,0 +1,112 @@
package com.fastbee.mqtt.handler;
import com.alibaba.fastjson2.JSON;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.core.redis.RedisCache;
import com.fastbee.mqtt.annotation.Process;
import com.fastbee.mqtt.model.ClientMessage;
import com.fastbee.base.session.Session;
import com.fastbee.base.util.AttributeUtils;
import com.fastbee.mqtt.utils.MqttMessageUtils;
import com.fastbee.mqtt.model.RetainMessage;
import com.fastbee.mqtt.handler.adapter.MqttHandler;
import com.fastbee.mqtt.manager.ClientManager;
import com.fastbee.mqtt.manager.ResponseManager;
import com.fastbee.mqtt.manager.RetainMsgManager;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.*;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
/**
* 客户端订阅处理
*
* @author gsb
* @date 2022/9/16 11:11
*/
@Slf4j
@Process(type = MqttMessageType.SUBSCRIBE)
public class MqttSubscribe implements MqttHandler {
@Resource
private RedisCache redisCache;
@Override
public void handler(ChannelHandlerContext ctx, MqttMessage message) {
subscribe(ctx, (MqttSubscribeMessage) message);
}
public void subscribe(ChannelHandlerContext ctx, MqttSubscribeMessage message) {
/*获取session*/
Session session = AttributeUtils.getSession(ctx.channel());
/*获取客户端订阅的topic列表*/
List<MqttTopicSubscription> topList = message.payload().topicSubscriptions();
/*获取topicName列表*/
List<String> topicNameList = topList.stream().map(MqttTopicSubscription::topicName).collect(Collectors.toList());
log.debug("=>客户端:{},订阅主题:{}", session.getClientId(), JSON.toJSONString(topicNameList));
if (!TopicsUtils.validTopicFilter(topicNameList)) {
log.error("=>订阅主题不合法:{}", JSON.toJSONString(topicNameList));
return;
}
/*存储到本地topic缓存*/
topicNameList.forEach(topicName -> {
ClientManager.push(topicName, session);
/*累计订阅数*/
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_SUBSCRIBE_TOTAL,-1L);
});
/*更新客户端ping*/
ClientManager.updatePing(session.getClientId());
/*应答客户端订阅成功*/
MqttSubAckMessage subAckMessage = MqttMessageUtils.buildSubAckMessage(message);
ResponseManager.responseMessage(session, subAckMessage, true);
/*客户端订阅了遗留消息主题后,推送遗留消息给客户端*/
topList.forEach(topic -> {
retain(topic.topicName(), session, topic.qualityOfService());
});
}
/**
* 推送遗留消息
*
* @param topicName 主题
* @param session 客户端
* @param mqttQoS 消息质量
*/
@SneakyThrows
private void retain(String topicName, Session session, MqttQoS mqttQoS) {
RetainMessage message = RetainMsgManager.getRetain(topicName);
if (null == message) {
return;
}
MqttQoS qos = message.getQos() > mqttQoS.value() ? mqttQoS : MqttQoS.valueOf(message.getQos());
switch (qos.value()) {
case 0:
buildMessage(qos, topicName, 0, message.getMessage(), session);
break;
case 1:
case 2:
/*使用实时时间戳充当 packId*/
buildMessage(qos, topicName, (int) System.currentTimeMillis(), message.getMessage(), session);
break;
}
}
/*组装推送数据*/
private void buildMessage(MqttQoS qos, String topicName, int packetId, byte[] message, Session session) {
/*生成客户端model*/
ClientMessage clientMessage = ClientMessage.of(qos, topicName, false, message);
/*组建推送消息*/
MqttPublishMessage publishMessage = MqttMessageUtils.buildPublishMessage(clientMessage, packetId);
/*推送消息*/
ResponseManager.publishClients(publishMessage, session);
}
}

View File

@ -0,0 +1,42 @@
package com.fastbee.mqtt.handler;
import com.fastbee.mqtt.annotation.Process;
import com.fastbee.mqtt.handler.adapter.MqttHandler;
import com.fastbee.mqtt.manager.ClientManager;
import com.fastbee.mqtt.manager.ResponseManager;
import com.fastbee.base.session.Session;
import com.fastbee.base.util.AttributeUtils;
import com.fastbee.mqtt.utils.MqttMessageUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* topic取消订阅处理
*
* @author bill
*/
@Slf4j
@Process(type = MqttMessageType.UNSUBSCRIBE)
public class MqttUnsubscribe implements MqttHandler {
@Override
public void handler(ChannelHandlerContext ctx, MqttMessage message) {
MqttUnsubscribeMessage unsubscribeMessage = (MqttUnsubscribeMessage) message;
List<String> topics = unsubscribeMessage.payload().topics();
log.debug("=>收到取消订阅请求,topics[{}]", topics);
Session session = AttributeUtils.getSession(ctx.channel());
topics.forEach(topic -> {
ClientManager.unsubscribe(topic, session);
});
MqttUnsubAckMessage unsubAckMessage = MqttMessageUtils.buildUnsubAckMessage(unsubscribeMessage);
ResponseManager.responseMessage(session, unsubAckMessage, true);
/*更新客户端平台时间*/
ClientManager.updatePing(session.getClientId());
}
}

View File

@ -0,0 +1,12 @@
package com.fastbee.mqtt.handler.adapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
/**
* @author bill
*/
public interface MqttHandler {
public void handler(ChannelHandlerContext ctx, MqttMessage message);
}

View File

@ -0,0 +1,89 @@
package com.fastbee.mqtt.handler.adapter;
import com.fastbee.mqtt.manager.SessionManger;
import com.fastbee.base.util.AttributeUtils;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
/**
* @author gsb
* @date 2022/9/15 10:34
*/
@Slf4j
@Component
@ChannelHandler.Sharable
public class MqttMessageAdapter extends SimpleChannelInboundHandler<MqttMessage> {
// @Autowired
private MqttMessageDelegate messageDelegate;
public MqttMessageAdapter(MqttMessageDelegate delegate) {
this.messageDelegate = delegate;
}
/**
* 客户端上报消息处理
*
* @param context 上下文
* @param message 消息
*/
@Override
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
protected void channelRead0(ChannelHandlerContext context, MqttMessage message) {
try {
/*校验消息*/
if (message.decoderResult().isFailure()) {
exceptionCaught(context, message.decoderResult().cause());
return;
}
/*处理客户端报文*/
messageDelegate.process(context, message);
}catch (Exception e){
log.error("=>数据进栈异常",e);
}
}
/**
* 客户端心跳处理
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
String host = socketAddress.getAddress().getHostAddress();
int port = socketAddress.getPort();
String clientId = AttributeUtils.getClientId(ctx.channel());
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
IdleState state = idleStateEvent.state();
if (state == IdleState.ALL_IDLE || state == IdleState.READER_IDLE || state == IdleState.WRITER_IDLE) {
log.error("客户端id[{}] 客户端[{}]port:[{}]心跳超时!",clientId, host, port);
/*关闭通道*/
ctx.close();
/*移除客户端所有信息*/
SessionManger.removeContextByContext(ctx);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
/**
* 处理消息异常情况
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("=>mqtt连接异常",cause);
/*移除客户端所有信息*/
SessionManger.removeContextByContext(ctx);
}
}

View File

@ -0,0 +1,58 @@
package com.fastbee.mqtt.handler.adapter;
import com.fastbee.common.exception.ServiceException;
import com.fastbee.mqtt.annotation.Process;
import com.fastbee.base.util.AttributeUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* 消息代理类,根据注解{@link com.fastbee.mqtt.annotation.Process} 分发处理类
* @author gsb
* @date 2022/10/8 11:50
*/
@Component
@Slf4j
public class MqttMessageDelegate {
/**mqtt报文类型为key报文处理类为值*/
private final Map<MqttMessageType, MqttHandler> processor = new HashMap<>();
public MqttMessageDelegate(List<MqttHandler> handlers){
if (CollectionUtils.isEmpty(handlers)){
throw new ServiceException("报文处理类为空");
}
/*将处理类缓存到map*/
handlers.forEach(handler ->{
Process annotation = handler.getClass().getAnnotation(Process.class);
Optional.ofNullable(annotation)
.map(Process::type)
.ifPresent(messageType ->processor.put(messageType,handler));
});
}
/**
* 匹配报文处理类
*/
public void process(ChannelHandlerContext ctx, MqttMessage message){
/*获取固定头的报文类型*/
MqttMessageType messageType = message.fixedHeader().messageType();
/*处理客户端连接时先判断Attribute是否存储Session*/
if (MqttMessageType.CONNECT != messageType &&
AttributeUtils.getSession(ctx.channel()) == null){
log.error("=>客户端未连接");
throw new ServiceException("客户端未连接");
}
Optional.of(processor.get(messageType))
.ifPresent(mqttHandler -> mqttHandler.handler(ctx,message));
}
}

View File

@ -0,0 +1,174 @@
package com.fastbee.mqtt.manager;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.common.utils.DateUtils;
import com.fastbee.common.utils.StringUtils;
import com.fastbee.base.session.Session;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 客户端管理
*
* @author gsb
* @date 2022/9/15 16:02
*/
@Slf4j
public class ClientManager {
/*topic本地缓存*/
public static Map<String, Map<String, Session>> topicMap = new ConcurrentHashMap<>();
/*客户端最后一次ping时间设备不正常断开判断*/
private static Map<String, Long> pingMap = new ConcurrentHashMap<>();
/*客户端与topic关联本地缓存*/
public static Map<String, Map<String, Boolean>> clientTopicMap = new ConcurrentHashMap<>();
/**
* 将client的上下文相关信息添加到映射关系表中
* @param topic 主题
* @param session
*/
public static void push(String topic, Session session) {
try {
/*处理topic对应的topic*/
Map<String, Session> clientMap = topicMap.get(topic);
if (StringUtils.isEmpty(clientMap)) {
clientMap = new ConcurrentHashMap<>();
}
clientMap.put(session.getClientId(), session);
topicMap.put(topic, clientMap);
/*处理client对应的所有topic*/
Map<String, Boolean> topicsMap = null;
if (clientTopicMap.containsKey(session.getClientId())) {
topicsMap = clientTopicMap.get(session.getClientId());
if (!topicsMap.containsKey(topic)) {
topicsMap.put(topic, true);
}
} else {
topicsMap = new HashMap<>();
topicsMap.put(topic, true);
clientTopicMap.put(session.getClientId(), topicsMap);
}
} catch (Exception e) {
log.error("=>clientId映射topic出现异常:{},e=", e.getMessage(), e);
}
}
/**
* 清理对应client下的所有数据
*
* @param clientId 客户端id
*/
public static void remove(String clientId) {
try {
/*移除client对应的topic*/
Map<String, Boolean> topics = clientTopicMap.get(clientId);
if (null != topics) {
/*从topic中移除client*/
for (String key : topics.keySet()) {
Map<String, Session> clientMap = topicMap.get(key);
if (CollectionUtils.isEmpty(clientMap)) {
continue;
}
clientMap.remove(clientId);
}
clientTopicMap.remove(clientId);
}
pingMap.remove(clientId);
} catch (Exception e) {
log.warn("=>移除client[{}]异常", e.getMessage());
}
}
/**
* 客户端取消订阅
* 删除指定topic下的指定client
*
* @param topic 主题
* @param session 客户端
*/
public static void unsubscribe(String topic, Session session) {
try {
Map<String, Session> clientMap = topicMap.get(topic);
if (StringUtils.isEmpty(clientMap)) {
return;
}
Session s = clientMap.get(session.getClientId());
if (null == s) {
return;
}
clientMap.remove(session.getClientId());
} catch (Exception e) {
log.error("=>客户端取消订阅异常:{}", e.getMessage());
}
}
/**
* 将消息发送到指定topic下的所有client上去
*
* @param msg 推送消息
*/
public static void pubTopic(MqttPublishMessage msg) {
String topic = msg.variableHeader().topicName();
List<String> topicList = TopicsUtils.searchTopic(topic);
for (String itemTopic : topicList) {
Map<String, Session> clientMap = topicMap.get(itemTopic);
if (StringUtils.isEmpty(clientMap)) {
continue;
}
for (Session session : clientMap.values()) {
String clientId = session.getClientId();
if (!validClient(clientId)) {
///*ws的客户端不正常断开连接后直接移除所有信息*/
//if (session.getClientId().startsWith(FastBeeConstant.SERVER.WS_PREFIX)) {
// log.debug("=>移除ws客户端,clientId={}", session);
// remove(clientId);
//}
log.warn("=>{}不在线", clientId);
continue;
}
ResponseManager.publishClients(msg, session);
}
}
}
/**
* 更新客户端在线时间,给客户端发送消息时用这个看客户端最近是否在线
* 用来判断设备不正常掉线没有应答服务器的情况
*
* @param clientId 客户端id
*/
public static void updatePing(String clientId) {
pingMap.put(clientId, DateUtils.getTimestamp());
}
/**
* 平台判定设备状态 Ping客户端是否在线
*
* @param clientId 客户端id
* @return 结果
*/
public static Boolean validClient(String clientId) {
long currTime = DateUtils.getTimestamp();
/*获取客户端连接时,时间*/
Long timestamp = pingMap.get(clientId);
if (null == timestamp) {
return false;
}
//当设备缓存的心跳时间大于 平台判断时间 1.5f 表示设备不正常断开了服务器
if (currTime - timestamp > FastBeeConstant.SERVER.DEVICE_PING_EXPIRED) {
//pingMap.remove(clientId);
//SessionManger.removeClient(clientId);
return false;
}
return true;
}
}

View File

@ -0,0 +1,90 @@
package com.fastbee.mqtt.manager;
import com.fastbee.common.enums.DeviceStatus;
import com.fastbee.common.enums.TopicType;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.iot.domain.Device;
import com.fastbee.iot.service.IDeviceService;
import com.fastbee.mqtt.model.PushMessageBo;
import com.fastbee.mqttclient.PubMqttClient;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
/**
* mqttBroker获取设备在线情况
* @author gsb
* @date 2022/10/26 10:25
*/
@Component
public class MqttRemoteManager {
@Resource
private TopicsUtils topicsUtils;
@Resource
private IDeviceService deviceService;
/**
* true: 使用netty搭建的mqttBroker false: 使用emq
*/
@Value("${server.broker.enabled}")
private Boolean enabled;
@Resource
private PubMqttClient pubMqttClient;
/**
* 检查设备是否在该集群节点上(集群)
* @param clientId
* @return
*/
public static boolean checkDeviceStatus(String clientId){
return ClientManager.validClient(clientId);
}
/**
* 推送设备状态
* @param serialNumber 设备
* @param status 状态
*/
public void pushDeviceStatus(Long productId, String serialNumber, DeviceStatus status){
//兼容emqx推送TCP客户端上线
Device device = deviceService.selectDeviceNoModel(serialNumber);
String message = "{\"status\":" + status.getType() + ",\"isShadow\":" + device.getIsShadow() + ",\"rssi\":" + device.getRssi() + "}";
String topic = topicsUtils.buildTopic(device.getProductId(), serialNumber, TopicType.STATUS_POST);
if (enabled){
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttPublishVariableHeader(topic, 0),
Unpooled.buffer().writeBytes(message.getBytes(StandardCharsets.UTF_8))
);
ClientManager.pubTopic(publishMessage);
}else {
//emqx直接用客户端推送
pubMqttClient.publish(1,false,topic,message);
}
}
/**
* 公共推送消息方法
* @param bo 消息体
*/
public void pushCommon(PushMessageBo bo){
//netty版本发送
if (enabled){
MqttPublishMessage publishMessage = (MqttPublishMessage) MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0),
new MqttPublishVariableHeader(bo.getTopic(), 0),
Unpooled.buffer().writeBytes(bo.getMessage().getBytes())
);
ClientManager.pubTopic(publishMessage);
}else {
pubMqttClient.publish(0,false,bo.getTopic(), bo.getMessage());
}
}
}

View File

@ -0,0 +1,81 @@
package com.fastbee.mqtt.manager;
import com.fastbee.base.session.Session;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j;
/**
* 应答客户端管理类
*
* @author gsb
* @date 2022/9/15 11:07
*/
@Slf4j
public class ResponseManager {
/**
* 发送信息:用于服务端收到消息客户端数据后,向客户端发送响应信息
*
* @param session 上下文
* @param msg mqtt消息
* @param flush 是否刷新
*/
public static void responseMessage(Session session, MqttMessage msg, boolean flush) {
ChannelFuture future = flush ? session.getHandlerContext().writeAndFlush(msg) : session.getHandlerContext().write(msg);
future.addListener(f -> {
if (!f.isSuccess()) {
log.error("=>响应设备[{}],发送消息:{},失败原因:{}", session.getClientId(), msg, f.cause());
}else {
//log.debug("=>相应设备:[{}],发送消息:[{}]",session.getClientId(),msg);
}
});
}
/**
* 发送信息:用于服务端向客户端通过clientID下发消息(单客户端)
*
* @param msg mqtt消息
* @param clientId 客户端id
* @param flush 是否刷新
*/
public static void sendMessage(MqttMessage msg, String clientId, boolean flush) {
Session session = SessionManger.getSession(clientId);
if (session == null || null == session.getHandlerContext()) {
return;
}
responseMessage(session, msg, flush);
}
/**
* 推送消息给订阅客户端(所有订阅客户端)
*
* @param msg 推送消息
* @param session 客户端
*/
public static void publishClients(MqttPublishMessage msg, Session session) {
try {
final Channel channel = session.getHandlerContext().channel();
MqttQoS qos = msg.fixedHeader().qosLevel();
ByteBuf sendBuf = msg.content().retainedDuplicate();
sendBuf.resetReaderIndex();
/*配置推送消息类型*/
MqttFixedHeader Header = new MqttFixedHeader(MqttMessageType.PUBLISH,
false, qos, msg.fixedHeader().isRetain(), 0);
/*设置topic packetId*/
MqttPublishVariableHeader publishVariableHeader = new MqttPublishVariableHeader(
msg.variableHeader().topicName(), msg.variableHeader().packetId());
/*推送消息*/
MqttPublishMessage publishMessage = new MqttPublishMessage(Header,
publishVariableHeader, sendBuf);
channel.writeAndFlush(publishMessage);
} catch (Exception e) {
log.error("=>发送消息异常 {}", msg, e);
}
}
}

View File

@ -0,0 +1,58 @@
package com.fastbee.mqtt.manager;
import com.fastbee.mqtt.model.RetainMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 处理topic保留消息客户端订阅后
* 立即处理消息,用于消息传递
* @author gsb
* @date 2022/9/16 14:01
*/
@Slf4j
@Component
public class RetainMsgManager {
/*保存topic的retain消息*/
private static Map<String, RetainMessage> retainMap = new ConcurrentHashMap<>();
/**
* 推送保留信息到订阅客户端
*
* @param message 推送消息
*/
public static void pushMessage(MqttPublishMessage message) {
if (null == message || !message.fixedHeader().isRetain()) {
return;
}
byte[] bytes = new byte[message.payload().readableBytes()];
if (bytes.length > 0) {
RetainMessage retainMsg = RetainMessage.builder()
.topic(message.variableHeader().topicName())
.qos(message.fixedHeader().qosLevel().value()).message(bytes).build();
retainMap.put(message.variableHeader().topicName(), retainMsg);
} else {
retainMap.remove(message.variableHeader().topicName());
}
}
public static Integer getSize() {
return retainMap.size();
}
/**
* 获取消息
*
* @param topic 主题
* @return 消息
*/
public static RetainMessage getRetain(String topic) {
return retainMap.get(topic);
}
}

View File

@ -0,0 +1,193 @@
package com.fastbee.mqtt.manager;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.core.mq.DeviceStatusBo;
import com.fastbee.common.enums.DeviceStatus;
import com.fastbee.common.exception.ServiceException;
import com.fastbee.common.utils.StringUtils;
import com.fastbee.common.utils.spring.SpringUtils;
import com.fastbee.mq.redischannel.consumer.DeviceStatusConsumer;
import com.fastbee.mq.service.IMessagePublishService;
import com.fastbee.base.service.ISessionStore;
import com.fastbee.base.session.Session;
import com.fastbee.base.util.AttributeUtils;
import com.fastbee.mqtt.utils.MqttMessageUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessageType;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.Set;
/**
* 会话管理类
*
* @Author guanshubiao
* @Date 2022/9/12 20:22
*/
@Slf4j
public class SessionManger {
/*MQ推送接口*/
private static IMessagePublishService messagePublishService = SpringUtils.getBean(IMessagePublishService.class);
/*Session会话存储*/
private static ISessionStore sessionStore = SpringUtils.getBean(ISessionStore.class);
private static MqttRemoteManager remoteManager = SpringUtils.getBean(MqttRemoteManager.class);
private static DeviceStatusConsumer statusConsumer = SpringUtils.getBean(DeviceStatusConsumer.class);
/**
* 获取当前的在线客户Map
*
* @return 客户端本地缓存
*/
public Map<String, Session> getSessionMap() {
return sessionStore.getSessionMap();
}
/**
* 平台校验客户端是否在线
*
* @param clientId 客户id
* @return 结果
*/
public static boolean containsClientId(String clientId) {
if (sessionStore.containsKey(clientId)) {
return true;
}
return false;
}
/**
* 获取所有在线的客户ClientId
*
* @return 客户端所有id
*/
public Set<String> getClientIds() {
return sessionStore.getSessionMap().keySet();
}
/**
* mqtt新客户连接
*
* @param clientId 客户端id
* @param session 客户端
*/
public static void buildSession(String clientId, Session session) {
log.debug("=>新客户端连接clientId={}", clientId);
if (StringUtils.isEmpty(clientId) || handleContext(session)) {
log.error("=>客户端id为空或者session未注册!");
return;
}
sessionStore.storeSession(clientId, session);
/*更新客户端在平台的最新响应时间*/
ClientManager.updatePing(clientId);
/*发送MQ设备上线*/
DeviceStatusBo statusBo = MqttMessageUtils.buildStatusMsg(session.getHandlerContext(), session.getClientId(), DeviceStatus.ONLINE, session.getIp());
if (!statusBo.getSerialNumber().startsWith(FastBeeConstant.SERVER.WM_PREFIX) &&
!statusBo.getSerialNumber().startsWith(FastBeeConstant.SERVER.WS_PREFIX) &&
!statusBo.getSerialNumber().startsWith(FastBeeConstant.SERVER.FAST_PHONE)) {
statusConsumer.consume(statusBo);
remoteManager.pushDeviceStatus(-1L,statusBo.getSerialNumber(), statusBo.getStatus());
}
}
/**
* 根据客户端id移除客户端
*
* @param clientId 客户端id
*/
public static void removeClient(String clientId) {
log.debug("=>移除客户端,clientId={}", clientId);
try {
if (StringUtils.isEmpty(clientId) || !sessionStore.containsKey(clientId) || clientId.endsWith(FastBeeConstant.SERVER.WS_PREFIX) ||
clientId.endsWith(FastBeeConstant.SERVER.FAST_PHONE)) {
return;
}
Session session = sessionStore.getSession(clientId);
if (handleContext(session)) {
log.error("移除客户端失败,客户端未注册!");
return;
}
//关闭通道
session.getHandlerContext().close();
//移除client
sessionStore.cleanSession(clientId);
session.setMqttMessageType(MqttMessageType.DISCONNECT);
//发送至MQ,设备下线
DeviceStatusBo statusBo = MqttMessageUtils.buildStatusMsg(session.getHandlerContext(), session.getClientId(), DeviceStatus.OFFLINE, session.getIp());
if (!statusBo.getSerialNumber().startsWith(FastBeeConstant.SERVER.WM_PREFIX) &&
!statusBo.getSerialNumber().startsWith(FastBeeConstant.SERVER.WS_PREFIX)) {
statusConsumer.consume(statusBo);
remoteManager.pushDeviceStatus(-1L,statusBo.getSerialNumber(), statusBo.getStatus());
}
} catch (Exception e) {
throw new ServiceException("移除客户端失败,message=" + e.getMessage());
}
}
/**
* 根据客户通道移除客户端
*
* @param ctx 上下文通道
*/
public static void removeContextByContext(ChannelHandlerContext ctx) {
try {
/*获取*/
Session session = AttributeUtils.getSession(ctx.channel());
if (handleContext(session)) {
log.error("=>客户端通道不存在!移除失败");
return;
}
sessionStore.cleanSession(session.getClientId());
session.setMqttMessageType(MqttMessageType.DISCONNECT);
//发送至MQ,设备下线
DeviceStatusBo statusBo = MqttMessageUtils.buildStatusMsg(session.getHandlerContext(), session.getClientId(), DeviceStatus.OFFLINE, session.getIp());
if (!statusBo.getSerialNumber().startsWith(FastBeeConstant.SERVER.WM_PREFIX) &&
!statusBo.getSerialNumber().startsWith(FastBeeConstant.SERVER.WS_PREFIX)) {
statusConsumer.consume(statusBo);
remoteManager.pushDeviceStatus(-1L,statusBo.getSerialNumber(), statusBo.getStatus());
}
} catch (Exception e) {
log.error("=>移除客户端失败={}", e.getMessage());
}
}
/**
* ping判定时间超时
*
* @param clientId 客户id
*/
public static void pingTimeout(String clientId) {
try {
removeClient(clientId);
} catch (Exception e) {
throw new ServiceException("移除超时客户端失败");
}
}
/**
* 根据clientId获取客户通道
*
* @param clientId 客户端id
* @return session
*/
public static Session getSession(String clientId) {
return sessionStore.getSession(clientId);
}
/**
* 校验Session已经注册通道
*
* @param session 客户端
* @return 结果
*/
private static boolean handleContext(Session session) {
if (null == session || null == session.getHandlerContext()) {
return true;
}
return false;
}
}

View File

@ -0,0 +1,34 @@
package com.fastbee.mqtt.manager;
import com.fastbee.mqtt.model.WillMessage;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 遗嘱消息处理
* @author gsb
* @date 2022/9/15 15:46
*/
@Slf4j
public class WillMessageManager {
private static Map<String, WillMessage> map = new ConcurrentHashMap<>();
public static void push(WillMessage message){
map.put(message.getClientId(),message);
}
public static void pop(String clientId){
try {
WillMessage message = map.get(clientId);
if (null == message){
return;
}
ClientManager.pubTopic(message.getMessage());
}catch (Exception e){
log.error("=>发送客户端[{}],遗嘱消息异常",e.getMessage(),e);
}
}
}

View File

@ -0,0 +1,45 @@
package com.fastbee.mqtt.model;
import io.netty.handler.codec.mqtt.MqttQoS;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author gsb
* @date 2022/10/7 19:04
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ClientMessage {
/*共享主题客户端id不为空则指定客户端发送*/
private String sharedClientId;
/*客户端id*/
private String clientId;
/*消息质量*/
private MqttQoS qos;
/*topic*/
private String topicName;
/*是否保留消息*/
private boolean retain;
/*数据*/
private byte[] payload;
private int messageId;
/*是否是遗嘱消息*/
private boolean willFlag;
/*是否是dup消息*/
private boolean dup;
public static ClientMessage of(MqttQoS qos,String topicName,boolean retain, byte[] payload){
return new ClientMessage(null,null,qos,topicName,retain,payload,0,false,false);
}
public static ClientMessage of(String clientId,MqttQoS qos,String topicName,boolean retain){
return new ClientMessage(null,clientId,qos,topicName,retain,null,0,false,false);
}
}

View File

@ -0,0 +1,23 @@
package com.fastbee.mqtt.model;
import lombok.Data;
import java.io.Serializable;
/**
* @author bill
*/
@Data
public class PushMessageBo implements Serializable {
/*主题*/
private String topic;
/*数据*/
private String message;
/*消息质量*/
private int qos;
private Integer value;
private Integer address;
private Integer slaveId;
}

View File

@ -0,0 +1,23 @@
package com.fastbee.mqtt.model;
import lombok.Builder;
import lombok.Data;
import java.io.Serializable;
/**
* 保留消息bo
* @author gsb
* @date 2022/9/16 14:05
*/
@Data
@Builder
public class RetainMessage implements Serializable {
/*主题*/
private String topic;
/*数据*/
private byte[] message;
/*消息质量*/
private int qos;
}

View File

@ -0,0 +1,23 @@
package com.fastbee.mqtt.model;
import lombok.AllArgsConstructor;
import lombok.Data;
/**
* 订阅topic信息
* @author gsb
* @date 2022/10/14 8:30
*/
@Data
@AllArgsConstructor
public class Subscribe {
/*topic*/
private String topicName;
/*消息质量*/
private int qos;
/*客户端id*/
private String clientId;
/*清楚回话*/
private boolean cleanSession;
}

View File

@ -0,0 +1,26 @@
package com.fastbee.mqtt.model;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;
/**
* @author gsb
* @date 2022/9/15 15:36
*/
@Data
@AllArgsConstructor
public class WillMessage implements Serializable {
private static final long serialVersionUID = -1L;
/*客户端Id*/
private String clientId;
/*清楚客户端*/
private boolean cleanSession;
/*topic*/
private String topic;
/*客户端推送消息*/
private MqttPublishMessage message;
}

View File

@ -0,0 +1,66 @@
package com.fastbee.mqtt.server;
import com.fastbee.server.Server;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.mqtt.handler.adapter.MqttMessageAdapter;
import com.fastbee.server.config.NettyConfig;
import io.netty.bootstrap.AbstractBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class MqttServer extends Server {
@Autowired
private MqttMessageAdapter messageAdapter;
@Override
protected AbstractBootstrap initialize() {
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory(config.name, Thread.MAX_PRIORITY));
workerGroup = new NioEventLoopGroup(config.workerCore, new DefaultThreadFactory(config.name, Thread.MAX_PRIORITY));
if (config.businessCore > 0) {
businessService = new ThreadPoolExecutor(config.businessCore, config.businessCore, 1L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new DefaultThreadFactory(config.name, true, Thread.NORM_PRIORITY));
}
return new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.option(ChannelOption.SO_BACKLOG, 511)
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) {
//客户端心跳检测机制
channel.pipeline()
.addFirst(FastBeeConstant.SERVER.IDLE
, new IdleStateHandler(config.readerIdleTime, config.writerIdleTime, config.allIdleTime, TimeUnit.SECONDS))
.addLast(FastBeeConstant.SERVER.DECODER, new MqttDecoder(1024 * 1024 * 2))
.addLast(FastBeeConstant.SERVER.ENCODER, MqttEncoder.INSTANCE)
.addLast(messageAdapter);
}
});
}
}

View File

@ -0,0 +1,73 @@
package com.fastbee.mqtt.server;
import com.fastbee.server.Server;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.mqtt.codec.WebSocketMqttCodec;
import com.fastbee.mqtt.handler.adapter.MqttMessageAdapter;
import io.netty.bootstrap.AbstractBootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author gsb
* @date 2022/9/15 14:23
*/
@Component
@Slf4j
public class WebSocketServer extends Server {
@Autowired
private WebSocketMqttCodec webSocketMqttCodec;
@Autowired
private MqttMessageAdapter mqttMessageAdapter;
@Override
protected AbstractBootstrap initialize() {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
return new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
.addFirst(FastBeeConstant.WS.HEART_BEAT
, new IdleStateHandler(0, 0, 70))
/*http请求响应*/
.addLast(FastBeeConstant.WS.HTTP_SERVER_CODEC, new HttpServerCodec())
/*聚合header与body组成完整的Http请求最大数据量为1Mb*/
.addLast(FastBeeConstant.WS.AGGREGATOR, new HttpObjectAggregator(1024 * 1024))
/*压缩出站数据*/
.addLast(FastBeeConstant.WS.COMPRESSOR, new HttpContentCompressor())
/*WebSocket协议配置mqtt*/
.addLast(FastBeeConstant.WS.PROTOCOL, new WebSocketServerProtocolHandler("/mqtt",
"mqtt,mqttv3.1,mqttv3.1.1,mqttv5.0", true, 65536))
.addLast(FastBeeConstant.WS.MQTT_WEBSOCKET, webSocketMqttCodec)
.addLast(FastBeeConstant.WS.DECODER, new MqttDecoder())
.addLast(FastBeeConstant.WS.ENCODER, MqttEncoder.INSTANCE)
.addLast(FastBeeConstant.WS.BROKER_HANDLER, mqttMessageAdapter);
}
});
}
}

View File

@ -0,0 +1,90 @@
package com.fastbee.mqtt.service;
import com.fastbee.mqtt.model.ClientMessage;
import java.util.List;
import java.util.Map;
/**
* @author gsb
* @date 2022/10/14 14:35
*/
public interface IMessageStore {
/**
* 存储控制包
*
* @param topic: 控制包所属主题
* @param clientMessage: 需要存储的消息
*/
void storeMessage(String topic, ClientMessage clientMessage);
/**
* 清除topic下的所有消息
*
* @param topic: 主题
*/
void cleanTopic(String topic);
/**
* 根据clientId清除消息
*
* @param clientId: 客户端唯一标识
*/
void removeMessage(String clientId);
/**
* 匹配主题过滤器,寻找对应消息
*
* @param topicFilter: 主题过滤器
*/
List<ClientMessage> searchMessages(String topicFilter);
/**
* 保存 clientMessage
*
* @param messageId 消息id
*/
public void savePubMsg(Integer messageId, ClientMessage clientMessage);
/**
* 移除
*
* @param messageId 消息id
*/
public void removePubMsg(int messageId);
/**
* 保存 REL IN
*
* @param messageId 消息id
*/
public void saveRelInMsg(int messageId);
/**
* 保存 REL OUT
*
* @param messageId 消息id
*/
public void saveRelOutMsg(int messageId);
/**
* 移除
*
* @param messageId 消息id
*/
public void removeRelInMsg(int messageId);
/**
* 移除
*
* @param messageId 消息id
*/
public void removeRelOutMsg(int messageId);
/**
* 判断Rel out是否包含消息id
*/
public boolean outRelContains(int messageId);
}

View File

@ -0,0 +1,40 @@
package com.fastbee.mqtt.service;
import com.fastbee.mqtt.model.Subscribe;
import java.util.List;
/**
* 订阅缓存
* @author gsb
* @date 2022/10/14 8:24
*/
public interface ISubscriptionService {
/**
* 保存客户订阅的主题
*
* @param subscribeList 客户订阅
*/
void subscribe(List<Subscribe> subscribeList, String clientId);
/**
* 解除订阅
*
* @param clientId 客户id
* @param topicName 主题
*/
void unsubscribe(String clientId, String topicName);
/**
* 获取订阅了 topic 的客户id
*
* @param topic 主题
* @return 订阅了主题的客户id列表
*/
List<Subscribe> searchSubscribeClientList(String topic);
}

View File

@ -0,0 +1,292 @@
package com.fastbee.mqtt.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.fastbee.common.core.mq.InvokeReqDto;
import com.fastbee.common.enums.DeviceLogTypeEnum;
import com.fastbee.common.enums.FunctionReplyStatus;
import com.fastbee.common.enums.TopicType;
import com.fastbee.common.enums.scenemodel.SceneModelTagOpreationEnum;
import com.fastbee.common.exception.ServiceException;
import com.fastbee.common.utils.CaculateVariableAndNumberUtils;
import com.fastbee.common.utils.DateUtils;
import com.fastbee.common.utils.StringUtils;
import com.fastbee.common.utils.date.LocalDateTimeUtils;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.iot.cache.SceneModelTagCache;
import com.fastbee.iot.domain.*;
import com.fastbee.common.core.thingsModel.ThingsModelSimpleItem;
import com.fastbee.common.core.thingsModel.ThingsModelValuesInput;
import com.fastbee.iot.mapper.SceneModelDataMapper;
import com.fastbee.iot.mapper.SceneTagPointsMapper;
import com.fastbee.iot.model.scenemodel.SceneModelTagCacheVO;
import com.fastbee.iot.model.scenemodel.SceneModelTagCycleVO;
import com.fastbee.iot.service.*;
import com.fastbee.iot.tdengine.service.ILogService;
import com.fastbee.mq.model.ReportDataBo;
import com.fastbee.mq.service.IDataHandler;
import com.fastbee.mq.service.IMqttMessagePublish;
import com.fastbee.mq.service.IRuleEngine;
import com.fastbee.mqtt.manager.MqttRemoteManager;
import com.fastbee.mqtt.model.PushMessageBo;
import com.fastbee.mqttclient.PubMqttClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.*;
/**
* 上报数据处理方法集合
* @author bill
*/
@Service
@Slf4j
public class DataHandlerImpl implements IDataHandler {
@Resource
private IDeviceService deviceService;
@Resource
private IEventLogService eventLogService;
@Resource
private IMqttMessagePublish messagePublish;
@Resource
private IRuleEngine ruleEngine;
@Resource
private MqttRemoteManager remoteManager;
@Resource
private TopicsUtils topicsUtils;
@Resource
private PubMqttClient mqttClient;
@Resource
private SpeakerService speakerService;
@Resource
private SceneTagPointsMapper sceneTagPointsMapper;
@Resource
private SceneModelTagCache sceneModelTagCache;
@Resource
private ILogService logService;
@Resource
private ISceneModelTagService sceneModelTagService;
@Resource
private IFunctionLogService functionLogService;
/**
* 上报属性或功能处理
*
* @param bo 上报数据模型
*/
@Override
public void reportData(ReportDataBo bo) {
try {
List<ThingsModelSimpleItem> thingsModelSimpleItems = bo.getDataList();
if (CollectionUtils.isEmpty(bo.getDataList()) || bo.getDataList().size() == 0) {
thingsModelSimpleItems = JSON.parseArray(bo.getMessage(), ThingsModelSimpleItem.class);
}
if (CollectionUtils.isEmpty(thingsModelSimpleItems)) return;
ThingsModelValuesInput input = new ThingsModelValuesInput();
input.setProductId(bo.getProductId());
// 这里上报设备编号是转的大写,后面存缓存也是使用大写的,所以在查询物模型的值时添加把设备编号转大写后取值
input.setDeviceNumber(bo.getSerialNumber().toUpperCase());
input.setThingsModelValueRemarkItem(thingsModelSimpleItems);
List<ThingsModelSimpleItem> result = deviceService.reportDeviceThingsModelValue(input, bo.getType(), bo.isShadow());
// 只有设备上报进入规则引擎流程
if (bo.isRuleEngine() && !bo.getSerialNumber().startsWith("server-")){
ruleEngine.ruleMatch(bo);
}
//发送至前端
PushMessageBo messageBo = new PushMessageBo();
messageBo.setTopic(topicsUtils.buildTopic(bo.getProductId(), bo.getSerialNumber(), TopicType.WS_SERVICE_INVOKE));
JSONObject pushObj = new JSONObject();
pushObj.put("message", result);
pushObj.put("sources",bo.getSources());
messageBo.setMessage(JSON.toJSONString(pushObj));
remoteManager.pushCommon(messageBo);
// 上报属性给小度音箱,接入小度音箱后可放开
// try {
// List<String> identifierList = thingsModelSimpleItems.stream().map(ThingsModelSimpleItem::getId).collect(Collectors.toList());
// speakerService.reportDuerosAttribute(bo.getSerialNumber(), identifierList);
// } catch (Exception e) {
// log.error("=>上报属性信息给小度音箱异常", e);
// }
} catch (Exception e) {
log.error("接收属性数据,解析数据时异常 message={},e={}", e.getMessage(),e);
}
}
/**
* 上报事件
*
* @param bo 上报数据模型
*/
@Override
public void reportEvent(ReportDataBo bo) {
try {
List<ThingsModelSimpleItem> thingsModelSimpleItems = JSON.parseArray(bo.getMessage(), ThingsModelSimpleItem.class);
Device device = deviceService.selectDeviceBySerialNumber(bo.getSerialNumber());
List<EventLog> results = new ArrayList<>();
for (int i = 0; i < thingsModelSimpleItems.size(); i++) {
// 添加到设备日志
EventLog event = new EventLog();
event.setDeviceId(device.getDeviceId());
event.setDeviceName(device.getDeviceName());
event.setLogValue(thingsModelSimpleItems.get(i).getValue());
event.setRemark(thingsModelSimpleItems.get(i).getRemark());
event.setSerialNumber(device.getSerialNumber());
event.setIdentity(thingsModelSimpleItems.get(i).getId());
event.setLogType(3);
event.setIsMonitor(0);
event.setUserId(device.getTenantId());
event.setUserName(device.getTenantName());
event.setTenantId(device.getTenantId());
event.setTenantName(device.getTenantName());
event.setCreateTime(DateUtils.getNowDate());
// 1=影子模式2=在线模式3=其他
event.setMode(2);
results.add(event);
//eventLogService.insertEventLog(event);
}
eventLogService.insertBatch(results);
if (bo.isRuleEngine()){
ruleEngine.ruleMatch(bo);
}
} catch (Exception e) {
log.error("接收事件,解析数据时异常 message={}", e.getMessage());
}
}
/**
* 上报设备信息
*/
public void reportDevice(ReportDataBo bo) {
try {
// 设备实体
Device deviceEntity = deviceService.selectDeviceBySerialNumber(bo.getSerialNumber());
// 上报设备信息
Device device = JSON.parseObject(bo.getMessage(), Device.class);
device.setProductId(bo.getProductId());
device.setSerialNumber(bo.getSerialNumber());
deviceService.reportDevice(device, deviceEntity);
// 发布设备状态
messagePublish.publishStatus(bo.getProductId(), bo.getSerialNumber(), 3, deviceEntity.getIsShadow(), device.getRssi());
} catch (Exception e) {
log.error("接收设备信息,解析数据时异常 message={}", e.getMessage());
throw new ServiceException(e.getMessage(), 1);
}
}
@Override
public String calculateSceneModelTagValue(Long id) {
LocalDateTime now = LocalDateTime.now();
SceneModelTag sceneModelTag = sceneModelTagService.selectSceneModelTagById(id);
if (null == sceneModelTag) {
return "场景运算型变量计算错误:变量为空";
}
if ((StringUtils.isEmpty(sceneModelTag.getAliasFormule()) && org.apache.commons.collections4.CollectionUtils.isEmpty(sceneModelTag.getTagPointsList()))) {
return "场景运算型变量计算错误:没有计算公式";
}
String checkMsg = sceneModelTagService.checkAliasFormule(sceneModelTag);
if (StringUtils.isNotEmpty(checkMsg)) {
return "场景运算型变量计算错误:" + checkMsg;
}
List<SceneTagPoints> sceneTagPointsList = sceneTagPointsMapper.selectListByTagId(sceneModelTag.getId());
Map<String, String> replaceMap = new HashMap<>(2);
// 计算周期
SceneModelTagCycleVO sceneModelTagCycleVO = new SceneModelTagCycleVO();
boolean b = sceneTagPointsList.stream().anyMatch(s -> !SceneModelTagOpreationEnum.ORIGINAL_VALUE.getCode().equals(s.getOperation()));
if (b) {
sceneModelTagCycleVO = sceneModelTagService.handleTimeCycle(sceneModelTag.getCycleType(), sceneModelTag.getCycle(), now);
}
// 需不需要判断每个变量启用了没有 todo
for (SceneTagPoints sceneTagPoints : sceneTagPointsList) {
String value;
value = sceneModelTagService.getSceneModelDataValue(sceneTagPoints, sceneModelTagCycleVO);
// value没值先兜底0
if (StringUtils.isEmpty(value)) {
value = "0";
}
replaceMap.put(sceneTagPoints.getAlias(), value);
}
BigDecimal execute = CaculateVariableAndNumberUtils.execute(sceneModelTag.getAliasFormule(), replaceMap);
String resultValue = execute.toPlainString();
this.saveSceneModelTagValue(sceneModelTag, resultValue, now);
return resultValue;
}
@Override
public void invokeSceneModelTagValue(InvokeReqDto reqDto, String messageId) {
LocalDateTime now = LocalDateTime.now();
String sceneModelTagId = reqDto.getIdentifier();
Map<String, Object> remoteCommand = reqDto.getRemoteCommand();
String value = remoteCommand.get(sceneModelTagId).toString();
FunctionLog functionLog = new FunctionLog();
functionLog.setIdentify(reqDto.getIdentifier());
functionLog.setFunType(4);
functionLog.setFunValue(value);
functionLog.setMessageId(messageId);
functionLog.setSerialNumber(reqDto.getSceneModelId().toString());
functionLog.setMode(3);
functionLog.setModelName(reqDto.getModelName());
SceneModelTag sceneModelTag = sceneModelTagService.selectSceneModelTagById(Long.valueOf(sceneModelTagId));
if (null == sceneModelTag) {
functionLog.setResultCode(FunctionReplyStatus.FAIl.getCode());
functionLog.setResultMsg(FunctionReplyStatus.FAIl.getMessage());
functionLogService.insertFunctionLog(functionLog);
return;
}
this.saveSceneModelTagValue(sceneModelTag, value, now);
functionLog.setResultCode(FunctionReplyStatus.SUCCESS.getCode());
functionLog.setResultMsg(FunctionReplyStatus.SUCCESS.getMessage());
functionLogService.insertFunctionLog(functionLog);
}
/**
* 保存场景变量值
* @param sceneModelTag 变量类
* @param: value 值
* @param: now 执行时间
* @return void
*/
private void saveSceneModelTagValue(SceneModelTag sceneModelTag, String value, LocalDateTime now) {
// 保存运算型变量值,存缓存
SceneModelTagCacheVO sceneModelTagCacheVO = new SceneModelTagCacheVO();
sceneModelTagCacheVO.setId(sceneModelTag.getId().toString());
sceneModelTagCacheVO.setTs(LocalDateTimeUtils.localDateTimeToStr(now, LocalDateTimeUtils.YYYY_MM_DD_HH_MM_SS));
sceneModelTagCacheVO.setValue(value);
sceneModelTagCache.addSceneModelTagValue(sceneModelTag.getSceneModelId(), sceneModelTagCacheVO);
// 是否历史存储
if (1 == sceneModelTag.getStorage()) {
DeviceLog deviceLog = new DeviceLog();
deviceLog.setIdentity(sceneModelTag.getId().toString());
deviceLog.setModelName(sceneModelTag.getName());
deviceLog.setLogType(DeviceLogTypeEnum.SCENE_VARIABLE_REPORT.getType());
deviceLog.setLogValue(value);
deviceLog.setIsMonitor(0);
deviceLog.setMode(3);
deviceLog.setCreateTime(new Date());
logService.saveDeviceLog(deviceLog);
}
//发送至前端
List<SceneModelTagCacheVO> sendMsg = new ArrayList<>();
sendMsg.add(sceneModelTagCacheVO);
PushMessageBo messageBo = new PushMessageBo();
// /场景id/变量id/scene/report对应变量标识
messageBo.setTopic(TopicsUtils.buildSceneReportTopic(sceneModelTag.getSceneModelId(), sceneModelTag.getSceneModelDeviceId()));
messageBo.setMessage(JSON.toJSONString(sendMsg));
remoteManager.pushCommon(messageBo);
}
}

View File

@ -0,0 +1,345 @@
package com.fastbee.mqtt.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fastbee.base.service.ISessionStore;
import com.fastbee.base.session.Session;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.core.mq.DeviceReplyBo;
import com.fastbee.common.core.mq.DeviceReport;
import com.fastbee.common.core.mq.DeviceReportBo;
import com.fastbee.common.core.mq.DeviceTestReportBo;
import com.fastbee.common.core.mq.message.DeviceData;
import com.fastbee.common.core.mq.message.DeviceMessage;
import com.fastbee.common.core.mq.message.SubDeviceMessage;
import com.fastbee.common.core.mq.ota.OtaReplyMessage;
import com.fastbee.common.core.mq.ota.OtaUpgradeBo;
import com.fastbee.common.core.redis.RedisCache;
import com.fastbee.common.core.redis.RedisKeyBuilder;
import com.fastbee.common.core.thingsModel.ThingsModelSimpleItem;
import com.fastbee.common.enums.*;
import com.fastbee.common.exception.ServiceException;
import com.fastbee.common.utils.DateUtils;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.iot.domain.Device;
import com.fastbee.iot.domain.FunctionLog;
import com.fastbee.iot.enums.DeviceType;
import com.fastbee.iot.model.DeviceStatusVO;
import com.fastbee.iot.model.ThingsModels.ValueItem;
import com.fastbee.iot.service.IDeviceService;
import com.fastbee.iot.service.IFirmwareTaskDetailService;
import com.fastbee.iot.service.IFunctionLogService;
import com.fastbee.iot.service.IProductService;
import com.fastbee.iot.cache.IDeviceCache;
import com.fastbee.mq.model.ReportDataBo;
import com.fastbee.mq.redischannel.producer.MessageProducer;
import com.fastbee.mq.redischannel.queue.DeviceTestQueue;
import com.fastbee.mq.service.IDataHandler;
import com.fastbee.mq.service.IDeviceReportMessageService;
import com.fastbee.mqtt.manager.MqttRemoteManager;
import com.fastbee.mqtt.model.PushMessageBo;
import com.fastbee.protocol.base.protocol.IProtocol;
import com.fastbee.protocol.service.IProtocolManagerService;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.*;
/**
* 处理类 处理设备主动上报和设备回调信息
*
* @author bill
*/
@Service
@Slf4j
public class DeviceReportMessageServiceImpl implements IDeviceReportMessageService {
@Autowired
private IDeviceService deviceService;
@Autowired
private IProductService productService;
@Autowired
private IProtocolManagerService protocolManagerService;
@Autowired
private IFirmwareTaskDetailService firmwareTaskDetailService;
@Resource
private IDataHandler dataHandler;
@Resource
private MqttRemoteManager remoteManager;
@Resource
private RedisCache redisCache;
@Resource
private IFunctionLogService logService;
@Resource
private MqttRemoteManager mqttRemoteManager;
@Resource
private ISessionStore sessionStore;
/**
* 处理设备主动上报数据
*/
@Override
public void parseReportMsg(DeviceReportBo bo) {
String serialNumber = bo.getSerialNumber();
switch (bo.getServerType()) {
case MQTT:
log.debug("=>MQ*收到设备主题[{}],消息:[{}]", bo.getTopicName(), bo.getData());
//构建消息
DeviceStatusVO deviceStatusVO = buildReport(bo);
Long productId = bo.getProductId();
/*获取协议处理器*/
IProtocol protocol = selectedProtocol(productId);
DeviceData data = DeviceData.builder()
.serialNumber(serialNumber)
.topicName(bo.getTopicName())
.productId(deviceStatusVO.getProductId())
.data(bo.getData())
.buf(Unpooled.wrappedBuffer(bo.getData()))
.build();
/*根据协议解析后的数据*/
DeviceReport reportMessage = protocol.decode(data, serialNumber);
//设备回复,更新指令下发记录
if (reportMessage.getIsReply()) {
handlerDeviceReply(reportMessage);
}
serialNumber = reportMessage.getSerialNumber() == null ? serialNumber : reportMessage.getSerialNumber();
productId = reportMessage.getProductId()== null ? productId : reportMessage.getProductId();
reportMessage.setSerialNumber(serialNumber);
reportMessage.setProductId(productId);
reportMessage.setPlatformDate(bo.getPlatformDate());
reportMessage.setServerType(bo.getServerType());
//同步设备状态
this.synchDeviceStatus(serialNumber);
//处理网关设备上报数据
processNoSub(reportMessage, bo.getTopicName());
break;
case TCP:
log.debug("*MQ收到TCP推送消息[{}]", JSON.toJSON(bo.getThingsModelSimpleItem()));
DeviceStatusVO deviceStatusVO1 = deviceService.selectDeviceStatusAndTransportStatus(serialNumber);
Optional.ofNullable(deviceStatusVO1).orElseThrow(() -> new ServiceException("设备不存在"));
//同步设备状态
this.synchDeviceStatus(serialNumber);
DeviceReport deviceReport = new DeviceReport();
BeanUtils.copyProperties(bo, deviceReport);
deviceReport.setProductId(deviceStatusVO1.getProductId());
deviceReport.setThingsModelSimpleItem(bo.getThingsModelSimpleItem());
deviceReport.setSlaveId(bo.getSlaveId());
deviceReport.setSerialNumber(deviceStatusVO1.getSerialNumber());
//设备回复数据处理
if (bo.getIsReply()) {
handlerDeviceReply(deviceReport);
}
processNoSub(deviceReport, null);
break;
}
}
/**
* 处理设备回调数据
*/
@Override
public void parseReplyMsg(DeviceReportBo bo) {
log.debug("=>MQ*收到设备回调消息,[{}]", bo);
buildReport(bo);
//获取解析协议
IProtocol protocol = selectedProtocol(bo.getProductId());
DeviceData deviceSource = DeviceData.builder()
.serialNumber(bo.getSerialNumber())
.topicName(bo.getTopicName())
.data(bo.getData())
.build();
//协议解析后的数据
DeviceReport message = protocol.decode(deviceSource, null);
//处理网关设备回复数据
processNoSub(message, bo.getTopicName());
}
/**
* 处理设备OTA升级
*
* @param bo
*/
@Override
public void parseOTAUpdateReply(DeviceReportBo bo) {
//构建消息
buildReport(bo);
//获取编码协议器
IProtocol protocol = selectedProtocol(bo.getProductId());
DeviceData deviceSource = DeviceData
.builder()
.serialNumber(bo.getSerialNumber())
.topicName(bo.getTopicName())
.data(bo.getData())
.build();
DeviceReport otaMessage = protocol.decode(deviceSource, null);
otaMessage.setSerialNumber(bo.getSerialNumber());
otaMessage.setProductId(bo.getProductId());
otaMessage.setMessageId(bo.getMessageId());
//处理OTA升级回复消息
OtaUpgradeBo upgradeBo = OtaUpgradeBo.builder()
.messageId(otaMessage.getMessageId())
.build();
JSONObject result = JSONObject.parseObject(otaMessage.getReplyMessage());
/*
根据各自设备自定义的OTA升级回复来处理数据
这里约定设备OTA升级返回消息结构如下
json结构: "data":{"code":1,"msg":"true"}
*/
JSONObject data = (JSONObject) result.get("data");
OtaReplyMessage otaReplyMessage = JSONObject.toJavaObject(data, OtaReplyMessage.class);
otaReplyMessage.setMessageId(otaMessage.getMessageId());
OTAUpgrade upgrade = OTAUpgrade.parse(otaReplyMessage.getCode());
//更新数据库 OTA升级状态
String version = firmwareTaskDetailService.update(upgradeBo, upgrade);
//如果OTA升级成功更新设备的固件版本号
if (upgrade.equals(OTAUpgrade.SUCCESS)) {
Device device = new Device();
device.setSerialNumber(bo.getSerialNumber());
device.setFirmwareVersion(new BigDecimal(version));
deviceService.updateDeviceFirmwareVersion(device);
}
}
/**
* 构建消息
*
* @param bo
*/
@Override
public DeviceStatusVO buildReport(DeviceReportBo bo) {
DeviceStatusVO deviceStatusVO = deviceService.selectDeviceStatusAndTransportStatus(bo.getSerialNumber());
Optional.ofNullable(deviceStatusVO).orElseThrow(() -> new ServiceException("设备不存在"));
//产品id
bo.setProductId(deviceStatusVO.getProductId());
return deviceStatusVO;
}
/**
* 根据产品id获取协议处理器
*/
@Override
public IProtocol selectedProtocol(Long productId) {
//查询产品获取协议编号
String code = productService.getProtocolByProductId(productId);
return protocolManagerService.getProtocolByProtocolCode(code);
}
/**
* 处理网关设备
*
* @param message
* @param topicName
*/
/**
* 处理网关设备
*
* @param message
* @param topicName
*/
private void processNoSub(DeviceReport message, String topicName) {
if (message.getServerType().equals(ServerType.MQTT)) {
//处理topic以prop结尾上报的数据 (属性)
if (message.getServerType().equals(ServerType.MQTT)) {
if (!topicName.endsWith(TopicType.PROPERTY_POST.getTopicSuffix())) {
return;
}
}
}
ReportDataBo report = new ReportDataBo();
report.setSerialNumber(message.getSerialNumber());
report.setProductId(message.getProductId());
report.setDataList(message.getThingsModelSimpleItem());
report.setType(1);
report.setUserId(message.getUserId());
report.setUserName(message.getUserName());
report.setDeviceName(message.getDeviceName());
report.setSources(message.getSources());
//属性上报执行规则引擎
report.setRuleEngine(true);
dataHandler.reportData(report);
}
/**
* 处理设备回调信息此处按照topic区分 prop上报和设备回调reply
* 如果模组可订阅的topic有限不能区分prop上报和reply自行根据上报数据来区分
* @param message
*/
public void handlerDeviceReply(DeviceReport message) {
String messageId = "";
String sources = message.getSources();
String serialNumber = message.getSerialNumber();
String cacheKey = RedisKeyBuilder.buildDownMessageIdCacheKey(serialNumber);
Set<String> functionList = redisCache.zRange(cacheKey, 0, -1);
//从redis中获取messageId 流水号,获取下发记录
for (String fun : functionList) {
String[] split = fun.split(":");
if (split[0].equals(sources)){
messageId = split[1];
}
redisCache.zRem(cacheKey,fun);
}
FunctionLog functionLog = new FunctionLog();
switch (message.getProtocolCode()){
case FastBeeConstant.PROTOCOL.ModbusRtu:
case FastBeeConstant.PROTOCOL.ModbusToJsonHP:
case FastBeeConstant.PROTOCOL.ModbusRtuPak:
//更新值
functionLog.setResultCode(FunctionReplyStatus.SUCCESS.getCode());
functionLog.setResultMsg(FunctionReplyStatus.SUCCESS.getMessage());
functionLog.setReplyTime(DateUtils.getNowDate());
functionLog.setMessageId(message.getMessageId() == null ? messageId : message.getMessageId());
logService.updateByMessageId(functionLog);
break;
}
}
/**
* 解析OTA升级回复消息,更新升级状态
*/
private void otaUpgrade(DeviceReport message, String topicName) {
}
/**
* 同步设备状态
* @param
*/
private void synchDeviceStatus(String serialNumber){
//如果有数据上报,但是数据库设备状态为离线,则进行同步
DeviceStatusVO deviceStatusVO = deviceService.selectDeviceStatusAndTransportStatus(serialNumber);
if (deviceStatusVO.getStatus() == DeviceStatus.OFFLINE.getType()
|| deviceStatusVO.getStatus() == DeviceStatus.UNACTIVATED.getType()){
Device updateBo = new Device();
updateBo.setStatus(3);
updateBo.setSerialNumber(deviceStatusVO.getSerialNumber());
updateBo.setUpdateTime(DateUtils.getNowDate());
deviceService.updateDeviceStatus(updateBo);
mqttRemoteManager.pushDeviceStatus(deviceStatusVO.getProductId(), deviceStatusVO.getSerialNumber(), DeviceStatus.ONLINE);
//如果是子设备维护子设备的状态到session
if (deviceStatusVO.getDeviceType() == DeviceType.SUB_GATEWAY.getCode()) {
Session session = new Session();
session.setServerType(ServerType.MQTT);
session.setClientId(deviceStatusVO.getSerialNumber());
session.setLastAccessTime(DateUtils.getTimestamp());
session.setConnected(true);
sessionStore.storeSession(deviceStatusVO.getSerialNumber(), session);
}
}
}
}

View File

@ -0,0 +1,66 @@
package com.fastbee.mqtt.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.fastbee.common.core.device.DeviceAndProtocol;
import com.fastbee.common.core.mq.DeviceTestReportBo;
import com.fastbee.common.core.mq.message.DeviceMessage;
import com.fastbee.common.enums.TopicType;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.iot.service.IDeviceService;
import com.fastbee.mq.service.IDeviceTestService;
import com.fastbee.mqtt.manager.MqttRemoteManager;
import com.fastbee.mqtt.model.PushMessageBo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.Objects;
/**
* @author bill
*/
@Slf4j
@Component
public class DeviceTestHandler implements IDeviceTestService {
@Resource
private TopicsUtils topicsUtils;
@Resource
private MqttRemoteManager mqttRemoteManager;
@Resource
private IDeviceService deviceService;
/**
* 处理数据调试信息
* @param testReportBo
*/
@Override
public void messageHandler(DeviceTestReportBo testReportBo){
boolean isReply = testReportBo.getIsReply();
Long productId = testReportBo.getProductId();
String serialNumber = testReportBo.getSerialNumber();
DeviceMessage deviceMessage = new DeviceMessage();
deviceMessage.setMessage(testReportBo.getSources());
deviceMessage.setTime(new Date());
if (!isReply) {
deviceMessage.setTopicName(TopicType.SERVICE_INVOKE_REPLY.getTopicSuffix());
} else {
deviceMessage.setTopicName(TopicType.PROPERTY_POST.getTopicSuffix());
}
if (Objects.isNull(productId)){
DeviceAndProtocol protocol = deviceService.selectProtocolBySerialNumber(serialNumber);
productId = protocol.getProductId();
}
//发送至前端
PushMessageBo messageBo = new PushMessageBo();
messageBo.setTopic(topicsUtils.buildTopic(productId, serialNumber.toUpperCase(), TopicType.WS_SERVICE_INVOKE));
JSONObject pushObj = new JSONObject();
pushObj.put("message", testReportBo.getThingsModelSimpleItem());
pushObj.put("sources",testReportBo.getSources());
messageBo.setMessage(JSON.toJSONString(pushObj));
mqttRemoteManager.pushCommon(messageBo);
}
}

View File

@ -0,0 +1,159 @@
package com.fastbee.mqtt.service.impl;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.mqtt.model.ClientMessage;
import com.fastbee.mqtt.service.IMessageStore;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* Retain will Qos12消息存储接口 -TODO 后续Redis处理
*
* @author gsb
* @date 2022/10/14 14:35
*/
@Service
public class MessageStoreImpl implements IMessageStore {
/**
* 存储消息,保留消息,遗留消息
*/
private final Map<String, ClientMessage> willOrRetainMap = new ConcurrentHashMap<>();
/**
* Qos2 Pub消息
*/
private final Map<Integer, ClientMessage> publishMap = new ConcurrentHashMap<>();
/**
* Qos2 REL IN消息
*/
private final Set<Integer> outRelSet = new HashSet<>();
/**
* Qos2 REL out
*/
private final Set<Integer> inRelSet = new HashSet<>();
/**
* 存储控制包
*
* @param topic: 控制包所属主题
* @param clientMessage: 需要存储的消息
*/
@Override
public void storeMessage(String topic, ClientMessage clientMessage) {
willOrRetainMap.put(topic, clientMessage);
}
/**
* 清除topic下的所有消息
*
* @param topic: 主题
*/
@Override
public void cleanTopic(String topic) {
willOrRetainMap.remove(topic);
}
/**
* 根据clientId清除消息
*
* @param clientId: 客户端唯一标识
*/
@Override
public void removeMessage(String clientId) {
for (Map.Entry<String, ClientMessage> entry : willOrRetainMap.entrySet()) {
if (entry.getValue().getClientId().equals(clientId)) {
willOrRetainMap.remove(entry.getKey());
}
}
}
/**
* 匹配主题过滤器,匹配消息
*
* @param topicFilter: 主题过滤器
*/
@Override
public List<ClientMessage> searchMessages(String topicFilter) {
List<ClientMessage> messageList = new ArrayList<>();
for (String topic : willOrRetainMap.keySet()) {
if (TopicsUtils.matchTopic(topic, topicFilter)) {
messageList.add(willOrRetainMap.get(topic));
}
}
return messageList;
}
/**
* 保存 clientMessage
*
* @param messageId 消息id
*/
@Override
public void savePubMsg(Integer messageId, ClientMessage clientMessage){
publishMap.put(messageId,clientMessage);
}
/**
* 移除
*
* @param messageId 消息id
*/
@Override
public void removePubMsg(int messageId){
publishMap.remove(messageId);
}
/**
* 保存 REL IN
*
* @param messageId 消息id
*/
@Override
public void saveRelInMsg(int messageId){
inRelSet.add(messageId);
}
/**
* 保存 REL OUT
*
* @param messageId 消息id
*/
@Override
public void saveRelOutMsg(int messageId){
outRelSet.add(messageId);
}
/**
* 移除
*
* @param messageId 消息id
*/
@Override
public void removeRelInMsg(int messageId){
inRelSet.remove(messageId);
}
/**
* 移除
*
* @param messageId 消息id
*/
@Override
public void removeRelOutMsg(int messageId){
outRelSet.remove(messageId);
}
/**
* 判断Rel out是否包含消息id
*/
@Override
public boolean outRelContains(int messageId){
return outRelSet.contains(messageId);
}
}

View File

@ -0,0 +1,434 @@
package com.fastbee.mqtt.service.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.core.device.DeviceAndProtocol;
import com.fastbee.common.core.mq.MQSendMessageBo;
import com.fastbee.common.core.mq.message.*;
import com.fastbee.common.core.mq.ota.OtaUpgradeBo;
import com.fastbee.common.core.protocol.Message;
import com.fastbee.common.core.redis.RedisCache;
import com.fastbee.common.core.redis.RedisKeyBuilder;
import com.fastbee.common.core.thingsModel.ThingsModelSimpleItem;
import com.fastbee.common.enums.FunctionReplyStatus;
import com.fastbee.common.enums.OTAUpgrade;
import com.fastbee.common.enums.ServerType;
import com.fastbee.common.enums.TopicType;
import com.fastbee.common.exception.ServiceException;
import com.fastbee.common.utils.DateUtils;
import com.fastbee.common.utils.StringUtils;
import com.fastbee.common.utils.gateway.CRC16Utils;
import com.fastbee.common.utils.gateway.mq.TopicsUtils;
import com.fastbee.common.utils.ip.IpUtils;
import com.fastbee.common.utils.modbus.ModbusUtils;
import com.fastbee.iot.cache.IFirmwareCache;
import com.fastbee.iot.cache.ITSLCache;
import com.fastbee.iot.domain.*;
import com.fastbee.iot.enums.DeviceType;
import com.fastbee.iot.model.NtpModel;
import com.fastbee.iot.model.ThingsModels.ThingsModelValueItem;
import com.fastbee.iot.ruleEngine.MsgContext;
import com.fastbee.iot.ruleEngine.RuleProcess;
import com.fastbee.iot.service.IDeviceService;
import com.fastbee.iot.service.IFirmwareTaskDetailService;
import com.fastbee.iot.service.IFunctionLogService;
import com.fastbee.iot.service.IOrderControlService;
import com.fastbee.iot.util.SnowflakeIdWorker;
import com.fastbee.mq.model.ReportDataBo;
import com.fastbee.mq.service.IDataHandler;
import com.fastbee.mq.service.IMqttMessagePublish;
import com.fastbee.mq.service.impl.MessageManager;
import com.fastbee.mqttclient.PubMqttClient;
import com.fastbee.protocol.base.protocol.IProtocol;
import com.fastbee.protocol.domain.DeviceProtocol;
import com.fastbee.protocol.service.IProtocolManagerService;
import com.fastbee.sip.service.IGatewayService;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.*;
import static com.fastbee.common.utils.SecurityUtils.getLoginUser;
/**
* 消息推送方法集合
*
* @author bill
*/
@Slf4j
@Service
public class MqttMessagePublishImpl implements IMqttMessagePublish {
@Resource
private IProtocolManagerService protocolManagerService;
@Resource
private PubMqttClient mqttClient;
@Resource
private IFirmwareCache firmwareCache;
@Resource
private IFirmwareTaskDetailService firmwareTaskDetailService;
@Resource
private MessageManager messageManager;
@Resource
private TopicsUtils topicsUtils;
@Resource
private IDeviceService deviceService;
@Resource
private IFunctionLogService functionLogService;
@Resource
private IDataHandler dataHandler;
@Resource
private RuleProcess ruleProcess;
@Autowired
private IGatewayService gatewayService;
@Resource
private ITSLCache itslCache;
@Resource
private RedisCache redisCache;
@Resource
private IOrderControlService orderControlService;
private final SnowflakeIdWorker snowflakeIdWorker = new SnowflakeIdWorker(3);
/**
* 服务(指令)下发
*/
@Override
public void funcSend(MQSendMessageBo bo) {
DeviceAndProtocol deviceAndProtocol = deviceService.selectProtocolBySerialNumber(bo.getSerialNumber());
Optional.ofNullable(deviceAndProtocol).orElseThrow(() -> new ServiceException("服务下发的设备:[" + bo.getSerialNumber() + "]不存在"));
int deviceType = deviceAndProtocol.getDeviceType();
String sourceNo = bo.getSerialNumber();
//处理数据下发
bo.setDp(deviceAndProtocol);
Long productId = bo.getDp().getProductId();
String serialNumber = bo.getSerialNumber();
String transport = bo.getDp().getTransport();
//获取物模型
ThingsModelValueItem thingModels = itslCache.getSingleThingModels(productId, bo.getIdentifier());
ModbusConfig modbusConfig = thingModels.getConfig();
if (!Objects.isNull(modbusConfig)) {
thingModels.getConfig().setModbusCode(ModbusUtils.getModbusCode(modbusConfig.getType()));
if (deviceType == DeviceType.SUB_GATEWAY.getCode()) {
//这里获取绑定网关时,设置的子设备地址
thingModels.getConfig().setSlave(deviceAndProtocol.getSlaveId() == null ? deviceAndProtocol.getProSlaveId() : deviceAndProtocol.getSlaveId());
//设置网关产品id设备编号
bo.setSerialNumber(deviceAndProtocol.getGwSerialNumber());
bo.getDp().setProductId(deviceAndProtocol.getGwProductId());
productId = deviceAndProtocol.getGwProductId();
serialNumber = deviceAndProtocol.getGwSerialNumber();
}
}
bo.setThingsModel(JSON.toJSONString(thingModels));
Integer type = thingModels.getType();
//处理影子模式
this.hadndlerShadow(bo, type);
//下发指令日志
FunctionLog funcLog = this.handleLog(bo, thingModels.getName());
ServerType serverType = ServerType.explain(transport);
//组建下发服务指令
FunctionCallBackBo backBo = buildMessage(bo);
switch (serverType) {
case MQTT:
// 规则引擎脚本处理,完成后返回结果
MsgContext context = ruleProcess.processRuleScript(serialNumber, 2, backBo.getTopicName(), new String(backBo.getMessage()));
if (!Objects.isNull(context) && StringUtils.isNotEmpty(context.getPayload())
&& StringUtils.isNotEmpty(context.getTopic())) {
backBo.setTopicName(context.getTopic());
backBo.setMessage(context.getPayload().getBytes());
}
publishWithLog(backBo.getTopicName(), backBo.getMessage(), funcLog);
log.debug("=>服务下发,topic=[{}],指令=[{}]", backBo.getTopicName(), new String(backBo.getMessage()));
break;
case TCP:
Message data = new Message();
data.setPayload(Unpooled.wrappedBuffer(backBo.getMessage()));
data.setClientId(backBo.getSerialNumber());
messageManager.requestR(serialNumber, data, Message.class);
funcLog.setResultMsg(FunctionReplyStatus.NORELY.getMessage());
funcLog.setResultCode(FunctionReplyStatus.NORELY.getCode());
functionLogService.insertFunctionLog(funcLog);
break;
case UDP:
break;
case COAP:
break;
case GB28181:
MqttMessagePublishImpl.log.debug("=>功能指令下发,functinos=[{}]", bo);
gatewayService.sendFunction(serialNumber, bo.getIdentifier(), bo.getParams().getString(bo.getIdentifier()));
break;
}
//发送至前端数据调试
String topic = topicsUtils.buildTopic(bo.getDp().getProductId(), bo.getSerialNumber(), TopicType.MESSAGE_POST);
DeviceMessage deviceMessage = new DeviceMessage();
deviceMessage.setMessage(backBo.getSources());
deviceMessage.setTime(new Date());
deviceMessage.setTopicName(TopicType.FUNCTION_GET.getTopicSuffix());
byte[] bytes = JSONObject.toJSONString(deviceMessage).getBytes();
publishWithLog(topic, bytes, null);
if (deviceAndProtocol.getProtocolCode().equals(FastBeeConstant.PROTOCOL.ModbusRtu)) {
//这里做一个消息id标记消息下发顺序如果设备指令带流水号则不需要使用
String cacheKey = RedisKeyBuilder.buildDownMessageIdCacheKey(sourceNo);
redisCache.zSetAdd(cacheKey, backBo.getSources() + ":" + bo.getMessageId(), DateUtils.getTimestampSeconds());
}
//处理指令下发权限问题
deviceService.updateByOrder(bo.getUserId(), deviceAndProtocol.getDeviceId());
}
/**
* 处理影子模式
*/
private void hadndlerShadow(MQSendMessageBo bo, int type) {
//处理设备影子模式
if (bo.isShadow()) {
List<ThingsModelSimpleItem> dataList = new ArrayList<>();
bo.getParams().forEach((key, value) -> {
ThingsModelSimpleItem item = new ThingsModelSimpleItem();
item.setId(key);
item.setValue(value + "");
dataList.add(item);
});
ReportDataBo dataBo = new ReportDataBo();
dataBo.setDataList(dataList)
.setProductId(bo.getDp().getProductId())
.setSerialNumber(bo.getSerialNumber())
.setRuleEngine(false)
.setShadow(true)
.setType(type);
dataHandler.reportData(dataBo);
return;
}
}
/**
* 处理下发指令日志
*
* @return
*/
private FunctionLog handleLog(MQSendMessageBo message, String modelName) {
/* 下发服务数据存储对象*/
JSONObject params = message.getParams();
String val = params.get(message.getIdentifier()) + "";
message.setValue(val);
FunctionLog funcLog = new FunctionLog();
funcLog.setCreateTime(DateUtils.getNowDate());
funcLog.setFunValue(val);
funcLog.setMessageId(message.getMessageId());
funcLog.setSerialNumber(message.getSerialNumber());
funcLog.setIdentify(message.getIdentifier());
funcLog.setShowValue(val);
funcLog.setFunType(1);
funcLog.setModelName(modelName);
return funcLog;
}
/**
* OTA升级下发
*
* @param bo
*/
@Override
public void upGradeOTA(OtaUpgradeBo bo) {
//获取唯一messageId
bo.setMessageId(String.valueOf(snowflakeIdWorker.nextId()));
//获取固件版本缓存
String url = this.firmwareCache.getFirmwareCache(bo.getOtaId());
if (StringUtils.isEmpty(url)) {
firmwareCache.setFirmwareCache(bo.getOtaId(), bo.getOtaUrl());
}
DeviceProtocol deviceProtocol = protocolManagerService.getProtocolBySerialNumber(bo.getSerialNumber());
IProtocol protocol = deviceProtocol.getProtocol();
/*组建下发OTA升级topic*/
String topicName = topicsUtils.buildTopic(bo.getProductId(), bo.getSerialNumber(), TopicType.FIRMWARE_SET);
DeviceDownMessage deviceDownMessage = buildMessage(bo);
DeviceData deviceSource = DeviceData.builder()
.serialNumber(bo.getSerialNumber())
.topicName(topicName)
.downMessage(deviceDownMessage)
.build();
//TODO -- 后续处理 。。编码OTA升级消息
byte[] otaUpgrade = protocol.encodeOTA(deviceSource);
FunctionLog log = new FunctionLog();
log.setCreateTime(DateUtils.getNowDate());
log.setSerialNumber(bo.getSerialNumber());
log.setFunType(3);
log.setMessageId(bo.getMessageId());
log.setDeviceName(bo.getDeviceName());
log.setIdentify("OTA");
log.setFunValue("名称:" + bo.getFirmwareName() + " 版本:" + bo.getFirmwareVersion());
log.setShowValue(bo.getOtaUrl());
// 通过内部mqtt客户端下发消息
publishWithLog(topicName, otaUpgrade, log);
// 更新数据库
firmwareTaskDetailService.update(bo, OTAUpgrade.SEND);
}
@Override
public FunctionCallBackBo buildMessage(MQSendMessageBo bo) {
String protocolCode = bo.getDp().getProtocolCode();
Long productId = bo.getDp().getProductId();
String serialNumber = bo.getSerialNumber();
/*组建Topic*/
String topic = topicsUtils.buildTopic(productId, serialNumber, TopicType.FUNCTION_GET);
bo.setTopicName(topic);
/*获取编码协议*/
IProtocol protocolInstance = protocolManagerService.getProtocolByProtocolCode(protocolCode);
//根据协议编码后数据
FunctionCallBackBo callBackBo = protocolInstance.encode(bo);
callBackBo.setSerialNumber(serialNumber);
callBackBo.setTopicName(topic);
return callBackBo;
}
/**
* 1.发布设备状态
*/
@Override
public void publishStatus(Long productId, String deviceNum, int deviceStatus, int isShadow, int rssi) {
String message = "{\"status\":" + deviceStatus + ",\"isShadow\":" + isShadow + ",\"rssi\":" + rssi + "}";
String topic = topicsUtils.buildTopic(productId, deviceNum, TopicType.STATUS_POST);
mqttClient.publish(1, false, topic, message);
}
/**
* 2.发布设备信息
*/
@Override
public void publishInfo(Long productId, String deviceNum) {
String topic = topicsUtils.buildTopic(productId, deviceNum, TopicType.INFO_GET);
mqttClient.publish(1, false, topic, "");
}
/**
* 3.发布时钟同步信息
*
* @param bo 数据模型
*/
public void publishNtp(ReportDataBo bo) {
NtpModel ntpModel = JSON.parseObject(bo.getMessage(), NtpModel.class);
ntpModel.setServerRecvTime(System.currentTimeMillis());
ntpModel.setServerSendTime(System.currentTimeMillis());
String topic = topicsUtils.buildTopic(bo.getProductId(), bo.getSerialNumber(), TopicType.NTP_GET);
mqttClient.publish(1, false, topic, JSON.toJSONString(ntpModel));
}
/**
* 4.发布属性
* delay 延时,秒为单位
*/
@Override
public void publishProperty(Long productId, String deviceNum, List<ThingsModelSimpleItem> thingsList, int delay) {
String pre = "";
if (delay > 0) {
pre = "$delayed/" + String.valueOf(delay) + "/";
}
String topic = topicsUtils.buildTopic(productId, deviceNum, TopicType.FUNCTION_GET);
if (thingsList == null) {
mqttClient.publish(1, true, topic, "");
} else {
mqttClient.publish(1, true, topic, JSON.toJSONString(thingsList));
}
}
/**
* 5.发布功能
* delay 延时,秒为单位
*/
@Override
public void publishFunction(Long productId, String deviceNum, List<ThingsModelSimpleItem> thingsList, int delay) {
String pre = "";
if (delay > 0) {
pre = "$delayed/" + String.valueOf(delay) + "/";
}
String topic = topicsUtils.buildTopic(productId, deviceNum, TopicType.FUNCTION_GET);
if (thingsList == null) {
mqttClient.publish(1, true, topic, "");
} else {
mqttClient.publish(1, true, topic, JSON.toJSONString(thingsList));
}
}
/**
* 设备数据同步
*
* @param deviceNumber 设备编号
* @return 设备
*/
public Device deviceSynchronization(String deviceNumber) {
Device device = deviceService.selectDeviceBySerialNumber(deviceNumber);
// 1-未激活2-禁用3-在线4-离线
if (device.getStatus() == 3) {
device.setStatus(4);
deviceService.updateDeviceStatus(device);
// 发布设备信息
publishInfo(device.getProductId(), device.getSerialNumber());
}
return device;
}
public byte[] CRC(byte[] source) {
source[2] = (byte) ((int) source[2] * 2);
byte[] result = new byte[source.length + 2];
byte[] crc16Byte = CRC16Utils.getCrc16Byte(source);
System.arraycopy(source, 0, result, 0, source.length);
System.arraycopy(crc16Byte, 0, result, result.length - 2, 2);
return result;
}
/**
* =======
* >>>>>>> guanfeng/dev
* 搭建消息
*
* @param bo
* @return
*/
private DeviceDownMessage buildMessage(OtaUpgradeBo bo) {
String messageId = String.valueOf(snowflakeIdWorker.nextId());
bo.setMessageId(messageId);
bo.setOtaUrl("http://" + IpUtils.getHostIp() + bo.getOtaUrl());
return DeviceDownMessage.builder()
.productId(bo.getProductId())
.serialNumber(bo.getSerialNumber())
.body(JSON.toJSON(bo))
.timestamp(DateUtils.getTimestamp())
.messageId(messageId)
.build();
}
public void publishWithLog(String topic, byte[] pushMessage, FunctionLog log) {
try {
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_SEND_TOTAL, -1L);
redisCache.incr2(FastBeeConstant.REDIS.MESSAGE_SEND_TODAY, 60 * 60 * 24);
mqttClient.publish(pushMessage, topic, false, 0);
if (null != log) {
//存储服务下发成功
log.setResultMsg(FunctionReplyStatus.NORELY.getMessage());
log.setResultCode(FunctionReplyStatus.NORELY.getCode());
functionLogService.insertFunctionLog(log);
}
} catch (Exception e) {
if (null != log) {
//服务下发失败存储
log.setResultMsg(FunctionReplyStatus.FAIl.getMessage() + "原因: " + e.getMessage());
log.setResultCode(FunctionReplyStatus.FAIl.getCode());
functionLogService.insertFunctionLog(log);
}
}
}
}

View File

@ -0,0 +1,56 @@
package com.fastbee.mqtt.service.impl;
import com.fastbee.common.core.redis.RedisCache;
import com.fastbee.mqtt.model.Subscribe;
import com.fastbee.mqtt.service.ISubscriptionService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author gsb
* @date 2022/10/14 8:37
*/
@Slf4j
@Component
public class SubscriptionServiceImpl implements ISubscriptionService {
@Autowired
private RedisCache redisCache;
/**
* 保存客户订阅的主题
*
* @param subscribeList 主题列表
*/
@Override
public void subscribe(List<Subscribe> subscribeList, String clientId) {
redisCache.setCacheList(clientId, subscribeList);
}
/**
* 解除订阅
*
* @param clientId 客户id
* @param topicName 主题
*/
@Override
public void unsubscribe(String clientId, String topicName) {
redisCache.delHashValue(topicName, clientId);
}
/**
* 获取订阅了 topic 的客户id
*
* @param topic 主题
* @return 订阅了主题的客户id列表
*/
@Override
public List<Subscribe> searchSubscribeClientList(String topic) {
return null;
}
}

View File

@ -0,0 +1,144 @@
package com.fastbee.mqtt.utils;
import com.fastbee.common.core.mq.DeviceStatusBo;
import com.fastbee.common.enums.DeviceStatus;
import com.fastbee.common.utils.DateUtils;
import com.fastbee.mqtt.model.ClientMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.*;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* 服务器应答信息构建
* @author gsb
* @date 2022/10/7 14:17
*/
public class MqttMessageUtils {
/**
* 服务器确认连接应答消息 CONNACK
*/
public static MqttConnAckMessage buildConntAckMessage(MqttConnectReturnCode code, boolean sessionPresent) {
MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.CONNACK);
MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(code, sessionPresent);
return new MqttConnAckMessage(fixedHeader, variableHeader);
}
/**
* 设备ping(心跳信息)应答 PINGRESP
*/
public static MqttMessage buildPingResp() {
MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.PINGRESP);
return new MqttMessage(fixedHeader);
}
/**
* 取消订阅消息应答 UNSUBACK
*/
public static MqttUnsubAckMessage buildUnsubAckMessage(MqttMessage message) {
/*构建固定报文*/
MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.UNSUBACK);
return new MqttUnsubAckMessage(fixedHeader, getIdVariableHeader(message));
}
/**
* 订阅确认应答 SUBACK
*/
public static MqttSubAckMessage buildSubAckMessage(MqttMessage message) {
/*构建固定报文*/
MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.SUBACK);
/*构建可变报文*/
MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) message;
/*获取订阅topic的Qos*/
Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(MqttTopicSubscription::topicName).collect(Collectors.toSet());
List<Integer> grantedQos = new ArrayList<>(topics.size());
for (int i = 0; i < topics.size(); i++) {
grantedQos.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());
}
/*负载*/
MqttSubAckPayload payload = new MqttSubAckPayload(grantedQos);
return new MqttSubAckMessage(fixedHeader, getIdVariableHeader(message), payload);
}
/**
* 构建推送应答消息 PUBLISH
*/
public static MqttPublishMessage buildPublishMessage(ClientMessage msg, int packageId) {
/*报文固定头*/
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, msg.isDup(), msg.getQos(), false, 0);
/*报文可变头*/
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(msg.getTopicName(), packageId);
/*负载*/
ByteBuf payload = msg.getPayload() == null ? Unpooled.EMPTY_BUFFER : Unpooled.wrappedBuffer(msg.getPayload());
/*完整报文,固定头+可变头+payload*/
return new MqttPublishMessage(fixedHeader, variableHeader, payload);
}
/**
* Qos1 收到发布消息确认 无负载 PUBACK
*/
public static MqttPubAckMessage buildAckMessage(MqttMessage message) {
MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.PUBACK);
return new MqttPubAckMessage(fixedHeader, getIdVariableHeader(message));
}
/**
* Qos2 发到消息收到 无负载 PUBREC
*/
public static MqttMessage buildPubRecMessage(MqttMessage message){
MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.PUBREC);
return new MqttMessage(fixedHeader, getIdVariableHeader(message));
}
/**
* Qos2 发布消息释放 PUBREL
*/
public static MqttMessage buildPubRelMessage(MqttMessage message){
MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.PUBREL);
return new MqttMessage(fixedHeader, getIdVariableHeader(message));
}
/**
* Qos2 发布消息完成 PUBCOMP
*/
public static MqttMessage buildPubCompMessage(MqttMessage message){
MqttFixedHeader fixedHeader = buildFixedHeader(MqttMessageType.PUBCOMP);
return new MqttMessage(fixedHeader, getIdVariableHeader(message));
}
/**
* 固定头定制
*/
public static MqttFixedHeader buildFixedHeader(MqttMessageType messageType) {
return new MqttFixedHeader(messageType, false, MqttQoS.AT_MOST_ONCE, false, 0);
}
/**
* 构造MqttMessageIdVariableHeader
*/
public static MqttMessageIdVariableHeader getIdVariableHeader(MqttMessage mqttMessage) {
MqttMessageIdVariableHeader idVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
return MqttMessageIdVariableHeader.from(idVariableHeader.messageId());
}
/*构造返回MQ的设备状态model*/
public static DeviceStatusBo buildStatusMsg(ChannelHandlerContext ctx, String clientId,DeviceStatus status,String ip){
return DeviceStatusBo.builder()
.serialNumber(clientId)
.status(status)
.ip(ip)
.hostName(ip)
.timestamp(DateUtils.getNowDate()).build();
}
}