灌溉记录报文处理
This commit is contained in:
@@ -80,25 +80,23 @@ public class DeviceOtherMsgConsumer {
|
|||||||
|
|
||||||
@Async(FastBeeConstant.TASK.DEVICE_OTHER_TASK)
|
@Async(FastBeeConstant.TASK.DEVICE_OTHER_TASK)
|
||||||
public void consume(DeviceReportBo bo){
|
public void consume(DeviceReportBo bo){
|
||||||
|
//处理emq订阅的非 property/post 属性上报的消息 ,因为其他消息量小,放在一起处理
|
||||||
|
Long productId;//产品id,设备所属产品
|
||||||
|
byte[] data = bo.getData();//数据
|
||||||
|
String topic=bo.getTopicName();//主题
|
||||||
|
//从主题中解析出产品id
|
||||||
|
String[] split = topic.split("/");
|
||||||
|
productId= Long.valueOf(split[1]);
|
||||||
|
//从主题中解析出设备序列号
|
||||||
|
String serialNumber= split[2];//设备序列号
|
||||||
try {
|
try {
|
||||||
//处理emq订阅的非 property/post 属性上报的消息 ,因为其他消息量小,放在一起处理
|
|
||||||
Long productId;//产品id,设备所属产品
|
|
||||||
Long pacSn;//包号
|
|
||||||
byte[] data = bo.getData();//数据
|
|
||||||
String topic=bo.getTopicName();//主题
|
|
||||||
//从主题中解析出产品id
|
|
||||||
String[] split = topic.split("/");
|
|
||||||
productId= Long.valueOf(split[1]);
|
|
||||||
//从主题中解析出设备序列号
|
|
||||||
String serialNumber= split[2];//设备序列号
|
|
||||||
|
|
||||||
System.err.println("主题:"+topic+"--产品id:"+productId+"--设备序列号:"+serialNumber);
|
System.err.println("主题:"+topic+"--产品id:"+productId+"--设备序列号:"+serialNumber);
|
||||||
System.err.println(topic.endsWith("/info/up"));
|
|
||||||
//设备上报数据消息---------------------------------------------------------------------------------
|
//设备上报数据消息---------------------------------------------------------------------------------
|
||||||
if(topic.endsWith("/info/up")){
|
if(topic.endsWith("/info/up")){
|
||||||
deviceDataReportHandler(productId,serialNumber,JSONUtil.parseObj(new String(data)));
|
deviceDataReportHandler(productId,serialNumber,JSONUtil.parseObj(new String(data)));
|
||||||
//更新设备在线状态
|
//更新设备在线状态
|
||||||
updateDeviceOnlineStatus( productId,serialNumber);//拼成一个唯一的key,只要key存在,则设备在线
|
updateDeviceOnlineStatus(productId,serialNumber);//拼成一个唯一的key,只要key存在,则设备在线
|
||||||
// updateDeviceOnlineStatusDb(serialNumber);
|
// updateDeviceOnlineStatusDb(serialNumber);
|
||||||
//保存使用记录
|
//保存使用记录
|
||||||
NgWaterPumpUsageRecords pumpUsageRecords=new NgWaterPumpUsageRecords();
|
NgWaterPumpUsageRecords pumpUsageRecords=new NgWaterPumpUsageRecords();
|
||||||
@@ -110,7 +108,7 @@ public class DeviceOtherMsgConsumer {
|
|||||||
//上报正常数据
|
//上报正常数据
|
||||||
if("waterEleData".equals(jsonObject.getStr("type"))){
|
if("waterEleData".equals(jsonObject.getStr("type"))){
|
||||||
|
|
||||||
//收到开阀定时报文
|
//收到开阀时的定时报文
|
||||||
if("timeMsg".equals(data1.get("action"))){
|
if("timeMsg".equals(data1.get("action"))){
|
||||||
pumpUsageRecords.setMessageContent(JSONUtil.toJsonStr(jsonObject));
|
pumpUsageRecords.setMessageContent(JSONUtil.toJsonStr(jsonObject));
|
||||||
LambdaUpdateChainWrapper<DeviceReportInfo> up = new LambdaUpdateChainWrapper<>(deviceReportInfoMapper)
|
LambdaUpdateChainWrapper<DeviceReportInfo> up = new LambdaUpdateChainWrapper<>(deviceReportInfoMapper)
|
||||||
@@ -175,7 +173,7 @@ public class DeviceOtherMsgConsumer {
|
|||||||
log.error("保存灌溉控制器历史数据失败");
|
log.error("保存灌溉控制器历史数据失败");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//收到关阀时平安报文
|
//收到关阀时的平安报文
|
||||||
else if("safeMsg".equals(JSONUtil.parseObj(data1).get("action"))){
|
else if("safeMsg".equals(JSONUtil.parseObj(data1).get("action"))){
|
||||||
pumpUsageRecords.setMessageContent(JSONUtil.toJsonStr(jsonObject));
|
pumpUsageRecords.setMessageContent(JSONUtil.toJsonStr(jsonObject));
|
||||||
|
|
||||||
@@ -240,50 +238,35 @@ public class DeviceOtherMsgConsumer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
//开阀门,关阀门
|
//处理开阀门报文
|
||||||
else if("startPump".equals(JSONUtil.parseObj(data1).get("action"))|| "stopPump".equals(JSONUtil.parseObj(data1).get("action"))){
|
else if("startPump".equals(JSONUtil.parseObj(data1).get("action"))){
|
||||||
int i = ngWaterPumpUsageRecordsMapper.insertNgWaterPumpUsageRecords(pumpUsageRecords);
|
// int i = ngWaterPumpUsageRecordsMapper.insertNgWaterPumpUsageRecords(pumpUsageRecords);
|
||||||
if(i<1){
|
// if(i<1){
|
||||||
System.err.println("--------------------------保存使用记录失败!---------------------------");
|
// System.err.println("--------------------------保存使用记录失败!---------------------------");
|
||||||
}
|
// }
|
||||||
JSONObject dataJson=jsonObject.getJSONObject("data");
|
JSONObject dataJson=jsonObject.getJSONObject("data");
|
||||||
System.err.println(jsonObject);
|
|
||||||
System.err.println(dataJson);
|
|
||||||
if("startPump".equals(JSONUtil.parseObj(data1).get("action")))
|
|
||||||
{
|
|
||||||
System.err.println("开阀");
|
|
||||||
log.info("设备{}开阀",serialNumber);
|
log.info("设备{}开阀",serialNumber);
|
||||||
//开阀-添加灌溉记录
|
//开阀-添加灌溉记录
|
||||||
UserIrrigationRecord userIrrigationRecord=new UserIrrigationRecord();
|
UserIrrigationRecord userIrrigationRecord=new UserIrrigationRecord();
|
||||||
userIrrigationRecord.setDeviceNumber(serialNumber);//设备编码
|
userIrrigationRecord.setDeviceNumber(serialNumber);//设备编码
|
||||||
System.err.println("serialNumber"+serialNumber);
|
|
||||||
|
|
||||||
String cardNumber= String.valueOf(dataJson.getInt("cardId"));//解析cardId
|
String cardNumber= String.valueOf(dataJson.getInt("cardId"));//解析cardId
|
||||||
userIrrigationRecord.setCardNumber(cardNumber);//卡号
|
userIrrigationRecord.setCardNumber(cardNumber);//卡号
|
||||||
System.err.println("cardNumber"+cardNumber);
|
|
||||||
|
|
||||||
UserRechargeCards cardInfo=userRechargeCardsMapper.selectUserRechargeCardsByCardnumber(cardNumber);
|
UserRechargeCards cardInfo=userRechargeCardsMapper.selectUserRechargeCardsByCardnumber(cardNumber);
|
||||||
if(cardInfo==null)
|
//根据卡号查询用户所属机构
|
||||||
{
|
List<UserRechargeCards> cardList = new LambdaQueryChainWrapper<>(userRechargeCardsMapper)
|
||||||
System.err.println("用户不存在");
|
.select(UserRechargeCards::getUserId,UserRechargeCards::getDeptId)
|
||||||
}
|
.eq(UserRechargeCards::getCardNumber, cardNumber)
|
||||||
Long deptId= null;//获取deptId
|
.list();
|
||||||
if (cardInfo != null) {
|
Long deptId= null;//获取deptId
|
||||||
deptId = cardInfo.getDeptId();
|
if(!cardList.isEmpty()){
|
||||||
}
|
deptId= cardList.get(0).getDeptId();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
userIrrigationRecord.setDeptId(deptId);
|
userIrrigationRecord.setDeptId(deptId);
|
||||||
|
|
||||||
/*Long userId=null;
|
|
||||||
System.err.println("userId");
|
|
||||||
if(userRechargeCardsMapper.selectUserRechargeCardsByCardNumber(cardNumber)==null)
|
|
||||||
{
|
|
||||||
throw new Exception("订单信息不存在");
|
|
||||||
}
|
|
||||||
userId=userRechargeCardsMapper.selectUserRechargeCardsByCardNumber(cardNumber).getUserId();
|
|
||||||
userIrrigationRecord.setUserId(userId);
|
|
||||||
System.err.println("userId"+userId);*/
|
|
||||||
|
|
||||||
|
|
||||||
DateTime currentTime=DateTime.now();//获取当前时间作为开阀时间
|
DateTime currentTime=DateTime.now();//获取当前时间作为开阀时间
|
||||||
userIrrigationRecord.setStartTime(currentTime);//开阀时间
|
userIrrigationRecord.setStartTime(currentTime);//开阀时间
|
||||||
@@ -304,69 +287,69 @@ public class DeviceOtherMsgConsumer {
|
|||||||
|
|
||||||
userIrrigationRecord.setCreateTime(DateUtils.getNowDate());
|
userIrrigationRecord.setCreateTime(DateUtils.getNowDate());
|
||||||
|
|
||||||
//对充值卡余额的操作
|
|
||||||
UserRechargeCards userRechargeCards = userRechargeCardsMapper.selectUserRechargeCardsByCardNumber(cardNumber);//获取充值卡信息
|
|
||||||
BigDecimal balance=dataJson.getBigDecimal("userBalance");//获取当前用户金额
|
|
||||||
userRechargeCards.setBalance(balance);//同步用户余额
|
|
||||||
|
|
||||||
int flag=userIrrigationRecordMapper.insertUserIrrigationRecord(userIrrigationRecord);//开阀时添加一条灌溉记录
|
int flag=userIrrigationRecordMapper.insertUserIrrigationRecord(userIrrigationRecord);//开阀时添加一条灌溉记录
|
||||||
if(flag<1)
|
if(flag<1)
|
||||||
{
|
{
|
||||||
System.err.println("灌溉记录添加失败");
|
log.error("设备{}灌溉记录添加失败",serialNumber);
|
||||||
}else{
|
|
||||||
flag=userRechargeCardsMapper.updateUserRechargeCards(userRechargeCards);//同步充值卡余额
|
|
||||||
if(flag<1)
|
|
||||||
{
|
|
||||||
System.err.println("充值卡金额同步失败");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}else if("stopPump".equals(JSONUtil.parseObj(data1).get("action"))){
|
|
||||||
System.err.println("关阀");
|
|
||||||
log.info("设备{}关",serialNumber);
|
|
||||||
//关阀-修改灌溉记录,修改结束时间、灌溉用水量(做差 关阀-开阀用户总累计流量。)查卡号最新一条记录
|
|
||||||
String cardNumber=dataJson.getStr("cardId");//解析cardId
|
|
||||||
//根据设备编码和卡号获取当前状态为灌溉中的灌溉记录
|
|
||||||
UserIrrigationRecord userIrrigationRecord=userIrrigationRecordMapper.selectUserIrrigationRecordListBycardNumberAndDeviceNumber(cardNumber,serialNumber).get(0);
|
|
||||||
|
|
||||||
if(userIrrigationRecord==null)
|
}
|
||||||
{
|
//处理关阀门报文
|
||||||
System.err.println("此灌溉记录不存在");
|
else if ("stopPump".equals(JSONUtil.parseObj(data1).get("action"))) {
|
||||||
}
|
JSONObject dataJson=jsonObject.getJSONObject("data");
|
||||||
|
|
||||||
|
log.info("设备{}关阀",serialNumber);
|
||||||
|
//关阀-修改灌溉记录,修改结束时间、灌溉用水量(做差 关阀-开阀用户总累计流量。)查卡号最新一条记录
|
||||||
|
String cardNumber=dataJson.getStr("cardId");//解析cardId
|
||||||
|
//根据设备编码和卡号获取当前状态为灌溉中的灌溉记录
|
||||||
|
List<UserIrrigationRecord> irrigationRecordList = new LambdaQueryChainWrapper<>(userIrrigationRecordMapper)
|
||||||
|
.select(UserIrrigationRecord::getId, UserIrrigationRecord::getStartTime,UserIrrigationRecord::getOpenCumFlow)
|
||||||
|
.eq(UserIrrigationRecord::getCardNumber, cardNumber)
|
||||||
|
.eq(UserIrrigationRecord::getStatus,1)
|
||||||
|
.orderByDesc(UserIrrigationRecord::getStartTime)
|
||||||
|
.list();
|
||||||
|
if(irrigationRecordList.isEmpty())
|
||||||
|
{
|
||||||
|
log.error("设备{}灌溉记录不存在",serialNumber);
|
||||||
|
}else{
|
||||||
|
UserIrrigationRecord userIrrigationRecord=new UserIrrigationRecord();
|
||||||
|
userIrrigationRecord.setId(irrigationRecordList.get(0).getId());
|
||||||
|
|
||||||
|
log.info("查询到{}设备的灌溉记录{}",serialNumber,userIrrigationRecord);
|
||||||
DateTime currentTime=DateTime.now();//获取当前时间作为关阀时间
|
DateTime currentTime=DateTime.now();//获取当前时间作为关阀时间
|
||||||
userIrrigationRecord.setEndTime(currentTime);//关阀时间
|
userIrrigationRecord.setEndTime(currentTime);//关阀时间
|
||||||
userIrrigationRecord.setLastTime(currentTime);//灌溉最新上报时间
|
userIrrigationRecord.setLastTime(currentTime);//灌溉最新上报时间
|
||||||
|
|
||||||
BigDecimal closeCumFlow=dataJson.getBigDecimal("userSumFlow");//用户关阀时总用水量
|
BigDecimal closeCumFlow=dataJson.getBigDecimal("userSumFlow");//用户关阀时总用水量
|
||||||
|
// System.err.println("用户开阀时总用水量:"+irrigationRecordList.get(0).getOpenCumFlow());
|
||||||
|
// System.err.println("用户关阀时总用水量:"+closeCumFlow);
|
||||||
userIrrigationRecord.setCloseCumFlow(closeCumFlow);
|
userIrrigationRecord.setCloseCumFlow(closeCumFlow);
|
||||||
|
|
||||||
BigDecimal currentFlow=closeCumFlow.subtract(userIrrigationRecord.getOpenCumFlow());//计算结果为当前用水量
|
|
||||||
|
|
||||||
|
BigDecimal currentFlow=closeCumFlow.subtract(irrigationRecordList.get(0).getOpenCumFlow());//计算结果为当前用水量
|
||||||
|
//当前用水量
|
||||||
|
log.info("设备{}本次灌溉用户当前用水量{}",serialNumber,currentFlow);
|
||||||
userIrrigationRecord.setCurFlow(currentFlow);
|
userIrrigationRecord.setCurFlow(currentFlow);
|
||||||
|
|
||||||
BigDecimal userBalance=dataJson.getBigDecimal("userBalance");//用户余额
|
BigDecimal userBalance=dataJson.getBigDecimal("userBalance");//用户余额
|
||||||
|
System.err.println("用户余额:"+userBalance);
|
||||||
userIrrigationRecord.setBalance(userBalance);
|
userIrrigationRecord.setBalance(userBalance);
|
||||||
|
|
||||||
userIrrigationRecord.setStatus(2);//灌溉状态更改为结束灌溉
|
userIrrigationRecord.setStatus(2);//灌溉状态更改为结束灌溉
|
||||||
|
|
||||||
//对充值卡余额的操作
|
|
||||||
UserRechargeCards userRechargeCards = userRechargeCardsMapper.selectUserRechargeCardsByCardNumber(cardNumber);//获取充值卡信息
|
|
||||||
BigDecimal balance=dataJson.getBigDecimal("userBalance");//获取当前用户金额
|
|
||||||
userRechargeCards.setBalance(balance);//同步用户余额
|
|
||||||
|
|
||||||
int flag=userIrrigationRecordMapper.updateUserIrrigationRecord(userIrrigationRecord);//关阀时修改一条灌溉记录
|
log.info("封装关阀数据{}",userIrrigationRecord);
|
||||||
|
int flag=userIrrigationRecordMapper.updateById(userIrrigationRecord);//关阀时修改一条灌溉记录
|
||||||
if(flag<1)
|
if(flag<1)
|
||||||
{
|
{
|
||||||
System.err.println("灌溉记录修改失败");
|
log.error("设备{}灌溉记录修改失败",serialNumber);
|
||||||
}else{
|
|
||||||
flag=userRechargeCardsMapper.updateUserRechargeCards(userRechargeCards);//同步充值卡余额
|
|
||||||
if(flag<1)
|
|
||||||
{
|
|
||||||
System.err.println("充值卡金额同步失败");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//收到心跳报文
|
//收到心跳报文
|
||||||
else if("heartBeat".equals(jsonObject.getStr("type"))){
|
else if("heartBeat".equals(jsonObject.getStr("type"))){
|
||||||
//需要回应
|
//需要回应
|
||||||
@@ -416,7 +399,7 @@ public class DeviceOtherMsgConsumer {
|
|||||||
}
|
}
|
||||||
otherMsgHandler.messageHandler(bo);
|
otherMsgHandler.messageHandler(bo);
|
||||||
}catch (Exception e){
|
}catch (Exception e){
|
||||||
// log.error("=>设备其他消息处理出错",e);
|
log.error("=>主题:{}设备消息:{}处理出错:{}",topic,new String(data), e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -471,8 +454,8 @@ public class DeviceOtherMsgConsumer {
|
|||||||
* @param dataObj 消息
|
* @param dataObj 消息
|
||||||
*/
|
*/
|
||||||
private void deviceDataReportHandler(Long productId,String serialNumber,JSONObject dataObj){
|
private void deviceDataReportHandler(Long productId,String serialNumber,JSONObject dataObj){
|
||||||
|
//消息data部分
|
||||||
|
JSONObject data = dataObj.getJSONObject("data");
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user