deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId());
- eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON);
// 上线添加订阅
if (device.getSubscribeCycleForCatalog() > 0) {
+ // 查询在线设备那些开启了订阅,为设备开启定时的目录订阅
deviceService.addCatalogSubscribe(device);
}
+ if (device.getSubscribeCycleForMobilePosition() > 0) {
+ deviceService.addMobilePositionSubscribe(device);
+ }
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java
index 00926574..67b297c3 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java
@@ -6,7 +6,7 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -14,7 +14,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
-import javax.sip.ResponseEvent;
import javax.sip.message.Response;
/**
@@ -29,7 +28,7 @@ public class PlatformKeepaliveExpireEventLister implements ApplicationListener{
- timer.cancel();
+ dynamicTask.stop(taskKey);
};
- sipCommanderFroPlatform.register(parentPlatform, null, okEvent);
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- logger.info("[平台注册]再次向平台注册,平台国标ID:" + event.getPlatformGbID());
- sipCommanderFroPlatform.register(parentPlatform, null, okEvent);
- }
- }, 15*1000 ,Long.parseLong(parentPlatform.getExpires())* 1000);
+ dynamicTask.startCron(taskKey, ()->{
+ logger.info("[平台注册]再次向平台注册,平台国标ID:" + event.getPlatformGbID());
+ sipCommanderFroPlatform.register(parentPlatform, null, okEvent);
+ }, Integer.parseInt(parentPlatform.getExpires())* 1000);
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java
index 1b8e7aed..14ed76a2 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java
@@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.gb28181.event.platformNotRegister;
+import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
@@ -9,7 +10,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -31,7 +32,7 @@ public class PlatformNotRegisterEventLister implements ApplicationListener 0) {
logger.info("[ 平台未注册事件 ] 停止[ {} ]的所有推流", event.getPlatformGbID());
- StringBuilder app = new StringBuilder();
- StringBuilder stream = new StringBuilder();
for (SendRtpItem sendRtpItem : sendRtpItems) {
- if (app.length() != 0) {
- app.append(",");
- }
- app.append(sendRtpItem.getApp());
- if (stream.length() != 0) {
- stream.append(",");
- }
- stream.append(sendRtpItem.getStreamId());
redisCatchStorage.deleteSendRTPServer(event.getPlatformGbID(), sendRtpItem.getChannelId(), null, null);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
Map param = new HashMap<>();
param.put("vhost", "__defaultVhost__");
- param.put("app", app.toString());
- param.put("stream", stream.toString());
+ param.put("app", sendRtpItem.getApp());
+ param.put("stream", sendRtpItem.getStreamId());
zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
}
}
- Timer timer = new Timer();
+ String taskKey = "platform-not-register-" + parentPlatform.getServerGBId();
SipSubscribe.Event okEvent = (responseEvent)->{
- timer.cancel();
+ dynamicTask.stop(taskKey);
};
- logger.info("[平台注册]平台国标ID:" + event.getPlatformGbID());
- sipCommanderFroPlatform.register(parentPlatform, null, okEvent);
- // 设置注册失败则每隔15秒发起一次注册
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- logger.info("[平台注册]再次向平台注册,平台国标ID:" + event.getPlatformGbID());
- sipCommanderFroPlatform.register(parentPlatform, null, okEvent);
- }
- }, config.getRegisterTimeInterval()* 1000, config.getRegisterTimeInterval()* 1000);//十五秒后再次发起注册
+ dynamicTask.startCron(taskKey, ()->{
+ logger.info("[平台注册]再次向平台注册,平台国标ID:" + event.getPlatformGbID());
+ sipCommanderFroPlatform.register(parentPlatform, null, okEvent);
+ }, config.getRegisterTimeInterval()* 1000);
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java
index 95ffbfa0..4965026d 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/record/RecordEndEventListener.java
@@ -31,10 +31,8 @@ public class RecordEndEventListener implements ApplicationListener handlerMap = new HashMap<>();
@Override
public void onApplicationEvent(RecordEndEvent event) {
- if (logger.isDebugEnabled()) {
- logger.debug("录像查询完成事件触发,deviceId:{}, channelId: {}, 录像数量{}条", event.getRecordInfo().getDeviceId(),
- event.getRecordInfo().getChannelId(), event.getRecordInfo().getRecordList().size() );
- }
+ logger.info("录像查询完成事件触发,deviceId:{}, channelId: {}, 录像数量{}条", event.getRecordInfo().getDeviceId(),
+ event.getRecordInfo().getChannelId(), event.getRecordInfo().getSumNum() );
if (handlerMap.size() > 0) {
for (RecordEndEventHandler recordEndEventHandler : handlerMap.values()) {
recordEndEventHandler.handler(event.getRecordInfo());
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java
deleted file mode 100644
index 3b2bd230..00000000
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package com.genersoft.iot.vmp.gb28181.event.subscribe;
-
-import com.genersoft.iot.vmp.common.VideoManagerConstants;
-import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
-import com.genersoft.iot.vmp.conf.UserSetup;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.listener.RedisMessageListenerContainer;
-import org.springframework.stereotype.Component;
-
-/**
- * 平台订阅到期事件
- */
-@Component
-public class SubscribeListenerForPlatform extends RedisKeyExpirationEventMessageListener {
-
- private Logger logger = LoggerFactory.getLogger(SubscribeListenerForPlatform.class);
-
- @Autowired
- private UserSetup userSetup;
-
- @Autowired
- private DynamicTask dynamicTask;
-
- public SubscribeListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) {
- super(listenerContainer, userSetup);
- }
-
-
- /**
- * 监听失效的key
- * @param message
- * @param pattern
- */
- @Override
- public void onMessage(Message message, byte[] pattern) {
- // 获取失效的key
- String expiredKey = message.toString();
- logger.debug(expiredKey);
- // 订阅到期
- String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_";
- if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) {
- // 取消定时任务
- dynamicTask.stop(expiredKey);
- }
- }
-}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
index d511f421..7e5ecb49 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
@@ -2,17 +2,14 @@ package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.SipConfig;
-import com.genersoft.iot.vmp.conf.UserSetup;
+import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
-import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
-import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformNotRegisterEvent;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -31,7 +28,7 @@ public class CatalogEventLister implements ApplicationListener {
private final static Logger logger = LoggerFactory.getLogger(CatalogEventLister.class);
@Autowired
- private IVideoManagerStorager storager;
+ private IVideoManagerStorage storager;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
@@ -47,7 +44,7 @@ public class CatalogEventLister implements ApplicationListener {
private SipConfig config;
@Autowired
- private UserSetup userSetup;
+ private UserSetting userSetting;
@Autowired
private IGbStreamService gbStreamService;
@@ -63,13 +60,13 @@ public class CatalogEventLister implements ApplicationListener {
Map> parentPlatformMap = new HashMap<>();
if (event.getPlatformId() != null) {
parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformId());
- if (parentPlatform != null && !parentPlatform.isStatus())return;
- String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + event.getPlatformId();
-// subscribe = redisCatchStorage.getSubscribe(key);
+ if (parentPlatform != null && !parentPlatform.isStatus()) {
+ return;
+ }
subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
if (subscribe == null) {
- logger.debug("发送订阅消息时发现订阅信息已经不存在");
+ logger.info("发送订阅消息时发现订阅信息已经不存在");
return;
}
}else {
@@ -85,7 +82,9 @@ public class CatalogEventLister implements ApplicationListener {
}else if (event.getGbStreams() != null) {
if (platforms.size() > 0) {
for (GbStream gbStream : event.getGbStreams()) {
- if (gbStream == null || StringUtils.isEmpty(gbStream.getGbId())) continue;
+ if (gbStream == null || StringUtils.isEmpty(gbStream.getGbId())) {
+ continue;
+ }
List parentPlatformsForGB = storager.queryPlatFormListForStreamWithGBId(gbStream.getApp(),gbStream.getStream(), platforms);
parentPlatformMap.put(gbStream.getGbId(), parentPlatformsForGB);
}
@@ -104,7 +103,7 @@ public class CatalogEventLister implements ApplicationListener {
}
if (event.getGbStreams() != null && event.getGbStreams().size() > 0){
for (GbStream gbStream : event.getGbStreams()) {
- DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform.getDeviceGBId());
+ DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform);
deviceChannelList.add(deviceChannelByStream);
}
}
@@ -118,7 +117,9 @@ public class CatalogEventLister implements ApplicationListener {
if (parentPlatforms != null && parentPlatforms.size() > 0) {
for (ParentPlatform platform : parentPlatforms) {
SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());
- if (subscribeInfo == null) continue;
+ if (subscribeInfo == null) {
+ continue;
+ }
logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);
List deviceChannelList = new ArrayList<>();
DeviceChannel deviceChannel = new DeviceChannel();
@@ -143,7 +144,10 @@ public class CatalogEventLister implements ApplicationListener {
}
if (event.getGbStreams() != null && event.getGbStreams().size() > 0){
for (GbStream gbStream : event.getGbStreams()) {
- DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform.getDeviceGBId());
+ DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform);
+ if (deviceChannelByStream.getParentId().length() <= 10) { // 父节点是行政区划,则设置CivilCode使用此行政区划
+ deviceChannelByStream.setCivilCode(deviceChannelByStream.getParentId());
+ }
deviceChannelList.add(deviceChannelByStream);
}
}
@@ -156,18 +160,20 @@ public class CatalogEventLister implements ApplicationListener {
List parentPlatforms = parentPlatformMap.get(gbId);
if (parentPlatforms != null && parentPlatforms.size() > 0) {
for (ParentPlatform platform : parentPlatforms) {
-// String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId();
-// SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key);
- SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
- if (subscribeInfo == null) continue;
+ SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());
+ if (subscribeInfo == null) {
+ continue;
+ }
logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);
List deviceChannelList = new ArrayList<>();
DeviceChannel deviceChannel = storager.queryChannelInParentPlatform(platform.getServerGBId(), gbId);
deviceChannelList.add(deviceChannel);
GbStream gbStream = storager.queryStreamInParentPlatform(platform.getServerGBId(), gbId);
- DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), platform.getDeviceGBId());
- deviceChannelList.add(deviceChannelByStream);
- sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), platform, deviceChannelList, subscribeInfo, null);
+ if(gbStream != null){
+ DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), platform);
+ deviceChannelList.add(deviceChannelByStream);
+ }
+ sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), platform, deviceChannelList, subscribeInfo, null);
}
}
}
@@ -178,3 +184,4 @@ public class CatalogEventLister implements ApplicationListener {
}
}
}
+
\ No newline at end of file
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java
index e3b39749..62393d52 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java
@@ -3,9 +3,10 @@ package com.genersoft.iot.vmp.gb28181.session;
import com.genersoft.iot.vmp.gb28181.bean.CatalogData;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
+import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
@@ -23,52 +24,119 @@ public class CatalogDataCatch {
private DeferredResultHolder deferredResultHolder;
@Autowired
- private IVideoManagerStorager storager;
+ private IVideoManagerStorage storager;
- public void put(String key, int total, Device device, List deviceChannelList) {
- CatalogData catalogData = data.get(key);
+ public void addReady(Device device, int sn ) {
+ CatalogData catalogData = data.get(device.getDeviceId());
+ if (catalogData == null || catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) {
+ catalogData = new CatalogData();
+ catalogData.setChannelList(new ArrayList<>());
+ catalogData.setDevice(device);
+ catalogData.setSn(sn);
+ catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
+ catalogData.setLastTime(new Date(System.currentTimeMillis()));
+ data.put(device.getDeviceId(), catalogData);
+ }
+ }
+
+ public void put(String deviceId, int sn, int total, Device device, List deviceChannelList) {
+ CatalogData catalogData = data.get(deviceId);
if (catalogData == null) {
catalogData = new CatalogData();
+ catalogData.setSn(sn);
catalogData.setTotal(total);
catalogData.setDevice(device);
catalogData.setChannelList(new ArrayList<>());
- data.put(key, catalogData);
+ catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
+ catalogData.setLastTime(new Date(System.currentTimeMillis()));
+ data.put(deviceId, catalogData);
+ }else {
+ // 同一个设备的通道同步请求只考虑一个,其他的直接忽略
+ if (catalogData.getSn() != sn) {
+ return;
+ }
+ catalogData.setTotal(total);
+ catalogData.setDevice(device);
+ catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
+ catalogData.getChannelList().addAll(deviceChannelList);
+ catalogData.setLastTime(new Date(System.currentTimeMillis()));
}
- catalogData.getChannelList().addAll(deviceChannelList);
- catalogData.setLastTime(new Date(System.currentTimeMillis()));
}
- public List get(String key) {
- CatalogData catalogData = data.get(key);
- if (catalogData == null) return null;
+ public List get(String deviceId) {
+ CatalogData catalogData = data.get(deviceId);
+ if (catalogData == null) {
+ return null;
+ }
return catalogData.getChannelList();
}
- public void del(String key) {
- data.remove(key);
+ public int getTotal(String deviceId) {
+ CatalogData catalogData = data.get(deviceId);
+ if (catalogData == null) {
+ return 0;
+ }
+ return catalogData.getTotal();
+ }
+
+ public SyncStatus getSyncStatus(String deviceId) {
+ CatalogData catalogData = data.get(deviceId);
+ if (catalogData == null) {
+ return null;
+ }
+ SyncStatus syncStatus = new SyncStatus();
+ syncStatus.setCurrent(catalogData.getChannelList().size());
+ syncStatus.setTotal(catalogData.getTotal());
+ syncStatus.setErrorMsg(catalogData.getErrorMsg());
+ return syncStatus;
+ }
+
+ public boolean isSyncRunning(String deviceId) {
+ CatalogData catalogData = data.get(deviceId);
+ if (catalogData == null) {
+ return false;
+ }
+ return !catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end);
}
@Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
private void timerTask(){
Set keys = data.keySet();
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(new Date());
- calendar.set(Calendar.SECOND, calendar.get(Calendar.SECOND) - 5);
- for (String key : keys) {
- CatalogData catalogData = data.get(key);
- if (catalogData.getLastTime().before(calendar.getTime())) {
-
- storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
- RequestMessage msg = new RequestMessage();
- msg.setKey(key);
- WVPResult