设备在线状态逻辑调整,支付下发指令参数调整等

This commit is contained in:
蒾酒
2024-12-28 18:17:30 +08:00
parent 6ed87aa6e6
commit 250530570a
11 changed files with 81 additions and 30 deletions

View File

@ -2,8 +2,11 @@ package com.fastbee.mq.redischannel.consumer;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.extension.conditions.update.LambdaUpdateChainWrapper;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.core.mq.DeviceReportBo;
import com.fastbee.iot.domain.DeviceReportInfo;
import com.fastbee.iot.mapper.DeviceReportInfoMapper;
import com.fastbee.mq.redischannel.producer.IssueInstructionsProducer;
import com.fastbee.mq.service.impl.DeviceOtherMsgHandler;
import com.fastbee.mqttclient.PubMqttClient;
@ -36,6 +39,9 @@ public class DeviceOtherMsgConsumer {
@Autowired
private IssueInstructionsProducer issueInstructionsProducer;
@Autowired
private DeviceReportInfoMapper deviceReportInfoMapper;
@Autowired
private NgWaterPumpUsageRecordsMapper ngWaterPumpUsageRecordsMapper;
@ -63,6 +69,9 @@ public class DeviceOtherMsgConsumer {
//设备上报数据消息---------------------------------------------------------------------------------
if(topic.endsWith("/info/up")){
deviceDataReportHandler(new String(data));
//更新设备在线状态
updateDeviceOnlineStatus( productId,serialNumber);
// updateDeviceOnlineStatusDb(serialNumber);
//保存使用记录
NgWaterPumpUsageRecords pumpUsageRecords=new NgWaterPumpUsageRecords();
pumpUsageRecords.setDeviceNumber(serialNumber);
@ -73,8 +82,7 @@ public class DeviceOtherMsgConsumer {
String data1 = jsonObject.getStr("data");
//收到定时报文
if(JSONUtil.parseObj(data1).get("action").equals("timeMsg")){
//更新设备在线状态
updateDeviceOnlineStatus( productId,serialNumber);
pumpUsageRecords.setMessageContent(JSONUtil.toJsonStr(jsonObject));
int i = ngWaterPumpUsageRecordsMapper.insertNgWaterPumpUsageRecords(pumpUsageRecords);
if(i<1){
@ -83,6 +91,11 @@ public class DeviceOtherMsgConsumer {
}else if(JSONUtil.parseObj(data1).get("action").equals("safeMsg")){
//续费在线状态
updateDeviceOnlineStatus( productId,serialNumber);
//保存数据
int i = ngWaterPumpUsageRecordsMapper.insertNgWaterPumpUsageRecords(pumpUsageRecords);
if(i<1){
System.err.println("--------------------------保存使用记录失败!---------------------------");
}
}
}
}
@ -95,6 +108,8 @@ public class DeviceOtherMsgConsumer {
//打印设备
else if ( topic.endsWith("/info/reply")) {
platformDataReportAckHandler(new String(data));
updateDeviceOnlineStatus( productId,serialNumber);
// updateDeviceOnlineStatusDb(serialNumber);
}
else if (topic.endsWith("/cmd/down")) {
platformCmdHandler(new String(data));
@ -102,6 +117,8 @@ public class DeviceOtherMsgConsumer {
} else if (topic.endsWith("/cmd/reply")) {
deviceCmdAckHandler(new String(data));
updateDeviceOnlineStatus( productId,serialNumber);
// updateDeviceOnlineStatusDb(serialNumber);
}
otherMsgHandler.messageHandler(bo);
}catch (Exception e){
@ -111,7 +128,7 @@ public class DeviceOtherMsgConsumer {
/**
* 更新设备在线状态
* 缓存更新设备在线状态
*/
private void updateDeviceOnlineStatus(Long productId,String serialNumber){
try {
@ -123,9 +140,24 @@ public class DeviceOtherMsgConsumer {
}
stringRedisTemplate.opsForValue().set("neixiang_device_online_status:"+productId+":"+serialNumber,"1",360, TimeUnit.SECONDS);
}catch (Exception e){
log.error("=>更新设备在线状态出错",e);
log.error("=>更新缓存设备在线状态出错",e);
}
}
/**
* 数据库更新设备在线状态
*/
private void updateDeviceOnlineStatusDb(String serialNumber){
try {
boolean update = new LambdaUpdateChainWrapper<>(deviceReportInfoMapper)
.set(DeviceReportInfo::getOnLine, 1)
.eq(DeviceReportInfo::getSerialNumber, serialNumber)
.update();
if(!update){
throw new RuntimeException("=>数据库更新设备在线状态失败");
}
}catch (Exception e){
log.error("=>更新数据库设备在线状态出错",e);
}
}