diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 2635a5fd..aee414f5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -318,7 +318,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } else { // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) - Device device = storager.queryVideoDevice(requesterId); + Device device = redisCatchStorage.getDevice(requesterId); if (device != null) { logger.info("收到设备" + requesterId + "的语音广播Invite请求"); responseAck(evt, Response.TRYING); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 74b63c87..2e58d9d3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -112,7 +112,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements MobilePosition mobilePosition = new MobilePosition(); Element deviceIdElement = rootElement.element("DeviceID"); String deviceId = deviceIdElement.getTextTrim().toString(); - Device device = storager.queryVideoDevice(deviceId); + Device device = redisCatchStorage.getDevice(deviceId); if (device != null) { if (!StringUtils.isEmpty(device.getName())) { mobilePosition.setDeviceName(device.getName()); @@ -168,7 +168,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements Element deviceIdElement = rootElement.element("DeviceID"); String deviceId = deviceIdElement.getText().toString(); - Device device = storager.queryVideoDevice(deviceId); + Device device = redisCatchStorage.getDevice(deviceId); if (device == null) { return; } @@ -235,7 +235,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); Element rootElement = getRootElement(evt); - Device device = storager.queryVideoDevice(deviceId); + Device device = redisCatchStorage.getDevice(deviceId); if (device == null) { return; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java index 3f14e23e..053bf9a9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java @@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import gov.nist.javax.sip.RequestEventExt; import gov.nist.javax.sip.address.AddressImpl; @@ -51,6 +52,9 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen @Autowired private RegisterLogicHandler handler; + @Autowired + private IRedisCatchStorage redisCatchStorage; + @Autowired private IVideoManagerStorager storager; @@ -86,7 +90,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen AddressImpl address = (AddressImpl) fromHeader.getAddress(); SipUri uri = (SipUri) address.getURI(); String deviceId = uri.getUser(); - Device device = storager.queryVideoDevice(deviceId); + Device device = redisCatchStorage.getDevice(deviceId); AuthorizationHeader authorhead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME); // 校验密码是否正确 if (authorhead != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java index 241ee68f..0aab8716 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -38,6 +39,9 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement @Autowired private IVideoManagerStorager storage; + @Autowired + private IRedisCatchStorage redisCatchStorage; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -53,7 +57,7 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement logger.debug("接收到消息:" + evt.getRequest()); String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); // 查询设备是否存在 - Device device = storage.queryVideoDevice(deviceId); + Device device = redisCatchStorage.getDevice(deviceId); // 查询上级平台是否存在 ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(deviceId); try { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 8c1239e8..7eac768a 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -467,7 +467,7 @@ public class ZLMHttpHookListener { if (s.length == 2) { String deviceId = s[0]; String channelId = s[1]; - Device device = storager.queryVideoDevice(deviceId); + Device device = redisCatchStorage.getDevice(deviceId); if (device != null) { UUID uuid = UUID.randomUUID(); SSRCInfo ssrcInfo; diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index d228a1a2..06c8b732 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -63,4 +63,6 @@ public interface IStreamPushService { void zlmServerOffline(String mediaServerId); void clean(); + + boolean saveToRandomGB(); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 640e99aa..dab637f7 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -96,7 +96,7 @@ public class PlayServiceImpl implements IPlayService { resultHolder.invokeResult(msg); return playResult; } - Device device = storager.queryVideoDevice(deviceId); + Device device = redisCatchStorage.getDevice(deviceId); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); playResult.setDevice(device); // 超时处理 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index be34fae5..ecbddc45 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -255,4 +255,30 @@ public class StreamPushServiceImpl implements IStreamPushService { public void clean() { } + + @Override + public boolean saveToRandomGB() { + List streamPushItems = streamPushMapper.selectAll(); + long gbId = 100001; + for (StreamPushItem streamPushItem : streamPushItems) { + streamPushItem.setStreamType("push"); + streamPushItem.setStatus(true); + streamPushItem.setGbId("34020000004111" + gbId); + gbId ++; + } + int limitCount = 30; + + if (streamPushItems.size() > limitCount) { + for (int i = 0; i < streamPushItems.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > streamPushItems.size()) { + toIndex = streamPushItems.size(); + } + gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex)); + } + }else { + gbStreamMapper.batchAdd(streamPushItems); + } + return true; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 4f240d86..c9ffb356 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.storager; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; @@ -169,4 +170,15 @@ public interface IRedisCatchStorage { ThirdPartyGB queryMemberNoGBId(String queryKey); List getStreams(String mediaServerId, String pull); + + /** + * 将device信息写入redis + * @param device + */ + void updateDevice(Device device); + + /** + * 获取Device + */ + Device getDevice(String deviceId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java index 9757b13d..ffbca9c9 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.storager.dao; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Repository; @@ -79,4 +80,17 @@ public interface GbStreamMapper { "" + "") void batchDel(List streamProxyItemList); + + @Insert("") + void batchAdd(List subList); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index af9a206c..bad2aaeb 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -377,4 +377,16 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } return result; } + + @Override + public void updateDevice(Device device) { + String key = VideoManagerConstants.DEVICE_PREFIX + userSetup.getServerId() + "_" + device.getDeviceId(); + redis.set(key, device); + } + + @Override + public Device getDevice(String deviceId) { + String key = VideoManagerConstants.DEVICE_PREFIX + userSetup.getServerId() + "_" + deviceId; + return (Device)redis.get(key); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java index b7454a84..f1a9f9a5 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java @@ -110,6 +110,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { */ @Override public synchronized boolean create(Device device) { + redisCatchStorage.updateDevice(device); return deviceMapper.add(device) > 0; } @@ -128,11 +129,14 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { Device deviceByDeviceId = deviceMapper.getDeviceByDeviceId(device.getDeviceId()); if (deviceByDeviceId == null) { device.setCreateTime(now); + redisCatchStorage.updateDevice(device); return deviceMapper.add(device) > 0; }else { + redisCatchStorage.updateDevice(device); return deviceMapper.update(device) > 0; } + } @Override @@ -185,11 +189,32 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { } } } + int limitCount = 300; if (addChannels.size() > 0) { - deviceChannelMapper.batchAdd(addChannels); + if (addChannels.size() > limitCount) { + for (int i = 0; i < addChannels.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > addChannels.size()) { + toIndex = addChannels.size(); + } + deviceChannelMapper.batchAdd(addChannels.subList(i, toIndex)); + } + }else { + deviceChannelMapper.batchAdd(addChannels); + } } if (updateChannels.size() > 0) { - deviceChannelMapper.batchUpdate(updateChannels); + if (updateChannels.size() > limitCount) { + for (int i = 0; i < updateChannels.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > updateChannels.size()) { + toIndex = updateChannels.size(); + } + deviceChannelMapper.batchAdd(updateChannels.subList(i, toIndex)); + } + }else { + deviceChannelMapper.batchUpdate(updateChannels); + } } } } @@ -322,6 +347,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { } device.setOnline(1); logger.info("更新设备在线: " + deviceId); + redisCatchStorage.updateDevice(device); return deviceMapper.update(device) > 0; } @@ -337,6 +363,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { Device device = deviceMapper.getDeviceByDeviceId(deviceId); if (device == null) return false; device.setOnline(0); + redisCatchStorage.updateDevice(device); return deviceMapper.update(device) > 0; }