|
|
@ -23,6 +23,7 @@ import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; |
|
|
|
import com.genersoft.iot.vmp.gb28181.bean.RecordItem; |
|
|
|
import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; |
|
|
|
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; |
|
|
|
import com.genersoft.iot.vmp.gb28181.transmit.callback.CheckForAllRecordsThread; |
|
|
|
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; |
|
|
|
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; |
|
|
|
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; |
|
|
@ -43,14 +44,17 @@ import org.dom4j.io.SAXReader; |
|
|
|
import org.slf4j.Logger; |
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
import org.springframework.util.StringUtils; |
|
|
|
|
|
|
|
/** |
|
|
|
* @Description:MESSAGE请求处理器 |
|
|
|
* @author: swwheihei |
|
|
|
* @date: 2020年5月3日 下午5:32:41 |
|
|
|
*/ |
|
|
|
|
|
|
|
@SuppressWarnings(value={"unchecked", "rawtypes"}) |
|
|
|
public class MessageRequestProcessor extends SIPRequestAbstractProcessor { |
|
|
|
|
|
|
|
public static volatile List<String> threadNameList = new ArrayList(); |
|
|
|
|
|
|
|
private UserSetup userSetup = (UserSetup) SpringBeanFactory.getBean("userSetup"); |
|
|
|
|
|
|
|
private final static Logger logger = LoggerFactory.getLogger(MessageRequestProcessor.class); |
|
|
@ -240,10 +244,10 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { |
|
|
|
try { |
|
|
|
Element rootElement = getRootElement(evt); |
|
|
|
String deviceId = XmlUtil.getText(rootElement, "DeviceID"); |
|
|
|
String result = XmlUtil.getText(rootElement, "Result"); |
|
|
|
//String result = XmlUtil.getText(rootElement, "Result");
|
|
|
|
// 回复200 OK
|
|
|
|
responseAck(evt); |
|
|
|
if (!XmlUtil.isEmpty(result)) { |
|
|
|
if (rootElement.getName().equals("Response")) {//} !XmlUtil.isEmpty(result)) {
|
|
|
|
// 此处是对本平台发出DeviceControl指令的应答
|
|
|
|
JSONObject json = new JSONObject(); |
|
|
|
XmlUtil.node2Json(rootElement, json); |
|
|
@ -272,11 +276,10 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { |
|
|
|
try { |
|
|
|
Element rootElement = getRootElement(evt); |
|
|
|
String deviceId = XmlUtil.getText(rootElement, "DeviceID"); |
|
|
|
String result = XmlUtil.getText(rootElement, "Result"); |
|
|
|
// 回复200 OK
|
|
|
|
responseAck(evt); |
|
|
|
//if (!XmlUtil.isEmpty(result)) {
|
|
|
|
// 此处是对本平台发出DeviceControl指令的应答
|
|
|
|
if (rootElement.getName().equals("Response")) { |
|
|
|
// 此处是对本平台发出DeviceControl指令的应答
|
|
|
|
JSONObject json = new JSONObject(); |
|
|
|
XmlUtil.node2Json(rootElement, json); |
|
|
|
if (logger.isDebugEnabled()) { |
|
|
@ -287,9 +290,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { |
|
|
|
msg.setType(DeferredResultHolder.CALLBACK_CMD_DEVICECONFIG); |
|
|
|
msg.setData(json); |
|
|
|
deferredResultHolder.invokeResult(msg); |
|
|
|
// } else {
|
|
|
|
// // 此处是上级发出的DeviceConfig指令
|
|
|
|
//}
|
|
|
|
} else { |
|
|
|
// 此处是上级发出的DeviceConfig指令
|
|
|
|
} |
|
|
|
} catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { |
|
|
|
e.printStackTrace(); |
|
|
|
} |
|
|
@ -304,11 +307,10 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { |
|
|
|
try { |
|
|
|
Element rootElement = getRootElement(evt); |
|
|
|
String deviceId = XmlUtil.getText(rootElement, "DeviceID"); |
|
|
|
String result = XmlUtil.getText(rootElement, "Result"); |
|
|
|
// 回复200 OK
|
|
|
|
responseAck(evt); |
|
|
|
//if (!XmlUtil.isEmpty(result)) {
|
|
|
|
// 此处是对本平台发出DeviceControl指令的应答
|
|
|
|
if (rootElement.getName().equals("Response")) { |
|
|
|
// 此处是对本平台发出DeviceControl指令的应答
|
|
|
|
JSONObject json = new JSONObject(); |
|
|
|
XmlUtil.node2Json(rootElement, json); |
|
|
|
if (logger.isDebugEnabled()) { |
|
|
@ -319,9 +321,9 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { |
|
|
|
msg.setType(DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD); |
|
|
|
msg.setData(json); |
|
|
|
deferredResultHolder.invokeResult(msg); |
|
|
|
// } else {
|
|
|
|
// // 此处是上级发出的DeviceConfig指令
|
|
|
|
//}
|
|
|
|
} else { |
|
|
|
// 此处是上级发出的DeviceConfig指令
|
|
|
|
} |
|
|
|
} catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { |
|
|
|
e.printStackTrace(); |
|
|
|
} |
|
|
@ -336,7 +338,6 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { |
|
|
|
try { |
|
|
|
Element rootElement = getRootElement(evt); |
|
|
|
String deviceId = XmlUtil.getText(rootElement, "DeviceID"); |
|
|
|
String result = XmlUtil.getText(rootElement, "Result"); |
|
|
|
// 回复200 OK
|
|
|
|
responseAck(evt); |
|
|
|
if (rootElement.getName().equals("Response")) {// !XmlUtil.isEmpty(result)) {
|
|
|
@ -648,8 +649,11 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { |
|
|
|
Element recordListElement = rootElement.element("RecordList"); |
|
|
|
if (recordListElement == null || recordInfo.getSumNum() == 0) { |
|
|
|
logger.info("无录像数据"); |
|
|
|
// responseAck(evt);
|
|
|
|
// return;
|
|
|
|
RequestMessage msg = new RequestMessage(); |
|
|
|
msg.setDeviceId(deviceId); |
|
|
|
msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO); |
|
|
|
msg.setData(recordInfo); |
|
|
|
deferredResultHolder.invokeResult(msg); |
|
|
|
} else { |
|
|
|
Iterator<Element> recordListIterator = recordListElement.elementIterator(); |
|
|
|
List<RecordItem> recordList = new ArrayList<RecordItem>(); |
|
|
@ -679,44 +683,63 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { |
|
|
|
record.setRecorderId(XmlUtil.getText(itemRecord, "RecorderID")); |
|
|
|
recordList.add(record); |
|
|
|
} |
|
|
|
// recordList.sort(Comparator.naturalOrder());
|
|
|
|
recordInfo.setRecordList(recordList); |
|
|
|
} |
|
|
|
|
|
|
|
// 存在录像且如果当前录像明细个数小于总条数,说明拆包返回,需要组装,暂不返回
|
|
|
|
if (recordInfo.getSumNum() > 0 && recordList.size() > 0 && recordList.size() < recordInfo.getSumNum()) { |
|
|
|
// 为防止连续请求该设备的录像数据,返回数据错乱,特增加sn进行区分
|
|
|
|
String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn; |
|
|
|
|
|
|
|
redis.set(cacheKey + "_" + uuid, recordList, 90); |
|
|
|
List<Object> cacheKeys = redis.scan(cacheKey + "_*"); |
|
|
|
List<RecordItem> totalRecordList = new ArrayList<RecordItem>(); |
|
|
|
for (int i = 0; i < cacheKeys.size(); i++) { |
|
|
|
totalRecordList.addAll((List<RecordItem>) redis.get(cacheKeys.get(i).toString())); |
|
|
|
// 改用单独线程统计已获取录像文件数量,避免多包并行分别统计不完整的问题
|
|
|
|
String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn; |
|
|
|
redis.set(cacheKey + "_" + uuid, recordList, 90); |
|
|
|
if (!threadNameList.contains(cacheKey)) { |
|
|
|
threadNameList.add(cacheKey); |
|
|
|
CheckForAllRecordsThread chk = new CheckForAllRecordsThread(cacheKey, recordInfo); |
|
|
|
chk.setName(cacheKey); |
|
|
|
chk.setDeferredResultHolder(deferredResultHolder); |
|
|
|
chk.setRedis(redis); |
|
|
|
chk.setLogger(logger); |
|
|
|
chk.start(); |
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
logger.debug("Start Thread " + cacheKey + "."); |
|
|
|
} |
|
|
|
if (totalRecordList.size() < recordInfo.getSumNum()) { |
|
|
|
logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + recordInfo.getSumNum() + "项"); |
|
|
|
return; |
|
|
|
} |
|
|
|
logger.info("录像数据已全部获取,共" + recordInfo.getSumNum() + "项"); |
|
|
|
recordInfo.setRecordList(totalRecordList); |
|
|
|
for (int i = 0; i < cacheKeys.size(); i++) { |
|
|
|
redis.del(cacheKeys.get(i).toString()); |
|
|
|
} else { |
|
|
|
if (logger.isDebugEnabled()) { |
|
|
|
logger.debug("Thread " + cacheKey + " already started."); |
|
|
|
} |
|
|
|
} |
|
|
|
// 自然顺序排序, 元素进行升序排列
|
|
|
|
recordInfo.getRecordList().sort(Comparator.naturalOrder()); |
|
|
|
|
|
|
|
// 存在录像且如果当前录像明细个数小于总条数,说明拆包返回,需要组装,暂不返回
|
|
|
|
// if (recordInfo.getSumNum() > 0 && recordList.size() > 0 && recordList.size() < recordInfo.getSumNum()) {
|
|
|
|
// // 为防止连续请求该设备的录像数据,返回数据错乱,特增加sn进行区分
|
|
|
|
// String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn;
|
|
|
|
|
|
|
|
// redis.set(cacheKey + "_" + uuid, recordList, 90);
|
|
|
|
// List<Object> cacheKeys = redis.scan(cacheKey + "_*");
|
|
|
|
// List<RecordItem> totalRecordList = new ArrayList<RecordItem>();
|
|
|
|
// for (int i = 0; i < cacheKeys.size(); i++) {
|
|
|
|
// totalRecordList.addAll((List<RecordItem>) redis.get(cacheKeys.get(i).toString()));
|
|
|
|
// }
|
|
|
|
// if (totalRecordList.size() < recordInfo.getSumNum()) {
|
|
|
|
// logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + recordInfo.getSumNum() + "项");
|
|
|
|
// return;
|
|
|
|
// }
|
|
|
|
// logger.info("录像数据已全部获取,共" + recordInfo.getSumNum() + "项");
|
|
|
|
// recordInfo.setRecordList(totalRecordList);
|
|
|
|
// for (int i = 0; i < cacheKeys.size(); i++) {
|
|
|
|
// redis.del(cacheKeys.get(i).toString());
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
// // 自然顺序排序, 元素进行升序排列
|
|
|
|
// recordInfo.getRecordList().sort(Comparator.naturalOrder());
|
|
|
|
} |
|
|
|
// 走到这里,有以下可能:1、没有录像信息,第一次收到recordinfo的消息即返回响应数据,无redis操作
|
|
|
|
// 2、有录像数据,且第一次即收到完整数据,返回响应数据,无redis操作
|
|
|
|
// 3、有录像数据,在超时时间内收到多次包组装后数量足够,返回数据
|
|
|
|
|
|
|
|
RequestMessage msg = new RequestMessage(); |
|
|
|
msg.setDeviceId(deviceId); |
|
|
|
msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO); |
|
|
|
msg.setData(recordInfo); |
|
|
|
deferredResultHolder.invokeResult(msg); |
|
|
|
logger.info("处理完成,返回结果"); |
|
|
|
// RequestMessage msg = new RequestMessage();
|
|
|
|
// msg.setDeviceId(deviceId);
|
|
|
|
// msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO);
|
|
|
|
// msg.setData(recordInfo);
|
|
|
|
// deferredResultHolder.invokeResult(msg);
|
|
|
|
// logger.info("处理完成,返回结果");
|
|
|
|
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { |
|
|
|
e.printStackTrace(); |
|
|
|
} |
|
|
@ -799,4 +822,4 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { |
|
|
|
public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { |
|
|
|
this.redisCatchStorage = redisCatchStorage; |
|
|
|
} |
|
|
|
} |
|
|
|
} |