diff --git a/fastbee-common/src/main/java/com/fastbee/common/core/device/DeviceAndProtocol.java b/fastbee-common/src/main/java/com/fastbee/common/core/device/DeviceAndProtocol.java index 468498e..0f106da 100644 --- a/fastbee-common/src/main/java/com/fastbee/common/core/device/DeviceAndProtocol.java +++ b/fastbee-common/src/main/java/com/fastbee/common/core/device/DeviceAndProtocol.java @@ -2,12 +2,14 @@ package com.fastbee.common.core.device; import lombok.Data; +import java.io.Serializable; + /** * @author gsb * @date 2024/6/14 9:25 */ @Data -public class DeviceAndProtocol { +public class DeviceAndProtocol implements Serializable { /** * 子设备编号 diff --git a/fastbee-gateway/fastbee-mq/src/main/java/com/fastbee/mq/redischannel/consumer/DeviceOtherMsgConsumer.java b/fastbee-gateway/fastbee-mq/src/main/java/com/fastbee/mq/redischannel/consumer/DeviceOtherMsgConsumer.java index 5cdd8fb..3e91011 100644 --- a/fastbee-gateway/fastbee-mq/src/main/java/com/fastbee/mq/redischannel/consumer/DeviceOtherMsgConsumer.java +++ b/fastbee-gateway/fastbee-mq/src/main/java/com/fastbee/mq/redischannel/consumer/DeviceOtherMsgConsumer.java @@ -1,12 +1,18 @@ package com.fastbee.mq.redischannel.consumer; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; import com.fastbee.common.constant.FastBeeConstant; import com.fastbee.common.core.mq.DeviceReportBo; +import com.fastbee.mq.redischannel.producer.IssueInstructionsProducer; import com.fastbee.mq.service.impl.DeviceOtherMsgHandler; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.HashMap; +import java.util.Map; /** * @author gsb @@ -19,69 +25,114 @@ public class DeviceOtherMsgConsumer { @Resource private DeviceOtherMsgHandler otherMsgHandler; + @Autowired + private IssueInstructionsProducer issueInstructionsProducer; + @Async(FastBeeConstant.TASK.DEVICE_OTHER_TASK) public void consume(DeviceReportBo bo){ try { //处理emq订阅的非 property/post 属性上报的消息 ,因为其他消息量小,放在一起处理 - String serialNumber;//设备序列号 + String serialNumber=bo.getSerialNumber();//设备序列号 + Long productId;//产品id + + Long packetId;//包号 byte[] data = bo.getData();//数据 String topic=bo.getTopicName();//主题 + //从主题中解析出产品id + String[] split = topic.split("/"); + productId=Long.valueOf(split[1]); + System.err.println("主题:"+topic+"产品id:"+productId+"设备序列号:"+serialNumber); //设备上报数据消息 if(topic.endsWith("/info/up")){ - System.err.println("mqtt接收到设备上报数据:"+ new String(data)); + deviceDataReportHandler(new String(data)); + //构建回复消息 + Map reportMsg=new HashMap<>(); + reportMsg.put("code",0); + reportMsg.put("msg","ok"); + issueInstructionsProducer.receiveDataReportResponse(productId.toString(),serialNumber,JSONUtil.toJsonStr(reportMsg)); } else if ( topic.endsWith("/info/reply")) { - System.err.println("mqtt回应收到设备上报数据:"+ new String(data)); + platformDataReportAckHandler(new String(data)); + } else if (topic.endsWith("cmd/down")) { - System.err.println("mqtt收到设备下发命令:"+ new String(data)); + platformCmdHandler(new String(data)); } else if (topic.endsWith("cmd/reply")) { - System.err.println("mqtt回应收到设备下发命令:"+ new String(data)); + deviceCmdAckHandler(new String(data)); } - -// //解析 -// String jsonString = "{\"type\": \"waterEleData\", \"pakSn\": 123, \"data\": {\"workState\": 1, \"action\": \"startPump\", \"mcuSn\": \"MCU123456\", \"sumEle\": 5000, \"sumFlow\": 3000, \"insFlow\": 2.5, \"userSumFlow\": 1500, \"areaCode\": \"010\", \"cardId\": \"CARD12345678\", \"userBalance\": 100.0, \"userSumEle\": 3000, \"curEle\": 50, \"curFlow\": 200, \"insPower\": 2300}}"; -// JSONObject entries = JSONUtil.parseObj(jsonString); -// System.err.println("解析后:"+entries); -// //获取帧标识 -// String type = entries.getStr("type"); -// //获取帧号 -// Long pakSn = entries.getLong("pakSn"); -// //获取数据 -// JSONObject dataObj = entries.getJSONObject("data"); -// //获取水泵状态0=关泵,1=开泵 -// Integer workState = dataObj.getInt("workState"); -// //获取动作 -// String action = dataObj.getStr("action"); -// //获取单片机编码 -// String mcuSn = dataObj.getStr("mcuSn"); -// //获取累计用电量 -// Integer sumEle = dataObj.getInt("sumEle"); -// //获取累计用水量 -// Integer sumFlow = dataObj.getInt("sumFlow"); -// //获取瞬时流量 -// Double insFlow = dataObj.getDouble("insFlow"); -// //当前用户累计用水量 -// Integer userSumFlow = dataObj.getInt("userSumFlow"); -// //获取区域号 -// String areaCode = dataObj.getStr("areaCode"); -// //获取卡号 -// String cardId = dataObj.getStr("cardId"); -// //获取用户余额 -// Double userBalance = dataObj.getDouble("userBalance"); -// //获取用户累计用电量 -// Integer userSumEle = dataObj.getInt("userSumEle"); -// //获取本次用电量 -// Integer curEle = dataObj.getInt("curEle"); -// //获取本次用水量 -// Integer curFlow = dataObj.getInt("curFlow"); -// //获取瞬时功率 -// Integer insPower = dataObj.getInt("insPower"); - -// otherMsgHandler.messageHandler(bo); + otherMsgHandler.messageHandler(bo); }catch (Exception e){ log.error("=>设备其他消息处理出错",e); } } + /** + * 处理设备数据上报 + * @param data 消息 + */ + private void deviceDataReportHandler(String data){ + System.err.println("mqtt接收到设备上报数据:"+ data); + //解析 +// String jsonString = "{\"type\": \"waterEleData\", \"pakSn\": 123, \"data\": {\"workState\": 1, \"action\": \"startPump\", \"mcuSn\": \"MCU123456\", \"sumEle\": 5000, \"sumFlow\": 3000, \"insFlow\": 2.5, \"userSumFlow\": 1500, \"areaCode\": \"010\", \"cardId\": \"CARD12345678\", \"userBalance\": 100.0, \"userSumEle\": 3000, \"curEle\": 50, \"curFlow\": 200, \"insPower\": 2300}}"; + JSONObject entries = JSONUtil.parseObj(data); +// System.err.println("解析后:"+entries); + //获取帧标识 + String type = entries.getStr("type"); + //获取帧号 + Long pakSn = entries.getLong("pakSn"); + //获取数据 + JSONObject dataObj = entries.getJSONObject("data"); + //获取水泵状态0=关泵,1=开泵 + Integer workState = dataObj.getInt("workState"); + //获取动作 + String action = dataObj.getStr("action"); + //获取单片机编码 + String mcuSn = dataObj.getStr("mcuSn"); + //获取累计用电量 + Integer sumEle = dataObj.getInt("sumEle"); + //获取累计用水量 + Integer sumFlow = dataObj.getInt("sumFlow"); + //获取瞬时流量 + Double insFlow = dataObj.getDouble("insFlow"); + //当前用户累计用水量 + Integer userSumFlow = dataObj.getInt("userSumFlow"); + //获取区域号 + String areaCode = dataObj.getStr("areaCode"); + //获取卡号 + String cardId = dataObj.getStr("cardId"); + //获取用户余额 + Double userBalance = dataObj.getDouble("userBalance"); + //获取用户累计用电量 + Integer userSumEle = dataObj.getInt("userSumEle"); + //获取本次用电量 + Integer curEle = dataObj.getInt("curEle"); + //获取本次用水量 + Integer curFlow = dataObj.getInt("curFlow"); + //获取瞬时功率 + Integer insPower = dataObj.getInt("insPower"); + + } + /** + * 处理平台接收到数据上报的回应 + */ + private void platformDataReportAckHandler(String data){ + System.err.println("mqtt回应收到设备上报数据:"+ data); + //处理 + + + } + /** + * 处理平台给设备下发的指令 + */ + private void platformCmdHandler(String data){ + System.err.println("mqtt收到平台给设备下发命令:"+ data); + JSONObject obj = JSONUtil.parseObj(data); + } + /** + * 处理设备收到指令回应 + */ + private void deviceCmdAckHandler(String data){ + System.err.println("mqtt回应收到设备下发命令:"+ data); + } + } diff --git a/fastbee-gateway/fastbee-mq/src/main/java/com/fastbee/mq/redischannel/producer/IssueInstructionsProducer.java b/fastbee-gateway/fastbee-mq/src/main/java/com/fastbee/mq/redischannel/producer/IssueInstructionsProducer.java new file mode 100644 index 0000000..0e0613a --- /dev/null +++ b/fastbee-gateway/fastbee-mq/src/main/java/com/fastbee/mq/redischannel/producer/IssueInstructionsProducer.java @@ -0,0 +1,39 @@ +package com.fastbee.mq.redischannel.producer; + +import com.fastbee.mqttclient.PubMqttCallBack; +import com.fastbee.mqttclient.PubMqttClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.text.MessageFormat; + +@Component +public class IssueInstructionsProducer { + + @Autowired + private PubMqttClient pubMqttClient; + + @Autowired + private PubMqttCallBack pubMqttCallBack; + + private final String issueInstructionsTopicTemplate = "/{0}/{1}/cmd/down";//平台给设备下发指令主题模板 + private final String receiveDataReportResponseTopicTemplate = "/{0}/{1}/info/reply";//平台回应收到设备上报数据主题模板 + + /** + * 平台给设备下发的指令 + */ + public void issueInstructions(String productId,String deviceNum, String message){ + + String topic = MessageFormat.format(issueInstructionsTopicTemplate,productId,deviceNum); + pubMqttClient.publish(1,true,topic, message); + } + + /** + * 平台回应接收到设备数据 + */ + public void receiveDataReportResponse(String productId,String deviceNum, String message){ + String topic = MessageFormat.format(receiveDataReportResponseTopicTemplate,productId,deviceNum); + pubMqttClient.publish(1,true,topic, message); + } + +} diff --git a/fastbee-gateway/fastbee-mq/src/main/java/com/fastbee/mq/service/impl/DeviceOtherMsgHandler.java b/fastbee-gateway/fastbee-mq/src/main/java/com/fastbee/mq/service/impl/DeviceOtherMsgHandler.java index 83f1d74..fbb8b88 100644 --- a/fastbee-gateway/fastbee-mq/src/main/java/com/fastbee/mq/service/impl/DeviceOtherMsgHandler.java +++ b/fastbee-gateway/fastbee-mq/src/main/java/com/fastbee/mq/service/impl/DeviceOtherMsgHandler.java @@ -75,8 +75,9 @@ public class DeviceOtherMsgHandler { */ private ReportDataBo buildReportData(DeviceReportBo bo) { String message = new String(bo.getData()); - log.info("收到设备信息[{}]", message); +// log.info("收到设备信息[{}]", message); Long productId = topicsUtils.parseProductId(bo.getTopicName()); + System.err.println("主题路径解析出产品id:"+productId); String serialNumber = topicsUtils.parseSerialNumber(bo.getTopicName()); ReportDataBo dataBo = new ReportDataBo(); dataBo.setMessage(message); diff --git a/fastbee-service/fastbee-iot-service/src/main/java/com/fastbee/iot/model/ProductCode.java b/fastbee-service/fastbee-iot-service/src/main/java/com/fastbee/iot/model/ProductCode.java index 6d43e2c..e632d78 100644 --- a/fastbee-service/fastbee-iot-service/src/main/java/com/fastbee/iot/model/ProductCode.java +++ b/fastbee-service/fastbee-iot-service/src/main/java/com/fastbee/iot/model/ProductCode.java @@ -2,12 +2,14 @@ package com.fastbee.iot.model; import lombok.Data; +import java.io.Serializable; + /** * @author gsb * @date 2023/9/4 17:23 */ @Data -public class ProductCode { +public class ProductCode implements Serializable { /** * 产品id