diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java index 40f676e5..338f8ad5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java @@ -4,6 +4,7 @@ import java.util.Date; import java.util.List; public class CatalogData { + private int sn; // 命令序列号 private int total; private List channelList; private Date lastTime; @@ -15,6 +16,15 @@ public class CatalogData { } private CatalogDataStatus status; + + public int getSn() { + return sn; + } + + public void setSn(int sn) { + this.sn = sn; + } + public int getTotal() { return total; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java index e454d490..b97457a2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java @@ -54,6 +54,7 @@ public class OnlineEventListener implements ApplicationListener { @Autowired private SIPCommander cmder; + private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override @@ -76,7 +77,7 @@ public class OnlineEventListener implements ApplicationListener { if (deviceInStore == null) { //第一次上线 logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId()); cmder.deviceInfoQuery(device); - cmder.catalogQuery(device, null); + deviceService.sync(device); } break; // 设备主动发送心跳触发的在线事件 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java index fbc2a323..c3de8a29 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java @@ -26,28 +26,35 @@ public class CatalogDataCatch { @Autowired private IVideoManagerStorage storager; - public void addReady(String key) { - CatalogData catalogData = data.get(key); + public void addReady(Device device, int sn ) { + CatalogData catalogData = data.get(device.getDeviceId()); if (catalogData == null || catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) { catalogData = new CatalogData(); catalogData.setChannelList(new ArrayList<>()); + catalogData.setDevice(device); + catalogData.setSn(sn); catalogData.setStatus(CatalogData.CatalogDataStatus.ready); catalogData.setLastTime(new Date(System.currentTimeMillis())); - data.put(key, catalogData); + data.put(device.getDeviceId(), catalogData); } } - public void put(String key, int total, Device device, List deviceChannelList) { - CatalogData catalogData = data.get(key); + public void put(String deviceId, int sn, int total, Device device, List deviceChannelList) { + CatalogData catalogData = data.get(deviceId); if (catalogData == null) { catalogData = new CatalogData(); + catalogData.setSn(sn); catalogData.setTotal(total); catalogData.setDevice(device); catalogData.setChannelList(new ArrayList<>()); catalogData.setStatus(CatalogData.CatalogDataStatus.runIng); catalogData.setLastTime(new Date(System.currentTimeMillis())); - data.put(key, catalogData); + data.put(deviceId, catalogData); }else { + // 同一个设备的通道同步请求只考虑一个,其他的直接忽略 + if (catalogData.getSn() != sn) { + return; + } catalogData.setTotal(total); catalogData.setDevice(device); catalogData.setStatus(CatalogData.CatalogDataStatus.runIng); @@ -56,20 +63,20 @@ public class CatalogDataCatch { } } - public List get(String key) { - CatalogData catalogData = data.get(key); + public List get(String deviceId) { + CatalogData catalogData = data.get(deviceId); if (catalogData == null) return null; return catalogData.getChannelList(); } - public int getTotal(String key) { - CatalogData catalogData = data.get(key); + public int getTotal(String deviceId) { + CatalogData catalogData = data.get(deviceId); if (catalogData == null) return 0; return catalogData.getTotal(); } - public SyncStatus getSyncStatus(String key) { - CatalogData catalogData = data.get(key); + public SyncStatus getSyncStatus(String deviceId) { + CatalogData catalogData = data.get(deviceId); if (catalogData == null) return null; SyncStatus syncStatus = new SyncStatus(); syncStatus.setCurrent(catalogData.getChannelList().size()); @@ -78,10 +85,6 @@ public class CatalogDataCatch { return syncStatus; } - public void del(String key) { - data.remove(key); - } - @Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 private void timerTask(){ Set keys = data.keySet(); @@ -92,23 +95,30 @@ public class CatalogDataCatch { Calendar calendarBefore30S = Calendar.getInstance(); calendarBefore30S.setTime(new Date()); calendarBefore30S.set(Calendar.SECOND, calendarBefore30S.get(Calendar.SECOND) - 30); - for (String key : keys) { - CatalogData catalogData = data.get(key); - if (catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据 - storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList()); - String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条"; + for (String deviceId : keys) { + CatalogData catalogData = data.get(deviceId); + if ( catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据 + if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) { + storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList()); + if (catalogData.getTotal() != catalogData.getChannelList().size()) { + String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条"; + catalogData.setErrorMsg(errorMsg); + } + }else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) { + String errorMsg = "同步失败,等待回复超时"; + catalogData.setErrorMsg(errorMsg); + } catalogData.setStatus(CatalogData.CatalogDataStatus.end); - catalogData.setErrorMsg(errorMsg); } - if (catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒,如果标记为end则删除 - data.remove(key); + if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒,如果标记为end则删除 + data.remove(deviceId); } } } - public void setChannelSyncEnd(String key, String errorMsg) { - CatalogData catalogData = data.get(key); + public void setChannelSyncEnd(String deviceId, String errorMsg) { + CatalogData catalogData = data.get(deviceId); if (catalogData == null)return; catalogData.setStatus(CatalogData.CatalogDataStatus.end); catalogData.setErrorMsg(errorMsg); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index 1de03bd7..aea37b6c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -250,7 +250,7 @@ public interface ISIPCommander { * * @param device 视频设备 */ - boolean catalogQuery(Device device, SipSubscribe.Event errorEvent); + boolean catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent); /** * 查询录像信息 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 027238b7..89e70d0a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1208,14 +1208,14 @@ public class SIPCommander implements ISIPCommander { * @param device 视频设备 */ @Override - public boolean catalogQuery(Device device, SipSubscribe.Event errorEvent) { + public boolean catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent) { try { StringBuffer catalogXml = new StringBuffer(200); String charset = device.getCharset(); catalogXml.append("\r\n"); catalogXml.append("\r\n"); catalogXml.append("Catalog\r\n"); - catalogXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); + catalogXml.append("" + sn + "\r\n"); catalogXml.append("" + device.getDeviceId() + "\r\n"); catalogXml.append("\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index dbc25fc6..2ec80476 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -86,23 +86,17 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp rootElement = getRootElement(evt, device.getCharset()); Element deviceListElement = rootElement.element("DeviceList"); Element sumNumElement = rootElement.element("SumNum"); - if (sumNumElement == null || deviceListElement == null) { + Element snElement = rootElement.element("SN"); + if (snElement == null || sumNumElement == null || deviceListElement == null) { responseAck(evt, Response.BAD_REQUEST, "xml error"); return; } int sumNum = Integer.parseInt(sumNumElement.getText()); + if (sumNum == 0) { // 数据已经完整接收 storager.cleanChannelsForDevice(device.getDeviceId()); - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - WVPResult result = new WVPResult<>(); - result.setCode(0); - result.setData(device); - msg.setData(result); - result.setMsg("更新成功,共0条"); - deferredResultHolder.invokeAllResult(msg); - catalogDataCatch.del(key); + catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null); }else { Iterator deviceListIterator = deviceListElement.elementIterator(); if (deviceListIterator != null) { @@ -123,24 +117,18 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp channelList.add(deviceChannel); } + int sn = Integer.parseInt(snElement.getText()); logger.info("收到来自设备【{}】的通道: {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.get(key) == null ? 0 :catalogDataCatch.get(key).size(), sumNum); - catalogDataCatch.put(key, sumNum, device, channelList); - if (catalogDataCatch.get(key).size() == sumNum) { + catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device, channelList); + if (catalogDataCatch.get(device.getDeviceId()).size() == sumNum) { // 数据已经完整接收 - boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key)); - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - WVPResult result = new WVPResult<>(); - result.setCode(0); - result.setData(device); - if (resetChannelsResult || sumNum ==0) { - result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条"); + boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(device.getDeviceId())); + if (!resetChannelsResult) { + String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(device.getDeviceId()).size() + "条"; + catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), errorMsg); }else { - result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条"); + catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null); } - msg.setData(result); - deferredResultHolder.invokeAllResult(msg); - catalogDataCatch.del(key); } } // 回复200 OK @@ -228,21 +216,18 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } public SyncStatus getChannelSyncProgress(String deviceId) { - String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId; - if (catalogDataCatch.get(key) == null) { + if (catalogDataCatch.get(deviceId) == null) { return null; }else { - return catalogDataCatch.getSyncStatus(key); + return catalogDataCatch.getSyncStatus(deviceId); } } - public void setChannelSyncReady(String deviceId) { - String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId; - catalogDataCatch.addReady(key); + public void setChannelSyncReady(Device device, int sn) { + catalogDataCatch.addReady(device, sn); } public void setChannelSyncEnd(String deviceId, String errorMsg) { - String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId; - catalogDataCatch.setChannelSyncEnd(key, errorMsg); + catalogDataCatch.setChannelSyncEnd(deviceId, errorMsg); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java index 17cf7f42..08ccfffc 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java @@ -44,15 +44,8 @@ public interface IDeviceService { SyncStatus getChannelSyncStatus(String deviceId); /** - * 设置通道同步状态 - * @param deviceId 设备ID - */ - void setChannelSyncReady(String deviceId); - - /** - * 设置同步结束 - * @param deviceId 设备ID - * @param errorMsg 错误信息 + * 通道同步 + * @param device */ - void setChannelSyncEnd(String deviceId, String errorMsg); + void sync(Device device); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index d3432865..f36b3aed 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -100,12 +100,16 @@ public class DeviceServiceImpl implements IDeviceService { } @Override - public void setChannelSyncReady(String deviceId) { - catalogResponseMessageHandler.setChannelSyncReady(deviceId); - } - - @Override - public void setChannelSyncEnd(String deviceId, String errorMsg) { - catalogResponseMessageHandler.setChannelSyncEnd(deviceId, errorMsg); + public void sync(Device device) { + if (catalogResponseMessageHandler.getChannelSyncProgress(device.getDeviceId()) != null) { + logger.info("开启同步时发现同步已经存在"); + return; + } + int sn = (int)((Math.random()*9+1)*100000); + catalogResponseMessageHandler.setChannelSyncReady(device, sn); + sipCommander.catalogQuery(device, sn, event -> { + String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg); + catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg); + }); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index 20e56d95..a3c5c6cf 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -238,12 +238,15 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { @Override public boolean resetChannels(String deviceId, List deviceChannelList) { + if (deviceChannelList == null) { + return false; + } TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); // 数据去重 List channels = new ArrayList<>(); StringBuilder stringBuilder = new StringBuilder(); Map subContMap = new HashMap<>(); - if (deviceChannelList.size() > 1) { + if (deviceChannelList != null && deviceChannelList.size() > 1) { // 数据去重 Set gbIdSet = new HashSet<>(); for (DeviceChannel deviceChannel : deviceChannelList) { @@ -300,6 +303,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { dataSourceTransactionManager.commit(transactionStatus); //手动提交 return true; }catch (Exception e) { + e.printStackTrace(); dataSourceTransactionManager.rollback(transactionStatus); return false; } @@ -415,10 +419,9 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); boolean result = false; try { - if (platformChannelMapper.delChannelForDeviceId(deviceId) <0 // 删除与国标平台的关联 - || deviceChannelMapper.cleanChannelsByDeviceId(deviceId) < 0 // 删除他的通道 - || deviceMapper.del(deviceId) < 0 // 移除设备信息 - ) { + platformChannelMapper.delChannelForDeviceId(deviceId); + deviceChannelMapper.cleanChannelsByDeviceId(deviceId); + if ( deviceMapper.del(deviceId) < 0 ) { //事务回滚 dataSourceTransactionManager.rollback(transactionStatus); } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java index 12136f47..cbcb4ff5 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java @@ -172,12 +172,8 @@ public class DeviceQuery { wvpResult.setData(syncStatus); return wvpResult; } - SyncStatus syncStatusReady = new SyncStatus(); - deviceService.setChannelSyncReady(deviceId); - cmder.catalogQuery(device, event -> { - String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg); - deviceService.setChannelSyncEnd(deviceId, errorMsg); - }); + deviceService.sync(device); + WVPResult wvpResult = new WVPResult<>(); wvpResult.setCode(0); wvpResult.setMsg("开始同步"); diff --git a/web_src/src/components/dialog/SyncChannelProgress.vue b/web_src/src/components/dialog/SyncChannelProgress.vue index 1ec16f45..246f1ae7 100644 --- a/web_src/src/components/dialog/SyncChannelProgress.vue +++ b/web_src/src/components/dialog/SyncChannelProgress.vue @@ -61,23 +61,36 @@ export default { if (!this.syncFlag) { this.syncFlag = true; } - if (res.data.data == null) { - this.syncStatus = "success" - this.percentage = 100; - this.msg = '同步成功'; - }else if (res.data.data.total == 0){ - this.msg = `等待同步中`; - this.timmer = setTimeout(this.getProgress, 300) - }else if (res.data.data.errorMsg !== null ){ - this.msg = res.data.data.errorMsg; - this.syncStatus = "exception" - }else { - this.total = res.data.data.total; - this.current = res.data.data.current; - this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100; - this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`; - this.timmer = setTimeout(this.getProgress, 300) + + if (res.data.data != null) { + if (res.data.data.total == 0) { + if (res.data.data.errorMsg !== null ){ + this.msg = res.data.data.errorMsg; + this.syncStatus = "exception" + }else { + this.msg = `等待同步中`; + this.timmer = setTimeout(this.getProgress, 300) + } + }else { + if (res.data.data.total == res.data.data.current) { + this.syncStatus = "success" + this.percentage = 100; + this.msg = '同步成功'; + }else { + if (res.data.data.errorMsg !== null ){ + this.msg = res.data.data.errorMsg; + this.syncStatus = "exception" + }else { + this.total = res.data.data.total; + this.current = res.data.data.current; + this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100; + this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`; + this.timmer = setTimeout(this.getProgress, 300) + } + } + } } + }else { if (this.syncFlag) { this.syncStatus = "success"