package com.lpro.iot.protocal.impl; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.mina.core.session.IoSession; import com.alibaba.fastjson.JSON; import com.lpro.iot.bean.IotNodeInfo; import com.lpro.iot.bean.IotNodeInfoBO; import com.lpro.iot.bean.IotSensorInfoBO; import com.lpro.iot.bean.ReturnObj; import com.lpro.iot.common.Cache; import com.lpro.iot.common.Code; import com.lpro.iot.common.Code.IOT_NODE_STATUS; import com.lpro.iot.common.Config; import com.lpro.iot.common.Constants; import com.lpro.iot.mqtt.MqttService; import com.lpro.iot.protocal.Iprotocal; import com.lpro.iot.utils.HttpServiceSender; import com.lpro.iot.utils.ObjectUtil; import com.lpro.iot.utils.Util; /** * * @author M * HJ212 * */ public class ProtocalHj212 implements Iprotocal { private final static String nodeSensorsData = "NODE_SENSORS_DATA" ; @Override public boolean match(IoSession session, byte[] data, String msg) { if (msg.length() > 10 && "##".equals( msg.subSequence(0, 2) ) ) { return true; } return false; } @Override public void loginProtocal(IoSession session, byte[] data, String msg , ReturnObj obj) { // 解析登录命令 String deviceCode = ""; if(obj == null){ String[] cmds = msg.split("MN=") ; if(cmds.length >1){ deviceCode = cmds[1].split(";")[0]; } if(StringUtils.isBlank(deviceCode)){ return ; } obj = Util.NodeDevice.login(deviceCode); }else{ deviceCode = msg ; } if(obj.getStatus() == Code.ResponseCode.OK+0){ // 将devicecode放入session session.setAttribute(Constants.DEVICECODE,deviceCode); session.setAttribute( nodeSensorsData, obj.getData()) ; // session 数据缓存 session.setAttribute("dataCache",""); // 添加协议类型 session.setAttribute(Constants.PROTOCOL_CATEGORY, this.getClass().getSimpleName()); Cache.nodeIdsessionMap.put( deviceCode , session); // 登录成功,可以直接转换数据 analysisData(session, data, msg); } } @Override public void analysisData(IoSession session, byte[] data, String msg) { // node id Object deviceCode = session.getAttribute(Constants.DEVICECODE); List senorList = new ArrayList(); String dataCache = (String) session.getAttribute("dataCache"); if(Cache.deviceMap.containsKey(deviceCode)){ // 如果有,则返回数据 Map map =new HashMap<>(); map.put("askii", msg) ; map.put("byte", Util.binhexoct.bytesToHex(data)) ; MqttService.pubMessage(JSON.toJSONString(map), "/sys/debug/" + deviceCode ); } if(ObjectUtil.isNotEmpty(dataCache)){ msg = dataCache + msg ; session.setAttribute("dataCache",""); } try { String[] cmds = msg.split("&&") ; if( cmds.length <3 ){ session.setAttribute("dataCache",msg); }else if( cmds.length >3){ return ; }else{ Date date = new Date() ; String[] datas = cmds[1].split(";"); for(String tmp : datas){ IotSensorInfoBO sensorInfo = new IotSensorInfoBO(); sensorInfo.setDevice_code(deviceCode.toString() ); if(tmp.contains("DataTime")){ String dateStr = tmp.split("=")[1]; if(dateStr.length() == 17){ date = Util.date.parse( Util.date.dtVeryLong, dateStr) ; }else{ date = Util.date.parse( Util.date.dtLong, dateStr) ; } if( date.getTime() < new Date().getTime() - 24*60*60*1000 ){ date = new Date() ; }else if(date.getTime() > new Date().getTime() + 10 *60*100 ){ date = new Date() ; } }else{ String[] val ; if(tmp.contains(",")){ val = tmp.split(",")[0].split("=") ; }else{ val = tmp.split("=") ; } sensorInfo.setSensor_device_id(val[0]); sensorInfo.setPort_id(0); try{ sensorInfo.setSdata( Float.parseFloat(val[1]) ); }catch (Exception e) { continue ; } sensorInfo.setMtime(date); senorList.add(sensorInfo); } } String body = Util.json.object2Json(senorList); String url = Constants.URL.SENSORS_DATA; // 控制状态修改 HttpServiceSender.doPut(url, Config.IOT_USER_KEY, body); } } catch (Exception e) { e.printStackTrace(); } } @Override public void handbert(IoSession session) { // 判断session 读写空闲时间 ( 在每种协议中,确认服务端空闲时间,每个的空闲时间不同则行 ) if( session.containsAttribute(nodeSensorsData) ){ IotNodeInfoBO nodeBO = (IotNodeInfoBO) session.getAttribute(nodeSensorsData); // 如果上传周期配置为0以及以下,则不设置设备离线 ; if( nodeBO.getFrequency() < 1 ){ return ; } if(session.getLastReadTime() + (nodeBO.getFrequency()+30)*1000 < new Date().getTime() ){ // session 已判断为假死,关闭session session.close(true); return ; } }else{ session.close(true); } } @Override public void execServer(IoSession session, byte[] data, String msg) { } @Override public void logout(IoSession session) { IotNodeInfo iotNodeInfo = new IotNodeInfo(); iotNodeInfo.setDevice_code( session.getAttribute(Constants.DEVICECODE) +""); ; iotNodeInfo.setIot_node_status(IOT_NODE_STATUS.offline); iotNodeInfo.setLpmKey(Config.IOT_LPM_KEY); String body = Util.json.object2Json(iotNodeInfo); HttpServiceSender.doPut(Constants.URL.NODE_INFO, Config.IOT_USER_KEY, body); } }