mqtt设备对接逻辑
This commit is contained in:
parent
6f41eb2599
commit
51caf11519
@ -2,12 +2,14 @@ package com.fastbee.common.core.device;
|
|||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author gsb
|
* @author gsb
|
||||||
* @date 2024/6/14 9:25
|
* @date 2024/6/14 9:25
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
public class DeviceAndProtocol {
|
public class DeviceAndProtocol implements Serializable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 子设备编号
|
* 子设备编号
|
||||||
|
@ -1,12 +1,18 @@
|
|||||||
package com.fastbee.mq.redischannel.consumer;
|
package com.fastbee.mq.redischannel.consumer;
|
||||||
|
|
||||||
|
import cn.hutool.json.JSONObject;
|
||||||
|
import cn.hutool.json.JSONUtil;
|
||||||
import com.fastbee.common.constant.FastBeeConstant;
|
import com.fastbee.common.constant.FastBeeConstant;
|
||||||
import com.fastbee.common.core.mq.DeviceReportBo;
|
import com.fastbee.common.core.mq.DeviceReportBo;
|
||||||
|
import com.fastbee.mq.redischannel.producer.IssueInstructionsProducer;
|
||||||
import com.fastbee.mq.service.impl.DeviceOtherMsgHandler;
|
import com.fastbee.mq.service.impl.DeviceOtherMsgHandler;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.scheduling.annotation.Async;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import javax.annotation.Resource;
|
import javax.annotation.Resource;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author gsb
|
* @author gsb
|
||||||
@ -19,69 +25,114 @@ public class DeviceOtherMsgConsumer {
|
|||||||
@Resource
|
@Resource
|
||||||
private DeviceOtherMsgHandler otherMsgHandler;
|
private DeviceOtherMsgHandler otherMsgHandler;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private IssueInstructionsProducer issueInstructionsProducer;
|
||||||
|
|
||||||
@Async(FastBeeConstant.TASK.DEVICE_OTHER_TASK)
|
@Async(FastBeeConstant.TASK.DEVICE_OTHER_TASK)
|
||||||
public void consume(DeviceReportBo bo){
|
public void consume(DeviceReportBo bo){
|
||||||
try {
|
try {
|
||||||
//处理emq订阅的非 property/post 属性上报的消息 ,因为其他消息量小,放在一起处理
|
//处理emq订阅的非 property/post 属性上报的消息 ,因为其他消息量小,放在一起处理
|
||||||
String serialNumber;//设备序列号
|
String serialNumber=bo.getSerialNumber();//设备序列号
|
||||||
|
Long productId;//产品id
|
||||||
|
|
||||||
|
|
||||||
Long packetId;//包号
|
Long packetId;//包号
|
||||||
byte[] data = bo.getData();//数据
|
byte[] data = bo.getData();//数据
|
||||||
String topic=bo.getTopicName();//主题
|
String topic=bo.getTopicName();//主题
|
||||||
|
//从主题中解析出产品id
|
||||||
|
String[] split = topic.split("/");
|
||||||
|
productId=Long.valueOf(split[1]);
|
||||||
|
System.err.println("主题:"+topic+"产品id:"+productId+"设备序列号:"+serialNumber);
|
||||||
//设备上报数据消息
|
//设备上报数据消息
|
||||||
if(topic.endsWith("/info/up")){
|
if(topic.endsWith("/info/up")){
|
||||||
System.err.println("mqtt接收到设备上报数据:"+ new String(data));
|
deviceDataReportHandler(new String(data));
|
||||||
|
//构建回复消息
|
||||||
|
Map<String,Object> reportMsg=new HashMap<>();
|
||||||
|
reportMsg.put("code",0);
|
||||||
|
reportMsg.put("msg","ok");
|
||||||
|
issueInstructionsProducer.receiveDataReportResponse(productId.toString(),serialNumber,JSONUtil.toJsonStr(reportMsg));
|
||||||
} else if ( topic.endsWith("/info/reply")) {
|
} else if ( topic.endsWith("/info/reply")) {
|
||||||
System.err.println("mqtt回应收到设备上报数据:"+ new String(data));
|
platformDataReportAckHandler(new String(data));
|
||||||
|
|
||||||
} else if (topic.endsWith("cmd/down")) {
|
} else if (topic.endsWith("cmd/down")) {
|
||||||
System.err.println("mqtt收到设备下发命令:"+ new String(data));
|
platformCmdHandler(new String(data));
|
||||||
|
|
||||||
} else if (topic.endsWith("cmd/reply")) {
|
} else if (topic.endsWith("cmd/reply")) {
|
||||||
System.err.println("mqtt回应收到设备下发命令:"+ new String(data));
|
deviceCmdAckHandler(new String(data));
|
||||||
}
|
}
|
||||||
|
otherMsgHandler.messageHandler(bo);
|
||||||
// //解析
|
|
||||||
// String jsonString = "{\"type\": \"waterEleData\", \"pakSn\": 123, \"data\": {\"workState\": 1, \"action\": \"startPump\", \"mcuSn\": \"MCU123456\", \"sumEle\": 5000, \"sumFlow\": 3000, \"insFlow\": 2.5, \"userSumFlow\": 1500, \"areaCode\": \"010\", \"cardId\": \"CARD12345678\", \"userBalance\": 100.0, \"userSumEle\": 3000, \"curEle\": 50, \"curFlow\": 200, \"insPower\": 2300}}";
|
|
||||||
// JSONObject entries = JSONUtil.parseObj(jsonString);
|
|
||||||
// System.err.println("解析后:"+entries);
|
|
||||||
// //获取帧标识
|
|
||||||
// String type = entries.getStr("type");
|
|
||||||
// //获取帧号
|
|
||||||
// Long pakSn = entries.getLong("pakSn");
|
|
||||||
// //获取数据
|
|
||||||
// JSONObject dataObj = entries.getJSONObject("data");
|
|
||||||
// //获取水泵状态0=关泵,1=开泵
|
|
||||||
// Integer workState = dataObj.getInt("workState");
|
|
||||||
// //获取动作
|
|
||||||
// String action = dataObj.getStr("action");
|
|
||||||
// //获取单片机编码
|
|
||||||
// String mcuSn = dataObj.getStr("mcuSn");
|
|
||||||
// //获取累计用电量
|
|
||||||
// Integer sumEle = dataObj.getInt("sumEle");
|
|
||||||
// //获取累计用水量
|
|
||||||
// Integer sumFlow = dataObj.getInt("sumFlow");
|
|
||||||
// //获取瞬时流量
|
|
||||||
// Double insFlow = dataObj.getDouble("insFlow");
|
|
||||||
// //当前用户累计用水量
|
|
||||||
// Integer userSumFlow = dataObj.getInt("userSumFlow");
|
|
||||||
// //获取区域号
|
|
||||||
// String areaCode = dataObj.getStr("areaCode");
|
|
||||||
// //获取卡号
|
|
||||||
// String cardId = dataObj.getStr("cardId");
|
|
||||||
// //获取用户余额
|
|
||||||
// Double userBalance = dataObj.getDouble("userBalance");
|
|
||||||
// //获取用户累计用电量
|
|
||||||
// Integer userSumEle = dataObj.getInt("userSumEle");
|
|
||||||
// //获取本次用电量
|
|
||||||
// Integer curEle = dataObj.getInt("curEle");
|
|
||||||
// //获取本次用水量
|
|
||||||
// Integer curFlow = dataObj.getInt("curFlow");
|
|
||||||
// //获取瞬时功率
|
|
||||||
// Integer insPower = dataObj.getInt("insPower");
|
|
||||||
|
|
||||||
// otherMsgHandler.messageHandler(bo);
|
|
||||||
}catch (Exception e){
|
}catch (Exception e){
|
||||||
log.error("=>设备其他消息处理出错",e);
|
log.error("=>设备其他消息处理出错",e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 处理设备数据上报
|
||||||
|
* @param data 消息
|
||||||
|
*/
|
||||||
|
private void deviceDataReportHandler(String data){
|
||||||
|
System.err.println("mqtt接收到设备上报数据:"+ data);
|
||||||
|
//解析
|
||||||
|
// String jsonString = "{\"type\": \"waterEleData\", \"pakSn\": 123, \"data\": {\"workState\": 1, \"action\": \"startPump\", \"mcuSn\": \"MCU123456\", \"sumEle\": 5000, \"sumFlow\": 3000, \"insFlow\": 2.5, \"userSumFlow\": 1500, \"areaCode\": \"010\", \"cardId\": \"CARD12345678\", \"userBalance\": 100.0, \"userSumEle\": 3000, \"curEle\": 50, \"curFlow\": 200, \"insPower\": 2300}}";
|
||||||
|
JSONObject entries = JSONUtil.parseObj(data);
|
||||||
|
// System.err.println("解析后:"+entries);
|
||||||
|
//获取帧标识
|
||||||
|
String type = entries.getStr("type");
|
||||||
|
//获取帧号
|
||||||
|
Long pakSn = entries.getLong("pakSn");
|
||||||
|
//获取数据
|
||||||
|
JSONObject dataObj = entries.getJSONObject("data");
|
||||||
|
//获取水泵状态0=关泵,1=开泵
|
||||||
|
Integer workState = dataObj.getInt("workState");
|
||||||
|
//获取动作
|
||||||
|
String action = dataObj.getStr("action");
|
||||||
|
//获取单片机编码
|
||||||
|
String mcuSn = dataObj.getStr("mcuSn");
|
||||||
|
//获取累计用电量
|
||||||
|
Integer sumEle = dataObj.getInt("sumEle");
|
||||||
|
//获取累计用水量
|
||||||
|
Integer sumFlow = dataObj.getInt("sumFlow");
|
||||||
|
//获取瞬时流量
|
||||||
|
Double insFlow = dataObj.getDouble("insFlow");
|
||||||
|
//当前用户累计用水量
|
||||||
|
Integer userSumFlow = dataObj.getInt("userSumFlow");
|
||||||
|
//获取区域号
|
||||||
|
String areaCode = dataObj.getStr("areaCode");
|
||||||
|
//获取卡号
|
||||||
|
String cardId = dataObj.getStr("cardId");
|
||||||
|
//获取用户余额
|
||||||
|
Double userBalance = dataObj.getDouble("userBalance");
|
||||||
|
//获取用户累计用电量
|
||||||
|
Integer userSumEle = dataObj.getInt("userSumEle");
|
||||||
|
//获取本次用电量
|
||||||
|
Integer curEle = dataObj.getInt("curEle");
|
||||||
|
//获取本次用水量
|
||||||
|
Integer curFlow = dataObj.getInt("curFlow");
|
||||||
|
//获取瞬时功率
|
||||||
|
Integer insPower = dataObj.getInt("insPower");
|
||||||
|
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* 处理平台接收到数据上报的回应
|
||||||
|
*/
|
||||||
|
private void platformDataReportAckHandler(String data){
|
||||||
|
System.err.println("mqtt回应收到设备上报数据:"+ data);
|
||||||
|
//处理
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* 处理平台给设备下发的指令
|
||||||
|
*/
|
||||||
|
private void platformCmdHandler(String data){
|
||||||
|
System.err.println("mqtt收到平台给设备下发命令:"+ data);
|
||||||
|
JSONObject obj = JSONUtil.parseObj(data);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* 处理设备收到指令回应
|
||||||
|
*/
|
||||||
|
private void deviceCmdAckHandler(String data){
|
||||||
|
System.err.println("mqtt回应收到设备下发命令:"+ data);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,39 @@
|
|||||||
|
package com.fastbee.mq.redischannel.producer;
|
||||||
|
|
||||||
|
import com.fastbee.mqttclient.PubMqttCallBack;
|
||||||
|
import com.fastbee.mqttclient.PubMqttClient;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.text.MessageFormat;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class IssueInstructionsProducer {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private PubMqttClient pubMqttClient;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private PubMqttCallBack pubMqttCallBack;
|
||||||
|
|
||||||
|
private final String issueInstructionsTopicTemplate = "/{0}/{1}/cmd/down";//平台给设备下发指令主题模板
|
||||||
|
private final String receiveDataReportResponseTopicTemplate = "/{0}/{1}/info/reply";//平台回应收到设备上报数据主题模板
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 平台给设备下发的指令
|
||||||
|
*/
|
||||||
|
public void issueInstructions(String productId,String deviceNum, String message){
|
||||||
|
|
||||||
|
String topic = MessageFormat.format(issueInstructionsTopicTemplate,productId,deviceNum);
|
||||||
|
pubMqttClient.publish(1,true,topic, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 平台回应接收到设备数据
|
||||||
|
*/
|
||||||
|
public void receiveDataReportResponse(String productId,String deviceNum, String message){
|
||||||
|
String topic = MessageFormat.format(receiveDataReportResponseTopicTemplate,productId,deviceNum);
|
||||||
|
pubMqttClient.publish(1,true,topic, message);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -75,8 +75,9 @@ public class DeviceOtherMsgHandler {
|
|||||||
*/
|
*/
|
||||||
private ReportDataBo buildReportData(DeviceReportBo bo) {
|
private ReportDataBo buildReportData(DeviceReportBo bo) {
|
||||||
String message = new String(bo.getData());
|
String message = new String(bo.getData());
|
||||||
log.info("收到设备信息[{}]", message);
|
// log.info("收到设备信息[{}]", message);
|
||||||
Long productId = topicsUtils.parseProductId(bo.getTopicName());
|
Long productId = topicsUtils.parseProductId(bo.getTopicName());
|
||||||
|
System.err.println("主题路径解析出产品id:"+productId);
|
||||||
String serialNumber = topicsUtils.parseSerialNumber(bo.getTopicName());
|
String serialNumber = topicsUtils.parseSerialNumber(bo.getTopicName());
|
||||||
ReportDataBo dataBo = new ReportDataBo();
|
ReportDataBo dataBo = new ReportDataBo();
|
||||||
dataBo.setMessage(message);
|
dataBo.setMessage(message);
|
||||||
|
@ -2,12 +2,14 @@ package com.fastbee.iot.model;
|
|||||||
|
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author gsb
|
* @author gsb
|
||||||
* @date 2023/9/4 17:23
|
* @date 2023/9/4 17:23
|
||||||
*/
|
*/
|
||||||
@Data
|
@Data
|
||||||
public class ProductCode {
|
public class ProductCode implements Serializable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 产品id
|
* 产品id
|
||||||
|
Loading…
x
Reference in New Issue
Block a user