diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/CheckForAllRecordsThread.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/CheckForAllRecordsThread.java new file mode 100644 index 00000000..770edf06 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/CheckForAllRecordsThread.java @@ -0,0 +1,78 @@ +package com.genersoft.iot.vmp.gb28181.transmit.callback; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; +import com.genersoft.iot.vmp.gb28181.bean.RecordItem; +import com.genersoft.iot.vmp.gb28181.transmit.request.impl.MessageRequestProcessor; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; + +import org.slf4j.Logger; + +@SuppressWarnings("unchecked") +public class CheckForAllRecordsThread extends Thread { + + private String key; + + private RecordInfo recordInfo; + + private RedisUtil redis; + + private Logger logger; + + private DeferredResultHolder deferredResultHolder; + + public CheckForAllRecordsThread(String key, RecordInfo recordInfo) { + this.key = key; + this.recordInfo = recordInfo; + } + + public void run() { + + String cacheKey = this.key; + + for (long stop = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); stop > System.nanoTime();) { + List cacheKeys = redis.scan(cacheKey + "_*"); + List totalRecordList = new ArrayList(); + for (int i = 0; i < cacheKeys.size(); i++) { + totalRecordList.addAll((List) redis.get(cacheKeys.get(i).toString())); + } + if (totalRecordList.size() < this.recordInfo.getSumNum()) { + logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + this.recordInfo.getSumNum() + "项"); + } else { + logger.info("录像数据已全部获取,共" + this.recordInfo.getSumNum() + "项"); + this.recordInfo.setRecordList(totalRecordList); + for (int i = 0; i < cacheKeys.size(); i++) { + redis.del(cacheKeys.get(i).toString()); + } + break; + } + } + // 自然顺序排序, 元素进行升序排列 + this.recordInfo.getRecordList().sort(Comparator.naturalOrder()); + RequestMessage msg = new RequestMessage(); + String deviceId = recordInfo.getDeviceId(); + msg.setDeviceId(deviceId); + msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO); + msg.setData(recordInfo); + deferredResultHolder.invokeResult(msg); + logger.info("处理完成,返回结果"); + MessageRequestProcessor.threadNameList.remove(cacheKey); + } + + public void setRedis(RedisUtil redis) { + this.redis = redis; + } + + public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) { + this.deferredResultHolder = deferredResultHolder; + } + + public void setLogger(Logger logger) { + this.logger = logger; + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java index c865ad39..e7fbfe0d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java @@ -23,6 +23,7 @@ import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; import com.genersoft.iot.vmp.gb28181.bean.RecordItem; import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.transmit.callback.CheckForAllRecordsThread; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; @@ -43,14 +44,17 @@ import org.dom4j.io.SAXReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; + /** * @Description:MESSAGE请求处理器 * @author: swwheihei * @date: 2020年5月3日 下午5:32:41 */ - +@SuppressWarnings(value={"unchecked", "rawtypes"}) public class MessageRequestProcessor extends SIPRequestAbstractProcessor { + public static volatile List threadNameList = new ArrayList(); + private UserSetup userSetup = (UserSetup) SpringBeanFactory.getBean("userSetup"); private final static Logger logger = LoggerFactory.getLogger(MessageRequestProcessor.class); @@ -240,10 +244,10 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { try { Element rootElement = getRootElement(evt); String deviceId = XmlUtil.getText(rootElement, "DeviceID"); - String result = XmlUtil.getText(rootElement, "Result"); + //String result = XmlUtil.getText(rootElement, "Result"); // 回复200 OK responseAck(evt); - if (!XmlUtil.isEmpty(result)) { + if (rootElement.getName().equals("Response")) {//} !XmlUtil.isEmpty(result)) { // 此处是对本平台发出DeviceControl指令的应答 JSONObject json = new JSONObject(); XmlUtil.node2Json(rootElement, json); @@ -272,11 +276,10 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { try { Element rootElement = getRootElement(evt); String deviceId = XmlUtil.getText(rootElement, "DeviceID"); - String result = XmlUtil.getText(rootElement, "Result"); // 回复200 OK responseAck(evt); - //if (!XmlUtil.isEmpty(result)) { - // 此处是对本平台发出DeviceControl指令的应答 + if (rootElement.getName().equals("Response")) { + // 此处是对本平台发出DeviceControl指令的应答 JSONObject json = new JSONObject(); XmlUtil.node2Json(rootElement, json); if (logger.isDebugEnabled()) { @@ -287,9 +290,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { msg.setType(DeferredResultHolder.CALLBACK_CMD_DEVICECONFIG); msg.setData(json); deferredResultHolder.invokeResult(msg); - // } else { - // // 此处是上级发出的DeviceConfig指令 - //} + } else { + // 此处是上级发出的DeviceConfig指令 + } } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { e.printStackTrace(); } @@ -304,11 +307,10 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { try { Element rootElement = getRootElement(evt); String deviceId = XmlUtil.getText(rootElement, "DeviceID"); - String result = XmlUtil.getText(rootElement, "Result"); // 回复200 OK responseAck(evt); - //if (!XmlUtil.isEmpty(result)) { - // 此处是对本平台发出DeviceControl指令的应答 + if (rootElement.getName().equals("Response")) { + // 此处是对本平台发出DeviceControl指令的应答 JSONObject json = new JSONObject(); XmlUtil.node2Json(rootElement, json); if (logger.isDebugEnabled()) { @@ -319,9 +321,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { msg.setType(DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD); msg.setData(json); deferredResultHolder.invokeResult(msg); - // } else { - // // 此处是上级发出的DeviceConfig指令 - //} + } else { + // 此处是上级发出的DeviceConfig指令 + } } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { e.printStackTrace(); } @@ -336,7 +338,6 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { try { Element rootElement = getRootElement(evt); String deviceId = XmlUtil.getText(rootElement, "DeviceID"); - String result = XmlUtil.getText(rootElement, "Result"); // 回复200 OK responseAck(evt); if (rootElement.getName().equals("Response")) {// !XmlUtil.isEmpty(result)) { @@ -648,8 +649,11 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { Element recordListElement = rootElement.element("RecordList"); if (recordListElement == null || recordInfo.getSumNum() == 0) { logger.info("无录像数据"); - // responseAck(evt); - // return; + RequestMessage msg = new RequestMessage(); + msg.setDeviceId(deviceId); + msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO); + msg.setData(recordInfo); + deferredResultHolder.invokeResult(msg); } else { Iterator recordListIterator = recordListElement.elementIterator(); List recordList = new ArrayList(); @@ -679,44 +683,63 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { record.setRecorderId(XmlUtil.getText(itemRecord, "RecorderID")); recordList.add(record); } - // recordList.sort(Comparator.naturalOrder()); recordInfo.setRecordList(recordList); } - // 存在录像且如果当前录像明细个数小于总条数,说明拆包返回,需要组装,暂不返回 - if (recordInfo.getSumNum() > 0 && recordList.size() > 0 && recordList.size() < recordInfo.getSumNum()) { - // 为防止连续请求该设备的录像数据,返回数据错乱,特增加sn进行区分 - String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn; - - redis.set(cacheKey + "_" + uuid, recordList, 90); - List cacheKeys = redis.scan(cacheKey + "_*"); - List totalRecordList = new ArrayList(); - for (int i = 0; i < cacheKeys.size(); i++) { - totalRecordList.addAll((List) redis.get(cacheKeys.get(i).toString())); + // 改用单独线程统计已获取录像文件数量,避免多包并行分别统计不完整的问题 + String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn; + redis.set(cacheKey + "_" + uuid, recordList, 90); + if (!threadNameList.contains(cacheKey)) { + threadNameList.add(cacheKey); + CheckForAllRecordsThread chk = new CheckForAllRecordsThread(cacheKey, recordInfo); + chk.setName(cacheKey); + chk.setDeferredResultHolder(deferredResultHolder); + chk.setRedis(redis); + chk.setLogger(logger); + chk.start(); + if (logger.isDebugEnabled()) { + logger.debug("Start Thread " + cacheKey + "."); } - if (totalRecordList.size() < recordInfo.getSumNum()) { - logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + recordInfo.getSumNum() + "项"); - return; - } - logger.info("录像数据已全部获取,共" + recordInfo.getSumNum() + "项"); - recordInfo.setRecordList(totalRecordList); - for (int i = 0; i < cacheKeys.size(); i++) { - redis.del(cacheKeys.get(i).toString()); + } else { + if (logger.isDebugEnabled()) { + logger.debug("Thread " + cacheKey + " already started."); } } - // 自然顺序排序, 元素进行升序排列 - recordInfo.getRecordList().sort(Comparator.naturalOrder()); + + // 存在录像且如果当前录像明细个数小于总条数,说明拆包返回,需要组装,暂不返回 + // if (recordInfo.getSumNum() > 0 && recordList.size() > 0 && recordList.size() < recordInfo.getSumNum()) { + // // 为防止连续请求该设备的录像数据,返回数据错乱,特增加sn进行区分 + // String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn; + + // redis.set(cacheKey + "_" + uuid, recordList, 90); + // List cacheKeys = redis.scan(cacheKey + "_*"); + // List totalRecordList = new ArrayList(); + // for (int i = 0; i < cacheKeys.size(); i++) { + // totalRecordList.addAll((List) redis.get(cacheKeys.get(i).toString())); + // } + // if (totalRecordList.size() < recordInfo.getSumNum()) { + // logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + recordInfo.getSumNum() + "项"); + // return; + // } + // logger.info("录像数据已全部获取,共" + recordInfo.getSumNum() + "项"); + // recordInfo.setRecordList(totalRecordList); + // for (int i = 0; i < cacheKeys.size(); i++) { + // redis.del(cacheKeys.get(i).toString()); + // } + // } + // // 自然顺序排序, 元素进行升序排列 + // recordInfo.getRecordList().sort(Comparator.naturalOrder()); } // 走到这里,有以下可能:1、没有录像信息,第一次收到recordinfo的消息即返回响应数据,无redis操作 // 2、有录像数据,且第一次即收到完整数据,返回响应数据,无redis操作 // 3、有录像数据,在超时时间内收到多次包组装后数量足够,返回数据 - RequestMessage msg = new RequestMessage(); - msg.setDeviceId(deviceId); - msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO); - msg.setData(recordInfo); - deferredResultHolder.invokeResult(msg); - logger.info("处理完成,返回结果"); + // RequestMessage msg = new RequestMessage(); + // msg.setDeviceId(deviceId); + // msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO); + // msg.setData(recordInfo); + // deferredResultHolder.invokeResult(msg); + // logger.info("处理完成,返回结果"); } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { e.printStackTrace(); } @@ -799,4 +822,4 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { this.redisCatchStorage = redisCatchStorage; } -} +} \ No newline at end of file