diff --git a/fastbee-gateway/fastbee-mq/src/main/java/com/fastbee/mq/redischannel/consumer/DeviceOtherMsgConsumer.java b/fastbee-gateway/fastbee-mq/src/main/java/com/fastbee/mq/redischannel/consumer/DeviceOtherMsgConsumer.java index e19b1f8..d7e4591 100644 --- a/fastbee-gateway/fastbee-mq/src/main/java/com/fastbee/mq/redischannel/consumer/DeviceOtherMsgConsumer.java +++ b/fastbee-gateway/fastbee-mq/src/main/java/com/fastbee/mq/redischannel/consumer/DeviceOtherMsgConsumer.java @@ -3,6 +3,7 @@ package com.fastbee.mq.redischannel.consumer; import cn.hutool.core.date.DateTime; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; +import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper; import com.baomidou.mybatisplus.extension.conditions.update.LambdaUpdateChainWrapper; import com.fastbee.common.constant.FastBeeConstant; import com.fastbee.common.core.domain.entity.SysUser; @@ -35,6 +36,7 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -94,32 +96,25 @@ public class DeviceOtherMsgConsumer { System.err.println(topic.endsWith("/info/up")); //设备上报数据消息--------------------------------------------------------------------------------- if(topic.endsWith("/info/up")){ - deviceDataReportHandler(new String(data)); + deviceDataReportHandler(productId,serialNumber,JSONUtil.parseObj(new String(data))); //更新设备在线状态 updateDeviceOnlineStatus( productId,serialNumber);//拼成一个唯一的key,只要key存在,则设备在线 // updateDeviceOnlineStatusDb(serialNumber); //保存使用记录 NgWaterPumpUsageRecords pumpUsageRecords=new NgWaterPumpUsageRecords(); pumpUsageRecords.setDeviceNumber(serialNumber); - JSONObject jsonObject = JSONUtil.parseObj(data); - // jsonObject.set("time", (LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))));//收到消息的当地时间 + JSONObject jsonObject = JSONUtil.parseObj(new String(data)); + //报文中data部分 + JSONObject data1 = jsonObject.getJSONObject("data"); if(jsonObject.getStr("type")!=null){ //上报正常数据 - if(jsonObject.getStr("type").equals("waterEleData")){ - JSONObject data1 = jsonObject.getJSONObject("data"); + if("waterEleData".equals(jsonObject.getStr("type"))){ //收到开阀定时报文 - if(data1.get("action").equals("timeMsg")){ + if("timeMsg".equals(data1.get("action"))){ pumpUsageRecords.setMessageContent(JSONUtil.toJsonStr(jsonObject)); - - // int i = ngWaterPumpUsageRecordsMapper.insertNgWaterPumpUsageRecords(pumpUsageRecords); - // if(i<1){ - // System.err.println("--------------------------保存使用记录失败!---------------------------"); - // } - LambdaUpdateChainWrapper up = new LambdaUpdateChainWrapper<>(deviceReportInfoMapper) .eq(DeviceReportInfo::getSerialNumber, serialNumber); - //获取流量计累计用水量/瞬时流量 Float meterSum = data1.getFloat("meterSum"); Float meterIns = data1.getFloat("meterIns"); @@ -180,18 +175,12 @@ public class DeviceOtherMsgConsumer { log.error("保存灌溉控制器历史数据失败"); } } - //收到关阀时定时报文 - else if(JSONUtil.parseObj(data1).get("action").equals("safeMsg")){ + //收到关阀时平安报文 + else if("safeMsg".equals(JSONUtil.parseObj(data1).get("action"))){ pumpUsageRecords.setMessageContent(JSONUtil.toJsonStr(jsonObject)); - // int i = ngWaterPumpUsageRecordsMapper.insertNgWaterPumpUsageRecords(pumpUsageRecords); - // if(i<1){ - // System.err.println("--------------------------保存使用记录失败!---------------------------"); - // } - LambdaUpdateChainWrapper up = new LambdaUpdateChainWrapper<>(deviceReportInfoMapper) .eq(DeviceReportInfo::getSerialNumber, serialNumber); - //获取流量计累计用水量/瞬时流量 Float meterSum = data1.getFloat("meterSum"); Float meterIns = data1.getFloat("meterIns"); @@ -252,7 +241,7 @@ public class DeviceOtherMsgConsumer { } //开阀门,关阀门 - else if(JSONUtil.parseObj(data1).get("action").equals("startPump")||JSONUtil.parseObj(data1).get("action").equals("stopPump")){ + else if("startPump".equals(JSONUtil.parseObj(data1).get("action"))|| "stopPump".equals(JSONUtil.parseObj(data1).get("action"))){ int i = ngWaterPumpUsageRecordsMapper.insertNgWaterPumpUsageRecords(pumpUsageRecords); if(i<1){ System.err.println("--------------------------保存使用记录失败!---------------------------"); @@ -260,7 +249,7 @@ public class DeviceOtherMsgConsumer { JSONObject dataJson=jsonObject.getJSONObject("data"); System.err.println(jsonObject); System.err.println(dataJson); - if(JSONUtil.parseObj(data1).get("action").equals("startPump")) + if("startPump".equals(JSONUtil.parseObj(data1).get("action"))) { System.err.println("开阀"); //开阀-添加灌溉记录 @@ -277,7 +266,10 @@ public class DeviceOtherMsgConsumer { { System.err.println("用户不存在"); } - Long deptId=cardInfo.getDeptId();//获取deptId + Long deptId= null;//获取deptId + if (cardInfo != null) { + deptId = cardInfo.getDeptId(); + } userIrrigationRecord.setDeptId(deptId); @@ -374,49 +366,47 @@ public class DeviceOtherMsgConsumer { } //收到心跳报文 - else if(jsonObject.getStr("type").equals("heartBeat")){ + else if("heartBeat".equals(jsonObject.getStr("type"))){ //需要回应 //构建回复消息----------------------------------------------------------------------------- Map reportMsg=new HashMap<>(); reportMsg.put("code",100); reportMsg.put("pakSn",jsonObject.getStr("pakSn")); - // reportMsg.put("data","heartBeat"); - // jsonObject.set("code",100); issueInstructionsProducer.receiveDataReportResponse(productId.toString(),serialNumber,JSONUtil.toJsonStr(reportMsg)); } } - //构建回复消息----------------------------------------------------------------------------- - // Map reportMsg=new HashMap<>(); - // reportMsg.put("code",0); - // reportMsg.put("msg","ok"); - // issueInstructionsProducer.receiveDataReportResponse(productId.toString(),serialNumber,JSONUtil.toJsonStr(reportMsg)); + //如果是设备刷卡上报的查询报文 + if(data1.getStr("action")!=null&& "askMsg".equals(data1.getStr("action"))){ + cardReportHandler(productId,serialNumber,data1); + } + } - //打印设备 - else if ( topic.endsWith("/info/reply")) { + + else if (topic.endsWith("/info/reply")) { platformDataReportAckHandler(new String(data)); updateDeviceOnlineStatus( productId,serialNumber); - // updateDeviceOnlineStatusDb(serialNumber); + } else if (topic.endsWith("/cmd/down")) { platformCmdHandler(new String(data)); - //回应 + } else if (topic.endsWith("/cmd/reply")) { //收到充值结果指令 JSONObject jsonObject = JSONUtil.parseObj(new String(data)); - if(jsonObject.getStr("type").equals("devCmdAck")){ + if("devCmdAck".equals(jsonObject.getStr("type"))){ JSONObject data1 = jsonObject.getJSONObject("data"); Long orderNum = data1.getLong("orderNum"); //修改订单状态为已充值 - try { - new LambdaUpdateChainWrapper<>(ngUserRechargeRecordsMapper) - .set(NgUserRechargeRecords::getStatus,3) - .eq(NgUserRechargeRecords::getId,orderNum) + boolean update = new LambdaUpdateChainWrapper<>(ngUserRechargeRecordsMapper) + .set(NgUserRechargeRecords::getStatus, 3) + .eq(NgUserRechargeRecords::getId, orderNum) .update(); - }catch (Exception e){ - + if(!update){ + log.error("修改订单号为:{}的订单状态为已充值失败",orderNum); } + } deviceCmdAckHandler(new String(data)); updateDeviceOnlineStatus( productId,serialNumber); @@ -475,55 +465,17 @@ public class DeviceOtherMsgConsumer { /** - * 处理设备数据上报 - * @param data 消息 + * 处理设备数据上报 info/up主题消息 + * @param dataObj 消息 */ - private void deviceDataReportHandler(String data){ - // System.err.println("mqtt接收到设备上报数据:"+ data); - //回应 -// -// //解析 -//// String jsonString = "{\"type\": \"waterEleData\", \"pakSn\": 123, \"data\": {\"workState\": 1, \"action\": \"startPump\", \"mcuSn\": \"MCU123456\", \"sumEle\": 5000, \"sumFlow\": 3000, \"insFlow\": 2.5, \"userSumFlow\": 1500, \"areaCode\": \"010\", \"cardId\": \"CARD12345678\", \"userBalance\": 100.0, \"userSumEle\": 3000, \"curEle\": 50, \"curFlow\": 200, \"insPower\": 2300}}"; -// JSONObject entries = JSONUtil.parseObj(data); -//// System.err.println("解析后:"+entries); -// //获取帧标识 -// String type = entries.getStr("type"); -// //获取帧号 -// Long pakSn = entries.getLong("pakSn"); -// //获取数据 -// JSONObject dataObj = entries.getJSONObject("data"); -// //获取水泵状态0=关泵,1=开泵 -// Integer workState = dataObj.getInt("workState"); -// //获取动作 -// String action = dataObj.getStr("action"); -// //获取单片机编码 -// String mcuSn = dataObj.getStr("mcuSn"); -// //获取累计用电量 -// Integer sumEle = dataObj.getInt("sumEle"); -// //获取累计用水量 -// Integer sumFlow = dataObj.getInt("sumFlow"); -// //获取瞬时流量 -// Double insFlow = dataObj.getDouble("insFlow"); -// //当前用户累计用水量 -// Integer userSumFlow = dataObj.getInt("userSumFlow"); -// //获取区域号 -// String areaCode = dataObj.getStr("areaCode"); -// //获取卡号 -// String cardId = dataObj.getStr("cardId"); -// //获取用户余额 -// Double userBalance = dataObj.getDouble("userBalance"); -// //获取用户累计用电量 -// Integer userSumEle = dataObj.getInt("userSumEle"); -// //获取本次用电量 -// Integer curEle = dataObj.getInt("curEle"); -// //获取本次用水量 -// Integer curFlow = dataObj.getInt("curFlow"); -// //获取瞬时功率 -// Integer insPower = dataObj.getInt("insPower"); + private void deviceDataReportHandler(Long productId,String serialNumber,JSONObject dataObj){ + + + } /** - * 处理平台接收到数据上报的回应 + * 处理平台接收到数据上报的回应 info/reply 主题消息 */ private void platformDataReportAckHandler(String data){ // System.err.println("mqtt回应收到设备上报数据:"+ data); @@ -531,7 +483,7 @@ public class DeviceOtherMsgConsumer { } /** - * 处理平台给设备下发的指令 + * 处理平台收到给设备下发的指令 cmd/down 主题消息 */ private void platformCmdHandler(String data){ System.err.println("--------------------------------------------------------------mqtt-broker收到给设备下发命令:"+ data); @@ -539,10 +491,47 @@ public class DeviceOtherMsgConsumer { } /** - * 处理设备收到指令回应 + * 处理平台收到设备收到指令回应 cmd/reply 主题消息 */ private void deviceCmdAckHandler(String data){ System.err.println("----------------------------------------------------------------------平台收到设备执行完指令的结果:"+ data); + } + + /** + * 处理刷卡上报查询报文 + */ + private void cardReportHandler(Long productId,String serialNumber,JSONObject dataObj){ + //解析消息 + String cardId=dataObj.getStr("cardId");//卡号 + String areaCode=dataObj.getStr("areaCode");//区域码 + //查询是否有未下发的指令 + List list = new LambdaQueryChainWrapper<>(ngUserRechargeRecordsMapper) + .select(NgUserRechargeRecords::getId, NgUserRechargeRecords::getStatus,NgUserRechargeRecords::getAmount) + .eq(NgUserRechargeRecords::getCardNumber,cardId) + .eq(NgUserRechargeRecords::getStatus,1) + .list(); + + //如果订单已经支付但并未下发 + list.forEach(item->{ + //下发指令 + //构建回应 + Map reply=new HashMap<>(); + reply.put("cmd",5300); + Map data=new HashMap<>(); + data.put("cmd",1000); + data.put("orderNum",item.getId());//订单号 + data.put("cardId",cardId);//卡号 + data.put("areaCode",areaCode);//区域码 + data.put("investBalance",item.getAmount().doubleValue()*100); + // data.put("investWater",rechargecardUser.getWater()*100); + reply.put("data",data); + //发送回应 + issueInstructionsProducer.receiveDataReportResponse(productId.toString(),serialNumber,JSONUtil.toJsonStr(reply)); + }); + + + + } public static void main(String[] args) { diff --git a/fastbee-service/fastbee-rechargecard-service/src/main/java/com/fastbee/rechargecard/service/impl/UserRechargeCardsServiceImpl.java b/fastbee-service/fastbee-rechargecard-service/src/main/java/com/fastbee/rechargecard/service/impl/UserRechargeCardsServiceImpl.java index fbf33bd..3d6a3aa 100644 --- a/fastbee-service/fastbee-rechargecard-service/src/main/java/com/fastbee/rechargecard/service/impl/UserRechargeCardsServiceImpl.java +++ b/fastbee-service/fastbee-rechargecard-service/src/main/java/com/fastbee/rechargecard/service/impl/UserRechargeCardsServiceImpl.java @@ -165,28 +165,28 @@ public class UserRechargeCardsServiceImpl implements IUserRechargeCardsService throw new ServiceException("充值机编码为空!!!"); } //构建主题 - String topic ="hzlink/147/"+ngUserRechargeRecords.getSerialNumber()+"/cmd/down"; + // String topic ="hzlink/147/"+ngUserRechargeRecords.getSerialNumber()+"/cmd/down"; //构建消息 - Map param = new HashMap<>(); + // Map param = new HashMap<>(); //远程阀控 - param.put("cmd",1000); - Map data = new HashMap<>(); - data.put("orderNum", ngUserRechargeRecords.getId());//订单号 - data.put("cardNum",Integer.parseInt(cardNumber));//卡号 - data.put("areaCode",Integer.parseInt(areaCode));//区域号 - data.put("investBalance",balance.doubleValue()*100);//充值的金额 - data.put("investWater",100*100);//充值的水量 - - param.put("data",data); - try{ - pubMqttClient.publish(1,true,topic, JSONUtil.toJsonStr(param)); - - }catch (Exception e) - { - throw new ServiceException("消息发布失败"); - } - ngUserRechargeRecords.setStatus(2);//状态更改为已下发 - userRechargeRecordsMapper.updateNgUserRechargeRecords(ngUserRechargeRecords); + // param.put("cmd",1000); + // Map data = new HashMap<>(); + // data.put("orderNum", ngUserRechargeRecords.getId());//订单号 + // data.put("cardNum",Integer.parseInt(cardNumber));//卡号 + // data.put("areaCode",Integer.parseInt(areaCode));//区域号 + // data.put("investBalance",balance.doubleValue()*100);//充值的金额 + // data.put("investWater",100*100);//充值的水量 + // + // param.put("data",data); + // try{ + // pubMqttClient.publish(1,true,topic, JSONUtil.toJsonStr(param)); + // + // }catch (Exception e) + // { + // throw new ServiceException("消息发布失败"); + // } + // ngUserRechargeRecords.setStatus(2);//状态更改为已下发 + // userRechargeRecordsMapper.updateNgUserRechargeRecords(ngUserRechargeRecords); // int i = userRechargeCardsMapper.updateUserRechargeCards(info); return 1; }