新增设备查询订单状态报文处理

This commit is contained in:
蒾酒 2025-01-05 23:31:53 +08:00
parent 81793c325f
commit 77216fb4c1
2 changed files with 99 additions and 110 deletions

View File

@ -3,6 +3,7 @@ package com.fastbee.mq.redischannel.consumer;
import cn.hutool.core.date.DateTime;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;
import com.baomidou.mybatisplus.extension.conditions.update.LambdaUpdateChainWrapper;
import com.fastbee.common.constant.FastBeeConstant;
import com.fastbee.common.core.domain.entity.SysUser;
@ -35,6 +36,7 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -94,32 +96,25 @@ public class DeviceOtherMsgConsumer {
System.err.println(topic.endsWith("/info/up"));
//设备上报数据消息---------------------------------------------------------------------------------
if(topic.endsWith("/info/up")){
deviceDataReportHandler(new String(data));
deviceDataReportHandler(productId,serialNumber,JSONUtil.parseObj(new String(data)));
//更新设备在线状态
updateDeviceOnlineStatus( productId,serialNumber);//拼成一个唯一的key只要key存在则设备在线
// updateDeviceOnlineStatusDb(serialNumber);
//保存使用记录
NgWaterPumpUsageRecords pumpUsageRecords=new NgWaterPumpUsageRecords();
pumpUsageRecords.setDeviceNumber(serialNumber);
JSONObject jsonObject = JSONUtil.parseObj(data);
// jsonObject.set("time", (LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))));//收到消息的当地时间
JSONObject jsonObject = JSONUtil.parseObj(new String(data));
//报文中data部分
JSONObject data1 = jsonObject.getJSONObject("data");
if(jsonObject.getStr("type")!=null){
//上报正常数据
if(jsonObject.getStr("type").equals("waterEleData")){
JSONObject data1 = jsonObject.getJSONObject("data");
if("waterEleData".equals(jsonObject.getStr("type"))){
//收到开阀定时报文
if(data1.get("action").equals("timeMsg")){
if("timeMsg".equals(data1.get("action"))){
pumpUsageRecords.setMessageContent(JSONUtil.toJsonStr(jsonObject));
// int i = ngWaterPumpUsageRecordsMapper.insertNgWaterPumpUsageRecords(pumpUsageRecords);
// if(i<1){
// System.err.println("--------------------------保存使用记录失败!---------------------------");
// }
LambdaUpdateChainWrapper<DeviceReportInfo> up = new LambdaUpdateChainWrapper<>(deviceReportInfoMapper)
.eq(DeviceReportInfo::getSerialNumber, serialNumber);
//获取流量计累计用水量/瞬时流量
Float meterSum = data1.getFloat("meterSum");
Float meterIns = data1.getFloat("meterIns");
@ -180,18 +175,12 @@ public class DeviceOtherMsgConsumer {
log.error("保存灌溉控制器历史数据失败");
}
}
//收到关阀时定时报文
else if(JSONUtil.parseObj(data1).get("action").equals("safeMsg")){
//收到关阀时平安报文
else if("safeMsg".equals(JSONUtil.parseObj(data1).get("action"))){
pumpUsageRecords.setMessageContent(JSONUtil.toJsonStr(jsonObject));
// int i = ngWaterPumpUsageRecordsMapper.insertNgWaterPumpUsageRecords(pumpUsageRecords);
// if(i<1){
// System.err.println("--------------------------保存使用记录失败!---------------------------");
// }
LambdaUpdateChainWrapper<DeviceReportInfo> up = new LambdaUpdateChainWrapper<>(deviceReportInfoMapper)
.eq(DeviceReportInfo::getSerialNumber, serialNumber);
//获取流量计累计用水量/瞬时流量
Float meterSum = data1.getFloat("meterSum");
Float meterIns = data1.getFloat("meterIns");
@ -252,7 +241,7 @@ public class DeviceOtherMsgConsumer {
}
//开阀门关阀门
else if(JSONUtil.parseObj(data1).get("action").equals("startPump")||JSONUtil.parseObj(data1).get("action").equals("stopPump")){
else if("startPump".equals(JSONUtil.parseObj(data1).get("action"))|| "stopPump".equals(JSONUtil.parseObj(data1).get("action"))){
int i = ngWaterPumpUsageRecordsMapper.insertNgWaterPumpUsageRecords(pumpUsageRecords);
if(i<1){
System.err.println("--------------------------保存使用记录失败!---------------------------");
@ -260,7 +249,7 @@ public class DeviceOtherMsgConsumer {
JSONObject dataJson=jsonObject.getJSONObject("data");
System.err.println(jsonObject);
System.err.println(dataJson);
if(JSONUtil.parseObj(data1).get("action").equals("startPump"))
if("startPump".equals(JSONUtil.parseObj(data1).get("action")))
{
System.err.println("开阀");
//开阀-添加灌溉记录
@ -277,7 +266,10 @@ public class DeviceOtherMsgConsumer {
{
System.err.println("用户不存在");
}
Long deptId=cardInfo.getDeptId();//获取deptId
Long deptId= null;//获取deptId
if (cardInfo != null) {
deptId = cardInfo.getDeptId();
}
userIrrigationRecord.setDeptId(deptId);
@ -374,49 +366,47 @@ public class DeviceOtherMsgConsumer {
}
//收到心跳报文
else if(jsonObject.getStr("type").equals("heartBeat")){
else if("heartBeat".equals(jsonObject.getStr("type"))){
//需要回应
//构建回复消息-----------------------------------------------------------------------------
Map<String,Object> reportMsg=new HashMap<>();
reportMsg.put("code",100);
reportMsg.put("pakSn",jsonObject.getStr("pakSn"));
// reportMsg.put("data","heartBeat");
// jsonObject.set("code",100);
issueInstructionsProducer.receiveDataReportResponse(productId.toString(),serialNumber,JSONUtil.toJsonStr(reportMsg));
}
}
//构建回复消息-----------------------------------------------------------------------------
// Map<String,Object> reportMsg=new HashMap<>();
// reportMsg.put("code",0);
// reportMsg.put("msg","ok");
// issueInstructionsProducer.receiveDataReportResponse(productId.toString(),serialNumber,JSONUtil.toJsonStr(reportMsg));
//如果是设备刷卡上报的查询报文
if(data1.getStr("action")!=null&& "askMsg".equals(data1.getStr("action"))){
cardReportHandler(productId,serialNumber,data1);
}
}
//打印设备
else if ( topic.endsWith("/info/reply")) {
else if (topic.endsWith("/info/reply")) {
platformDataReportAckHandler(new String(data));
updateDeviceOnlineStatus( productId,serialNumber);
// updateDeviceOnlineStatusDb(serialNumber);
}
else if (topic.endsWith("/cmd/down")) {
platformCmdHandler(new String(data));
//回应
} else if (topic.endsWith("/cmd/reply")) {
//收到充值结果指令
JSONObject jsonObject = JSONUtil.parseObj(new String(data));
if(jsonObject.getStr("type").equals("devCmdAck")){
if("devCmdAck".equals(jsonObject.getStr("type"))){
JSONObject data1 = jsonObject.getJSONObject("data");
Long orderNum = data1.getLong("orderNum");
//修改订单状态为已充值
try {
new LambdaUpdateChainWrapper<>(ngUserRechargeRecordsMapper)
.set(NgUserRechargeRecords::getStatus,3)
.eq(NgUserRechargeRecords::getId,orderNum)
boolean update = new LambdaUpdateChainWrapper<>(ngUserRechargeRecordsMapper)
.set(NgUserRechargeRecords::getStatus, 3)
.eq(NgUserRechargeRecords::getId, orderNum)
.update();
}catch (Exception e){
if(!update){
log.error("修改订单号为:{}的订单状态为已充值失败",orderNum);
}
}
deviceCmdAckHandler(new String(data));
updateDeviceOnlineStatus( productId,serialNumber);
@ -475,55 +465,17 @@ public class DeviceOtherMsgConsumer {
/**
* 处理设备数据上报
* @param data 消息
* 处理设备数据上报 info/up主题消息
* @param dataObj 消息
*/
private void deviceDataReportHandler(String data){
// System.err.println("mqtt接收到设备上报数据"+ data);
//回应
//
// //解析
//// String jsonString = "{\"type\": \"waterEleData\", \"pakSn\": 123, \"data\": {\"workState\": 1, \"action\": \"startPump\", \"mcuSn\": \"MCU123456\", \"sumEle\": 5000, \"sumFlow\": 3000, \"insFlow\": 2.5, \"userSumFlow\": 1500, \"areaCode\": \"010\", \"cardId\": \"CARD12345678\", \"userBalance\": 100.0, \"userSumEle\": 3000, \"curEle\": 50, \"curFlow\": 200, \"insPower\": 2300}}";
// JSONObject entries = JSONUtil.parseObj(data);
//// System.err.println("解析后:"+entries);
// //获取帧标识
// String type = entries.getStr("type");
// //获取帧号
// Long pakSn = entries.getLong("pakSn");
// //获取数据
// JSONObject dataObj = entries.getJSONObject("data");
// //获取水泵状态0=关泵,1=开泵
// Integer workState = dataObj.getInt("workState");
// //获取动作
// String action = dataObj.getStr("action");
// //获取单片机编码
// String mcuSn = dataObj.getStr("mcuSn");
// //获取累计用电量
// Integer sumEle = dataObj.getInt("sumEle");
// //获取累计用水量
// Integer sumFlow = dataObj.getInt("sumFlow");
// //获取瞬时流量
// Double insFlow = dataObj.getDouble("insFlow");
// //当前用户累计用水量
// Integer userSumFlow = dataObj.getInt("userSumFlow");
// //获取区域号
// String areaCode = dataObj.getStr("areaCode");
// //获取卡号
// String cardId = dataObj.getStr("cardId");
// //获取用户余额
// Double userBalance = dataObj.getDouble("userBalance");
// //获取用户累计用电量
// Integer userSumEle = dataObj.getInt("userSumEle");
// //获取本次用电量
// Integer curEle = dataObj.getInt("curEle");
// //获取本次用水量
// Integer curFlow = dataObj.getInt("curFlow");
// //获取瞬时功率
// Integer insPower = dataObj.getInt("insPower");
private void deviceDataReportHandler(Long productId,String serialNumber,JSONObject dataObj){
}
/**
* 处理平台接收到数据上报的回应
* 处理平台接收到数据上报的回应 info/reply 主题消息
*/
private void platformDataReportAckHandler(String data){
// System.err.println("mqtt回应收到设备上报数据"+ data);
@ -531,7 +483,7 @@ public class DeviceOtherMsgConsumer {
}
/**
* 处理平台给设备下发的指令
* 处理平台收到给设备下发的指令 cmd/down 主题消息
*/
private void platformCmdHandler(String data){
System.err.println("--------------------------------------------------------------mqtt-broker收到给设备下发命令"+ data);
@ -539,10 +491,47 @@ public class DeviceOtherMsgConsumer {
}
/**
* 处理设备收到指令回应
* 处理平台收到设备收到指令回应 cmd/reply 主题消息
*/
private void deviceCmdAckHandler(String data){
System.err.println("----------------------------------------------------------------------平台收到设备执行完指令的结果:"+ data);
}
/**
* 处理刷卡上报查询报文
*/
private void cardReportHandler(Long productId,String serialNumber,JSONObject dataObj){
//解析消息
String cardId=dataObj.getStr("cardId");//卡号
String areaCode=dataObj.getStr("areaCode");//区域码
//查询是否有未下发的指令
List<NgUserRechargeRecords> list = new LambdaQueryChainWrapper<>(ngUserRechargeRecordsMapper)
.select(NgUserRechargeRecords::getId, NgUserRechargeRecords::getStatus,NgUserRechargeRecords::getAmount)
.eq(NgUserRechargeRecords::getCardNumber,cardId)
.eq(NgUserRechargeRecords::getStatus,1)
.list();
//如果订单已经支付但并未下发
list.forEach(item->{
//下发指令
//构建回应
Map<String,Object> reply=new HashMap<>();
reply.put("cmd",5300);
Map<String,Object> data=new HashMap<>();
data.put("cmd",1000);
data.put("orderNum",item.getId());//订单号
data.put("cardId",cardId);//卡号
data.put("areaCode",areaCode);//区域码
data.put("investBalance",item.getAmount().doubleValue()*100);
// data.put("investWater",rechargecardUser.getWater()*100);
reply.put("data",data);
//发送回应
issueInstructionsProducer.receiveDataReportResponse(productId.toString(),serialNumber,JSONUtil.toJsonStr(reply));
});
}
public static void main(String[] args) {

View File

@ -165,28 +165,28 @@ public class UserRechargeCardsServiceImpl implements IUserRechargeCardsService
throw new ServiceException("充值机编码为空!!!");
}
//构建主题
String topic ="hzlink/147/"+ngUserRechargeRecords.getSerialNumber()+"/cmd/down";
// String topic ="hzlink/147/"+ngUserRechargeRecords.getSerialNumber()+"/cmd/down";
//构建消息
Map<String,Object> param = new HashMap<>();
// Map<String,Object> param = new HashMap<>();
//远程阀控
param.put("cmd",1000);
Map<String,Object> data = new HashMap<>();
data.put("orderNum", ngUserRechargeRecords.getId());//订单号
data.put("cardNum",Integer.parseInt(cardNumber));//卡号
data.put("areaCode",Integer.parseInt(areaCode));//区域号
data.put("investBalance",balance.doubleValue()*100);//充值的金额
data.put("investWater",100*100);//充值的水量
param.put("data",data);
try{
pubMqttClient.publish(1,true,topic, JSONUtil.toJsonStr(param));
}catch (Exception e)
{
throw new ServiceException("消息发布失败");
}
ngUserRechargeRecords.setStatus(2);//状态更改为已下发
userRechargeRecordsMapper.updateNgUserRechargeRecords(ngUserRechargeRecords);
// param.put("cmd",1000);
// Map<String,Object> data = new HashMap<>();
// data.put("orderNum", ngUserRechargeRecords.getId());//订单号
// data.put("cardNum",Integer.parseInt(cardNumber));//卡号
// data.put("areaCode",Integer.parseInt(areaCode));//区域号
// data.put("investBalance",balance.doubleValue()*100);//充值的金额
// data.put("investWater",100*100);//充值的水量
//
// param.put("data",data);
// try{
// pubMqttClient.publish(1,true,topic, JSONUtil.toJsonStr(param));
//
// }catch (Exception e)
// {
// throw new ServiceException("消息发布失败");
// }
// ngUserRechargeRecords.setStatus(2);//状态更改为已下发
// userRechargeRecordsMapper.updateNgUserRechargeRecords(ngUserRechargeRecords);
// int i = userRechargeCardsMapper.updateUserRechargeCards(info);
return 1;
}