物联网平台 +Web 组态
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

306 lines
9.6 KiB

package com.lpro.iot.protocal.impl;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.fastjson.JSON;
import com.lpro.iot.bean.IotNodeInfo;
import com.lpro.iot.bean.IotNodeInfoBO;
import com.lpro.iot.bean.IotSensorInfo;
import com.lpro.iot.bean.IotSensorInfoBO;
import com.lpro.iot.bean.ReturnNodeInfoObj;
import com.lpro.iot.common.Cache;
import com.lpro.iot.common.Code;
import com.lpro.iot.common.Code.IOT_NODE_STATUS;
import com.lpro.iot.common.Config;
import com.lpro.iot.common.Constants;
import com.lpro.iot.mqtt.MqttService;
import com.lpro.iot.protocal.Iprotocal;
import com.lpro.iot.server.MainServer;
import com.lpro.iot.utils.HttpServiceSender;
import com.lpro.iot.utils.ObjectUtil;
import com.lpro.iot.utils.Util;
/**
 * Protocol handler for the Wuxi Yongwei angle sensor.
 *
 * <p>Every frame handled here starts with the head byte {@code 0x77}, followed by a length
 * byte (number of bytes after the head, checksum included), a 4-byte sensor address, a
 * command word, an optional payload and a one-byte additive checksum computed over every
 * byte after the head.
 *
 * @author chenrj
 */
public class ProtocalYw implements Iprotocal {

    public static final Logger LOGGER = LoggerFactory.getLogger(ProtocalYw.class);

    /** First byte of every frame. */
    private static final byte FRAME_HEAD = 0x77;
    /** Command word sent by the periodic poller to request a measurement report. */
    private static final byte CMD_POLL = 0x04;
    /** Command word used by {@link #execServer}; the original comment calls it "set absolute zero". */
    private static final byte CMD_SET_ZERO = 0x05;
    /** Width of the sensor address field in a frame. */
    private static final int ADDRESS_BYTES = 4;
    /** Lower bound (seconds) applied to the polling period. */
    private static final int MIN_POLL_SECONDS = 2;
    /** Login strings longer than this are rejected without contacting the backend. */
    private static final int MAX_LOGIN_LENGTH = 60;
    /** A measurement report is read up to index 18 (battery), so at least 19 bytes are needed. */
    private static final int MIN_REPORT_BYTES = 19;
    /** Extra seconds granted on top of the upload period before a session counts as dead. */
    private static final long IDLE_GRACE_SECONDS = 60L;

    /**
     * Manual smoke test: builds the poll command for sensor address 1 and prints it as hex.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        byte[] command = buildCommand("1", CMD_POLL);
        // Printing the array itself would only show its identity ("[B@..."), not the bytes.
        System.out.println(Util.binhexoct.bytesToHex(command));
    }

    /**
     * Builds one outgoing frame: head, length, 4-byte address, body, checksum.
     *
     * @param address decimal sensor address
     * @param body    command word followed by its payload bytes, if any
     * @return the complete frame including the trailing checksum
     * @throws NumberFormatException if {@code address} is not a decimal integer
     */
    private static byte[] buildCommand(String address, byte... body) {
        // Length counts itself, the address, the body and the checksum (0x07 for poll, 0x08 for set-zero).
        byte lengthByte = (byte) (1 + ADDRESS_BYTES + body.length + 1);
        byte[] frame = { FRAME_HEAD, lengthByte };
        frame = Util.protocal.append(frame,
                Util.protocal.octInt2ByteArray(Integer.parseInt(address), ADDRESS_BYTES));
        frame = Util.protocal.append(frame, body);
        // The checksum is the wrapping byte sum of everything after the head byte.
        byte checksum = 0;
        for (int i = 1; i < frame.length; i++) {
            checksum += frame[i];
        }
        return Util.protocal.append(frame, new byte[] { checksum });
    }

