diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java new file mode 100644 index 00000000..c74fb87b --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java @@ -0,0 +1,43 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import java.util.Date; +import java.util.List; + +public class CatalogData { + private int total; + private List channelList; + private Date lastTime; + private Device device; + + public int getTotal() { + return total; + } + + public void setTotal(int total) { + this.total = total; + } + + public List getChannelList() { + return channelList; + } + + public void setChannelList(List channelList) { + this.channelList = channelList; + } + + public Date getLastTime() { + return lastTime; + } + + public void setLastTime(Date lastTime) { + this.lastTime = lastTime; + } + + public Device getDevice() { + return device; + } + + public void setDevice(Device device) { + this.device = device; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java new file mode 100644 index 00000000..e3b39749 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java @@ -0,0 +1,74 @@ +package com.genersoft.iot.vmp.gb28181.session; + +import com.genersoft.iot.vmp.gb28181.bean.CatalogData; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +@Component +public class CatalogDataCatch { + + public static Map data = new ConcurrentHashMap<>(); + + @Autowired + private DeferredResultHolder deferredResultHolder; + + @Autowired + private IVideoManagerStorager storager; + + public void put(String key, int total, Device device, List deviceChannelList) { + CatalogData catalogData = data.get(key); + if (catalogData == null) { + catalogData = new CatalogData(); + catalogData.setTotal(total); + catalogData.setDevice(device); + catalogData.setChannelList(new ArrayList<>()); + data.put(key, catalogData); + } + catalogData.getChannelList().addAll(deviceChannelList); + catalogData.setLastTime(new Date(System.currentTimeMillis())); + } + + public List get(String key) { + CatalogData catalogData = data.get(key); + if (catalogData == null) return null; + return catalogData.getChannelList(); + } + + public void del(String key) { + data.remove(key); + } + + @Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 + private void timerTask(){ + Set keys = data.keySet(); + Calendar calendar = Calendar.getInstance(); + calendar.setTime(new Date()); + calendar.set(Calendar.SECOND, calendar.get(Calendar.SECOND) - 5); + for (String key : keys) { + CatalogData catalogData = data.get(key); + if (catalogData.getLastTime().before(calendar.getTime())) { + + storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList()); + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + WVPResult result = new WVPResult<>(); + result.setCode(0); + result.setMsg("更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条"); + result.setData(catalogData.getDevice()); + msg.setData(result); + deferredResultHolder.invokeAllResult(msg); + data.remove(key); + } + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 5fcc3ada..f8da94b5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.session.CatalogDataCatch; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; @@ -14,6 +15,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessag import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.dom4j.DocumentException; import org.dom4j.Element; import org.slf4j.Logger; @@ -27,7 +29,9 @@ import javax.sip.RequestEvent; import javax.sip.SipException; import javax.sip.message.Response; import java.text.ParseException; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Date; import java.util.Iterator; import java.util.List; @@ -39,6 +43,8 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class); private final String cmdType = "Catalog"; + private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + @Autowired private ResponseMessageHandler responseMessageHandler; @@ -48,6 +54,9 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @Autowired private DeferredResultHolder deferredResultHolder; + @Autowired + private CatalogDataCatch catalogDataCatch; + @Autowired private DeviceOffLineDetector offLineDetector; @@ -69,6 +78,12 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp try { rootElement = getRootElement(evt, device.getCharset()); Element deviceListElement = rootElement.element("DeviceList"); + Element sumNumElement = rootElement.element("SumNum"); + if (sumNumElement == null || deviceListElement == null) { + responseAck(evt, Response.BAD_REQUEST, "xml error"); + return; + } + int sumNum = Integer.parseInt(sumNumElement.getText()); Iterator deviceListIterator = deviceListElement.elementIterator(); if (deviceListIterator != null) { List channelList = new ArrayList<>(); @@ -86,6 +101,10 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp String status = statusElement != null ? statusElement.getText().toString() : "ON"; DeviceChannel deviceChannel = new DeviceChannel(); deviceChannel.setName(channelName); + deviceChannel.setDeviceId(device.getDeviceId()); + String now = this.format.format(new Date(System.currentTimeMillis())); + deviceChannel.setCreateTime(now); + deviceChannel.setUpdateTime(now); deviceChannel.setChannelId(channelDeviceId); // ONLINE OFFLINE HIKVISION DS-7716N-E4 NVR的兼容性处理 if (status.equals("ON") || status.equals("On") || status.equals("ONLINE")) { @@ -153,14 +172,28 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp deviceChannel.setPTZType(Integer.parseInt(getText(itemDevice, "PTZType"))); } deviceChannel.setHasAudio(true); // 默认含有音频,播放时再检查是否有音频及是否AAC - // TODO 修改为批量插入 channelList.add(deviceChannel); } - storager.updateChannels(device.getDeviceId(), channelList); - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setData(device); - deferredResultHolder.invokeAllResult(msg); + + catalogDataCatch.put(key, sumNum, device, channelList); + if (catalogDataCatch.get(key).size() == sumNum) { + // 数据已经完整接收 + boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key)); + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + WVPResult result = new WVPResult<>(); + result.setCode(0); + result.setData(device); + if (resetChannelsResult) { + result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条"); + }else { + result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条"); + } + msg.setData(result); + deferredResultHolder.invokeAllResult(msg); + catalogDataCatch.del(key); + } + // 回复200 OK responseAck(evt, Response.OK); if (offLineDetector.isOnline(device.getDeviceId())) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 6dfec546..5fef8cf5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -73,7 +73,6 @@ public class StreamPushServiceImpl implements IStreamPushService { result.put(key, streamPushItem); } } - } return new ArrayList<>(result.values()); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java index 6e3ab77f..9118d760 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java @@ -55,7 +55,7 @@ public interface IVideoManagerStorager { * @param deviceId 设备id * @param channels 多个通道 */ - public void updateChannels(String deviceId, List channels); + public int updateChannels(String deviceId, List channels); /** * 开始播放 @@ -425,4 +425,10 @@ public interface IVideoManagerStorager { */ StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId); + /** + * catlog查询结束后完全重写通道信息 + * @param deviceId + * @param deviceChannelList + */ + boolean resetChannels(String deviceId, List deviceChannelList); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index 28d63c3e..8377ccbf 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -133,7 +133,7 @@ public interface DeviceChannelMapper { "'${item.streamId}', ${item.longitude}, ${item.latitude},'${item.createTime}', '${item.updateTime}')" + " " + "") - void batchAdd(List addChannels); + int batchAdd(List addChannels); @Update({""}) - void batchUpdate(List updateChannels); + int batchUpdate(List updateChannels); @Select(value = {"