设备激活接口未保存设备名称bug
ws协议连接mqtt代理一直断线重连问题
add:
mqtt消息转发接口
展连流量计设备实时数据接口
update:
设备在线状态逻辑
设备上电审核接口
This commit is contained in:
蒾酒
2024-12-26 04:36:55 +08:00
parent 27eac91d1c
commit 4760dba312
14 changed files with 160 additions and 75 deletions

View File

@ -15,6 +15,9 @@ import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -46,7 +49,6 @@ public class DeviceOtherMsgConsumer {
public void consume(DeviceReportBo bo){
try {
//处理emq订阅的非 property/post 属性上报的消息 ,因为其他消息量小,放在一起处理
Long productId;//产品id
Long packetId;//包号
byte[] data = bo.getData();//数据
@ -57,7 +59,7 @@ public class DeviceOtherMsgConsumer {
//从主题中解析出设备序列号
String serialNumber= split[2];//设备序列号
System.err.println("主题:"+topic+"--产品id:"+productId+"--设备序列号:"+serialNumber);
// System.err.println("主题:"+topic+"--产品id:"+productId+"--设备序列号:"+serialNumber);
//设备上报数据消息---------------------------------------------------------------------------------
if(topic.endsWith("/info/up")){
deviceDataReportHandler(new String(data));
@ -65,6 +67,7 @@ public class DeviceOtherMsgConsumer {
NgWaterPumpUsageRecords pumpUsageRecords=new NgWaterPumpUsageRecords();
pumpUsageRecords.setDeviceNumber(serialNumber);
JSONObject jsonObject = JSONUtil.parseObj(data);
jsonObject.set("time", (LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))));
if(jsonObject.getStr("type")!=null){
if(jsonObject.getStr("type").equals("waterEleData")){
String data1 = jsonObject.getStr("data");
@ -80,10 +83,10 @@ public class DeviceOtherMsgConsumer {
}
}
//构建回复消息-----------------------------------------------------------------------------
Map<String,Object> reportMsg=new HashMap<>();
reportMsg.put("code",0);
reportMsg.put("msg","ok");
issueInstructionsProducer.receiveDataReportResponse(productId.toString(),serialNumber,JSONUtil.toJsonStr(reportMsg));
// Map<String,Object> reportMsg=new HashMap<>();
// reportMsg.put("code",0);
// reportMsg.put("msg","ok");
// issueInstructionsProducer.receiveDataReportResponse(productId.toString(),serialNumber,JSONUtil.toJsonStr(reportMsg));
}
//打印设备
else if ( topic.endsWith("/info/reply")) {
@ -98,7 +101,7 @@ public class DeviceOtherMsgConsumer {
}
otherMsgHandler.messageHandler(bo);
}catch (Exception e){
log.error("=>设备其他消息处理出错",e);
// log.error("=>设备其他消息处理出错",e);
}
}
@ -111,10 +114,10 @@ public class DeviceOtherMsgConsumer {
//判断
if(Boolean.TRUE.equals(stringRedisTemplate.hasKey("neixiang_device_online_status:" + productId + ":" + serialNumber))){
//重新设置过期时间
stringRedisTemplate.expire("neixiang_device_online_status:"+productId+":"+serialNumber,1200, TimeUnit.SECONDS);
stringRedisTemplate.expire("neixiang_device_online_status:"+productId+":"+serialNumber,360, TimeUnit.SECONDS);
return;
}
stringRedisTemplate.opsForValue().set("neixiang_device_online_status:"+productId+":"+serialNumber,"1",1200, TimeUnit.SECONDS);
stringRedisTemplate.opsForValue().set("neixiang_device_online_status:"+productId+":"+serialNumber,"1",360, TimeUnit.SECONDS);
}catch (Exception e){
log.error("=>更新设备在线状态出错",e);
}
@ -128,7 +131,7 @@ public class DeviceOtherMsgConsumer {
* @param data 消息
*/
private void deviceDataReportHandler(String data){
System.err.println("mqtt接收到设备上报数据"+ data);
// System.err.println("mqtt接收到设备上报数据"+ data);
//回应
//
// //解析
@ -175,17 +178,15 @@ public class DeviceOtherMsgConsumer {
* 处理平台接收到数据上报的回应
*/
private void platformDataReportAckHandler(String data){
System.err.println("mqtt回应收到设备上报数据"+ data);
// System.err.println("mqtt回应收到设备上报数据"+ data);
//处理
}
/**
* 处理平台给设备下发的指令
*/
private void platformCmdHandler(String data){
System.err.println("mqtt收到平台给设备下发命令:"+ data);
System.err.println("--------------------------------------------------------------mqtt-broker收到给设备下发命令:"+ data);
// JSONObject obj = JSONUtil.parseObj(data);
}
@ -193,7 +194,10 @@ public class DeviceOtherMsgConsumer {
* 处理设备收到指令回应
*/
private void deviceCmdAckHandler(String data){
System.err.println("平台收到设备执行完指令的结果:"+ data);
System.err.println("----------------------------------------------------------------------平台收到设备执行完指令的结果:"+ data);
}
public static void main(String[] args) {
System.err.println((LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))));
}
}

View File

@ -87,42 +87,42 @@ public class DeviceStatusConsumer {
}
private void handlerShadow(Device device,DeviceStatus status){
//获取设备协议编码
DeviceAndProtocol dp = deviceService.selectProtocolBySerialNumber(device.getSerialNumber());
String protocolCode = dp.getProtocolCode();
/* 设备上线 处理影子值*/
if (status.equals(DeviceStatus.ONLINE) && device.getIsShadow() ==1){
ThingsModelShadow shadow = deviceService.getDeviceShadowThingsModel(device);
List<ThingsModelSimpleItem> properties = shadow.getProperties();
List<ThingsModelSimpleItem> functions = shadow.getFunctions();
//JsonArray组合发送
if (FastBeeConstant.PROTOCOL.JsonArray.equals(protocolCode)) {
if (!CollectionUtils.isEmpty(properties)) {
mqttMessagePublish.publishProperty(device.getProductId(), device.getSerialNumber(), properties, 3);
}
if (!CollectionUtils.isEmpty(functions)) {
mqttMessagePublish.publishFunction(device.getProductId(), device.getSerialNumber(), functions, 3);
}
} else { //其他协议单个发送
functions.addAll(properties);
if (!CollectionUtils.isEmpty(functions)) {
for (ThingsModelSimpleItem function : functions) {
MQSendMessageBo bo = new MQSendMessageBo();
bo.setDp(dp);
bo.setShadow(false);
bo.setIdentifier(function.getId());
bo.setSerialNumber(device.getSerialNumber());
JSONObject jsonObject = new JSONObject();
jsonObject.put(function.getId(),function.getValue());
bo.setParams(jsonObject);
bo.setValue(function.getValue());
long id = snowflakeIdWorker.nextId();
bo.setMessageId(id +"");
//发送到MQ处理
MessageProducer.sendFunctionInvoke(bo);
}
}
}
}
// //获取设备协议编码
// DeviceAndProtocol dp = deviceService.selectProtocolBySerialNumber(device.getSerialNumber());
// String protocolCode = dp.getProtocolCode();
// /* 设备上线 处理影子值*/
// if (status.equals(DeviceStatus.ONLINE) && device.getIsShadow() ==1){
// ThingsModelShadow shadow = deviceService.getDeviceShadowThingsModel(device);
// List<ThingsModelSimpleItem> properties = shadow.getProperties();
// List<ThingsModelSimpleItem> functions = shadow.getFunctions();
// //JsonArray组合发送
// if (FastBeeConstant.PROTOCOL.JsonArray.equals(protocolCode)) {
// if (!CollectionUtils.isEmpty(properties)) {
// mqttMessagePublish.publishProperty(device.getProductId(), device.getSerialNumber(), properties, 3);
// }
// if (!CollectionUtils.isEmpty(functions)) {
// mqttMessagePublish.publishFunction(device.getProductId(), device.getSerialNumber(), functions, 3);
// }
// } else { //其他协议单个发送
// functions.addAll(properties);
// if (!CollectionUtils.isEmpty(functions)) {
// for (ThingsModelSimpleItem function : functions) {
// MQSendMessageBo bo = new MQSendMessageBo();
// bo.setDp(dp);
// bo.setShadow(false);
// bo.setIdentifier(function.getId());
// bo.setSerialNumber(device.getSerialNumber());
// JSONObject jsonObject = new JSONObject();
// jsonObject.put(function.getId(),function.getValue());
// bo.setParams(jsonObject);
// bo.setValue(function.getValue());
// long id = snowflakeIdWorker.nextId();
// bo.setMessageId(id +"");
// //发送到MQ处理
// MessageProducer.sendFunctionInvoke(bo);
// }
// }
// }
// }
}
}