    /**
     * Decodes a signed value stored as one sign byte followed by {@code digitBytes} bytes whose
     * hex digits are read as decimal digits.
     *
     * @param data       raw frame
     * @param signIndex  index of the sign byte; {@code 0x00} means positive, anything else negative
     * @param digitBytes number of digit bytes following the sign byte
     * @param scale      divisor applied to the decoded integer
     * @return the scaled, signed value
     * @throws NumberFormatException if a digit byte contains a non-decimal hex digit
     */
    private static float decodeSigned(byte[] data, int signIndex, int digitBytes, float scale) {
        StringBuilder digits = new StringBuilder();
        for (int k = 1; k <= digitBytes; k++) {
            digits.append(Util.protocal.getHexFromByte(data[signIndex + k]));
        }
        int sign = data[signIndex] == 0x0 ? 1 : -1;
        return sign * Integer.parseInt(digits.toString()) / scale;
    }

    /**
     * Handles a login request and, on success, binds the device to the session and starts the
     * polling thread.
     *
     * @param session session the login arrived on
     * @param data    raw login bytes (not used here)
     * @param msg     login string, used as the device code
     * @param obj     already-resolved login result, or {@code null} to look the device up via
     *                {@code Util.NodeDevice.login}
     */
    @Override
    public void loginProtocal(IoSession session, byte[] data, String msg, ReturnNodeInfoObj obj) {
        String deviceCode = msg;
        if (obj == null) {
            // Reject missing or over-long login strings before calling the backend.
            if (msg == null || msg.length() > MAX_LOGIN_LENGTH) {
                return;
            }
            obj = Util.NodeDevice.login(deviceCode);
            if (obj == null) {
                LOGGER.warn("Lora YW login returned no result, deviceCode=" + deviceCode);
                return;
            }
        }
        if (obj.getStatus() != Code.ResponseCode.OK + 0) {
            return;
        }

        session.setAttribute(Constants.DEVICE_CODE, deviceCode);
        session.setAttribute(Constants.NODE_INFO, obj.getData());
        // Per-session receive buffer starts empty.
        session.setAttribute(Constants.DATA_CACHE, "");
        session.setAttribute(Constants.PROTOCOL_CATEGORY, this.getClass().getSimpleName());

        // A device may hold only one live session: detach and close the previous one.
        // Removing DEVICE_CODE first lets logout() recognise the replaced session.
        // NOTE(review): if the same session logs in twice it closes itself here - confirm intent.
        IoSession previous = Cache.nodeIdsessionMap.get(deviceCode);
        if (previous != null) {
            previous.removeAttribute(Constants.DEVICE_CODE);
            previous.close(true);
        }
        Cache.nodeIdsessionMap.put(deviceCode, session);

        // Publish the owning thread's hash before starting it so the poller can always
        // verify that it is still the current poller for this session.
        Thread poller = new Thread(new TimerTaskThread(obj.getData()));
        session.setAttribute(Constants.THREAD_HASH_CODE, poller.hashCode());
        poller.start();
    }

    /**
     * Periodically sends the poll command to every distinct sensor address of one node.
     *
     * <p>The loop ends when the node has no session any more, when another poller has taken
     * over the session (ownership token mismatch), or when the thread is interrupted.
     */
    public class TimerTaskThread implements Runnable {

        private final IotNodeInfoBO nodeInfo;

        /**
         * @param nodeInfo node whose sensors are polled; the thread exits immediately if empty
         */
        public TimerTaskThread(IotNodeInfoBO nodeInfo) {
            super();
            this.nodeInfo = nodeInfo;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    if (!ObjectUtil.isNotEmpty(nodeInfo)) {
                        return;
                    }
                    // A missing or too-small period falls back to the minimum; previously a
                    // null period threw before the sleep and turned this loop into a busy spin.
                    Integer period = nodeInfo.getFrequency();
                    if (period == null || period < MIN_POLL_SECONDS) {
                        period = MIN_POLL_SECONDS;
                    }
                    Thread.sleep(period * 1000L);

                    IoSession session = Cache.nodeIdsessionMap.get(this.nodeInfo.getDevice_code());
                    if (ObjectUtil.isEmpty(session)) {
                        return;
                    }
                    // loginProtocal stores the hash of the Thread running this task, so compare
                    // against the current thread (not this Runnable) to detect a newer poller.
                    Object owner = session.getAttribute(Constants.THREAD_HASH_CODE);
                    if (!(owner instanceof Integer)
                            || ((Integer) owner).intValue() != Thread.currentThread().hashCode()) {
                        return;
                    }

                    Object node = session.getAttribute(Constants.NODE_INFO);
                    List<IotSensorInfoBO> sensors =
                            node instanceof IotNodeInfoBO ? ((IotNodeInfoBO) node).getIotSensorList() : null;
                    if (sensors == null) {
                        continue;
                    }
                    // Several sensor rows can share one device address; poll each address once.
                    Set<String> polled = new HashSet<>();
                    for (IotSensorInfoBO sensor : sensors) {
                        String address = sensor.getSensor_device_id();
                        if (!polled.add(address)) {
                            continue;
                        }
                        try {
                            Util.mina.write(session, buildCommand(address, CMD_POLL));
                        } catch (NumberFormatException e) {
                            // One bad address must not stop the remaining sensors from being polled.
                            LOGGER.warn("Lora YW skipping sensor with non-numeric address '" + address + "'", e);
                        }
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and stop: an interrupt is a request to shut down.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    // Keep polling; the sleep at the top of the loop bounds the retry rate.
                    LOGGER.warn("Lora YW polling cycle failed", e);
                }
            }
        }
    }

    /**
     * Parses a measurement report and uploads the decoded values.
     *
     * <p>Layout read by this method (indices into {@code data}): 0 head, 1 length, 2-5 address,
     * 7-10 X axis (sign + 3 digit bytes, /10000), 11-14 Y axis (same), 15-17 temperature
     * (sign + 2 digit bytes, /100), 18 battery. Sample frame from the original author:
     * {@code 77 13 00 00 00 01 84 00 00 21 83 10 01 09 27 00 10 64 09 FA}.
     *
     * <p>When {@code data} holds several concatenated frames only the first one is decoded.
     *
     * @param session session the data arrived on
     * @param data    raw frame bytes
     * @param msg     textual form of the data, used for logging and debug forwarding
     */
    @Override
    public void analysisData(IoSession session, byte[] data, String msg) {
        Object deviceCode = session.getAttribute(Constants.DEVICE_CODE);
        // Debug window: mirror the raw data to MQTT for devices that are being watched.
        if (deviceCode != null && Cache.deviceMap.containsKey(deviceCode)) {
            Map<String, String> debug = new HashMap<>();
            debug.put("askii", msg);
            debug.put("byte", Util.binhexoct.bytesToHex(data));
            MqttService.pubMessage(JSON.toJSONString(debug), "/sys/debug/" + deviceCode);
        }
        LOGGER.info("Lora YW 数据={}", msg);

        try {
            if (deviceCode == null) {
                LOGGER.warn("Lora YW data received on a session without a device code, dropped");
                return;
            }
            if (data == null || data.length < 2) {
                LOGGER.info("Lora YW frame too short to carry a header, dropped");
                return;
            }
            if (FRAME_HEAD != Util.protocal.getOtcFromByte(data[0])) {
                return;
            }
            int frameLength = Util.protocal.getOtcFromByte(data[1]);
            // Total frame size is the length field plus the head byte; accept one or more whole frames.
            int frameSize = frameLength + 1;
            if (frameSize <= 0 || data.length % frameSize != 0) {
                LOGGER.info("Lora YW 数据不完整,length={}, datalength={}", frameLength, data.length);
                return;
            }
            if (data.length < MIN_REPORT_BYTES) {
                LOGGER.info("Lora YW frame is not a measurement report, datalength={}", data.length);
                return;
            }

            String address = Util.protocal.getOctFromHexBytes(data, 2, 5);
            float x = decodeSigned(data, 7, 3, 10000.0f);
            float y = decodeSigned(data, 11, 3, 10000.0f);
            float temperature = decodeSigned(data, 15, 2, 100.0f);
            float battery = Integer.parseInt(Util.protocal.getHexFromByte(data[18])) + 0f;

            String owner = deviceCode.toString();
            // Port ids: 0 = X axis, 1 = Y axis, 2 = temperature, 3 = battery.
            final List<IotSensorInfo> readings = new ArrayList<>();
            readings.add(new IotSensorInfo(address, 0, x, owner));
            readings.add(new IotSensorInfo(address, 1, y, owner));
            readings.add(new IotSensorInfo(address, 2, temperature, owner));
            readings.add(new IotSensorInfo(address, 3, battery, owner));

            // Upload off the I/O thread.
            MainServer.threadPool.execute(() -> {
                String body = Util.json.object2Json(readings);
                HttpServiceSender.doPut(Constants.URL.SENSORS_DATA, Config.IOT_USER_KEY, body);
            });
        } catch (Exception e) {
            // A malformed frame must not break the session, but it should be visible in the log.
            LOGGER.warn("Lora YW failed to parse data=" + msg, e);
        }
    }

    /**
     * Heartbeat check: closes sessions that are not logged in or have been silent for longer
     * than the node's upload period plus a 60 second grace time.
     *
     * @param session session to check
     */
    @Override
    public void handbert(IoSession session) {
        try {
            if (!session.containsAttribute(Constants.NODE_INFO)) {
                session.close(true);
                return;
            }
            IotNodeInfoBO nodeBO = (IotNodeInfoBO) session.getAttribute(Constants.NODE_INFO);
            // A missing frequency fails here and is handled by the catch below (session closed).
            int frequency = nodeBO.getFrequency();
            // An upload period of 0 or less disables the offline detection.
            if (frequency < 1) {
                return;
            }
            // Computed in long so that large periods cannot overflow int.
            long idleLimitMillis = (frequency + IDLE_GRACE_SECONDS) * 1000L;
            if (session.getLastReadTime() + idleLimitMillis < System.currentTimeMillis()) {
                // The session is considered dead.
                session.close(true);
            }
        } catch (Exception e) {
            LOGGER.warn("Lora YW heartbeat check failed, closing session", e);
            session.close(true);
        }
    }

    /**
     * Executes a server-side control command by sending a set-zero frame to the target device.
     *
     * <p>Expected message format:
     * {@code IOT_SERVER_LPM:control,device_code,sensor_device_id,subtype}.
     *
     * @param session session the command arrived on (the frame is written to the session
     *                registered for {@code device_code}, not to this one)
     * @param data    raw command bytes (not used here)
     * @param msg     command string in the format above
     * @throws ArrayIndexOutOfBoundsException if {@code msg} does not contain all fields
     * @throws NumberFormatException          if the address or subtype is not numeric
     */
    @Override
    public void execServer(IoSession session, byte[] data, String msg) {
        String[] fields = msg.split(":")[1].split(",");
        String nodeDeviceCode = fields[1];
        String address = fields[2];
        byte subType = Byte.parseByte(fields[3]);

        byte[] command = buildCommand(address, CMD_SET_ZERO, subType);
        Util.mina.write(Cache.nodeIdsessionMap.get(nodeDeviceCode), command);
    }

    /**
     * This protocol is never auto-detected from incoming data.
     *
     * @return always {@code false}
     */
    @Override
    public boolean match(IoSession session, byte[] data, String msg) {
        return false;
    }

    /**
     * Reports the device bound to the session as offline.
     *
     * <p>Sessions without a device code are skipped: they were either never logged in or were
     * replaced by a newer session in {@link #loginProtocal}, in which case the device is still
     * online.
     *
     * @param session session that is being closed
     */
    @Override
    public void logout(IoSession session) {
        Object deviceCode = session.getAttribute(Constants.DEVICE_CODE);
        if (deviceCode == null) {
            return;
        }
        IotNodeInfo iotNodeInfo = new IotNodeInfo();
        iotNodeInfo.setDevice_code(deviceCode + "");
        iotNodeInfo.setIot_node_status(IOT_NODE_STATUS.offline);
        iotNodeInfo.setLpmKey(Config.IOT_LPM_KEY);
        String body = Util.json.object2Json(iotNodeInfo);
        HttpServiceSender.doPut(Constants.URL.NODE_INFO, Config.IOT_USER_KEY, body);
    }
}