From 9b1af8ef1396de45884fe86c56844714045b82ec Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 7 Dec 2021 21:13:55 +0800 Subject: [PATCH] =?UTF-8?q?=E9=80=82=E9=85=8Dzlm=E7=9A=84hook=E4=BF=9D?= =?UTF-8?q?=E6=B4=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/mysql.sql | 1 + .../iot/vmp/common/VideoManagerConstants.java | 4 +- .../genersoft/iot/vmp/conf/MediaConfig.java | 1 + .../com/genersoft/iot/vmp/conf/SipConfig.java | 2 +- .../iot/vmp/gb28181/event/EventPublisher.java | 9 +- .../vmp/media/zlm/ZLMHttpHookListener.java | 4 +- .../iot/vmp/media/zlm/ZLMRunner.java | 14 ++- .../iot/vmp/media/zlm/ZLMServerConfig.java | 11 +++ .../vmp/media/zlm/dto/MediaServerItem.java | 11 +++ .../vmp/media/zlm/event/ZLMEventAbstract.java | 25 +++++ .../iot/vmp/service/IMediaServerService.java | 10 ++ .../iot/vmp/service/IStreamProxyService.java | 16 +++ .../iot/vmp/service/IStreamPushService.java | 15 +++ .../service/impl/MediaServerServiceImpl.java | 93 +++++------------- .../service/impl/StreamProxyServiceImpl.java | 20 ++++ .../service/impl/StreamPushServiceImpl.java | 15 +++ .../iot/vmp/storager/IRedisCatchStorage.java | 7 ++ .../vmp/storager/IVideoManagerStorager.java | 1 + .../iot/vmp/storager/dao/GbStreamMapper.java | 5 + .../vmp/storager/dao/MediaServerMapper.java | 8 +- .../vmp/storager/dao/StreamProxyMapper.java | 13 +++ .../vmp/storager/dao/StreamPushMapper.java | 3 + .../storager/impl/RedisCatchStorageImpl.java | 18 ++-- .../impl/VideoManagerStoragerImpl.java | 1 + src/main/resources/wvp.sqlite | Bin 143360 -> 143360 bytes .../vmp/service/impl/RoleServiceImplTest.java | 1 - .../vmp/service/impl/UserServiceImplTest.java | 2 - 27 files changed, 223 insertions(+), 87 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMEventAbstract.java diff --git a/sql/mysql.sql b/sql/mysql.sql index 64fed126..50b70bc6 100644 --- a/sql/mysql.sql +++ b/sql/mysql.sql @@ -148,6 +148,7 @@ create table media_server defaultServer int not null, createTime varchar(50) not null, updateTime varchar(50) not null, + hookAliveInterval int not null, constraint media_server_i unique (ip, httpPort) ); diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 3b47ff0e..97cb3d94 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -10,10 +10,12 @@ public class VideoManagerConstants { public static final String WVP_SERVER_PREFIX = "VMP_SIGNALLING_SERVER_INFO_"; - public static final String WVP_SERVER_STREAM_PUSH_PREFIX = "VMP_SIGNALLING_STREAM_"; + public static final String WVP_SERVER_STREAM_PREFIX = "VMP_SIGNALLING_STREAM_"; public static final String MEDIA_SERVER_PREFIX = "VMP_MEDIA_SERVER_"; + public static final String MEDIA_SERVER_KEEPALIVE_PREFIX = "VMP_MEDIA_SERVER_KEEPALIVE_"; + public static final String MEDIA_SERVERS_ONLINE_PREFIX = "VMP_MEDIA_ONLINE_SERVERS_"; public static final String MEDIA_STREAM_PREFIX = "VMP_MEDIA_STREAM"; diff --git a/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java index 0c7ef887..2b52bcd3 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java @@ -203,6 +203,7 @@ public class MediaConfig{ mediaServerItem.setRtpPortRange(rtpPortRange); mediaServerItem.setSendRtpPortRange(sendRtpPortRange); mediaServerItem.setRecordAssistPort(recordAssistPort); + mediaServerItem.setHookAliveInterval(120); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); mediaServerItem.setCreateTime(format.format(System.currentTimeMillis())); diff --git a/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java index 4c24d14a..6fa802da 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java @@ -27,7 +27,7 @@ public class SipConfig { Integer keepaliveTimeOut = 255; - Integer registerTimeInterval = 60; + Integer registerTimeInterval = 120; private boolean alarm = false; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index 33d6dd43..fd0cfdc9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -1,15 +1,16 @@ package com.genersoft.iot.vmp.gb28181.event; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.event.offline.OfflineEvent; import com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire.PlatformKeepaliveExpireEvent; import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformNotRegisterEvent; +import com.genersoft.iot.vmp.media.zlm.event.ZLMOfflineEvent; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEvent; -import com.genersoft.iot.vmp.gb28181.event.offline.OfflineEvent; import com.genersoft.iot.vmp.gb28181.event.online.OnlineEvent; /** @@ -66,5 +67,11 @@ public class EventPublisher { alarmEvent.setAlarmInfo(deviceAlarm); applicationEventPublisher.publishEvent(alarmEvent); } + + public void zlmOfflineEventPublish(String mediaServerId){ + ZLMOfflineEvent outEvent = new ZLMOfflineEvent(this); + outEvent.setMediaServerId(mediaServerId); + applicationEventPublisher.publishEvent(outEvent); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index caa68ef9..70655585 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -359,8 +359,8 @@ public class ZLMHttpHookListener { type = "PULL"; } } - zlmMediaListManager.removeMedia( app, streamId); - redisCatchStorage.removeStream(mediaServerItem, OriginType.values()[item.getOriginType()].getType(), app, streamId); + zlmMediaListManager.removeMedia(app, streamId); + redisCatchStorage.removeStream(mediaServerItem, type, app, streamId); } // 发送流变化redis消息 diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java index 71df2957..5555617f 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamProxyService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -39,6 +40,9 @@ public class ZLMRunner implements CommandLineRunner { @Autowired private IMediaServerService mediaServerService; + @Autowired + private IRedisCatchStorage redisCatchStorage; + @Autowired private MediaConfig mediaConfig; @@ -70,8 +74,14 @@ public class ZLMRunner implements CommandLineRunner { } }); - // TODO 订阅 zlm保活事件, 当zlm离线时做业务的处理 - + // 订阅 zlm保活事件, 当zlm离线时做业务的处理 + hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_keepalive,null, + (MediaServerItem mediaServerItem, JSONObject response)->{ + String mediaServerId = response.getString("mediaServerId"); + if (mediaServerId !=null ) { + mediaServerService.updateMediaServerKeepalive(mediaServerId, response.getJSONObject("data")); + } + }); // 获取zlm信息 logger.info("等待默认zlm接入..."); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java index 841c14cf..7e3da46a 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java @@ -65,6 +65,9 @@ public class ZLMServerConfig { @JSONField(name = "hook.admin_params") private String hookAdminParams; + @JSONField(name = "hook.alive_interval") + private int hookAliveInterval; + @JSONField(name = "hook.enable") private String hookEnable; @@ -791,4 +794,12 @@ public class ZLMServerConfig { public void setShellPhell(String shellPhell) { this.shellPhell = shellPhell; } + + public int getHookAliveInterval() { + return hookAliveInterval; + } + + public void setHookAliveInterval(int hookAliveInterval) { + this.hookAliveInterval = hookAliveInterval; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java index b7b4e287..260da272 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java @@ -39,6 +39,8 @@ public class MediaServerItem{ private int streamNoneReaderDelayMS; + private int hookAliveInterval; + private boolean rtpEnable; private boolean status; @@ -87,6 +89,7 @@ public class MediaServerItem{ autoConfig = true; // 默认值true; secret = zlmServerConfig.getApiSecret(); streamNoneReaderDelayMS = zlmServerConfig.getGeneralStreamNoneReaderDelayMS(); + hookAliveInterval = zlmServerConfig.getHookAliveInterval(); rtpEnable = false; // 默认使用单端口;直到用户自己设置开启多端口 rtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号 sendRtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号 @@ -309,4 +312,12 @@ public class MediaServerItem{ public void setSendRtpPortRange(String sendRtpPortRange) { this.sendRtpPortRange = sendRtpPortRange; } + + public int getHookAliveInterval() { + return hookAliveInterval; + } + + public void setHookAliveInterval(int hookAliveInterval) { + this.hookAliveInterval = hookAliveInterval; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMEventAbstract.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMEventAbstract.java new file mode 100644 index 00000000..8ffbddea --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMEventAbstract.java @@ -0,0 +1,25 @@ +package com.genersoft.iot.vmp.media.zlm.event; + +import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; +import org.springframework.context.ApplicationEvent; + +public abstract class ZLMEventAbstract extends ApplicationEvent { + + + private static final long serialVersionUID = 1L; + + private String mediaServerId; + + + public ZLMEventAbstract(Object source) { + super(source); + } + + public String getMediaServerId() { + return mediaServerId; + } + + public void setMediaServerId(String mediaServerId) { + this.mediaServerId = mediaServerId; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java index e539f233..fcba07f8 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.service; +import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -30,6 +31,13 @@ public interface IMediaServerService { */ void zlmServerOnline(ZLMServerConfig zlmServerConfig); + /** + * 节点离线 + * @param mediaServerId + * @return + */ + void zlmServerOffline(String mediaServerId); + MediaServerItem getMediaServerForMinimumLoad(); void setZLMConfig(MediaServerItem mediaServerItem); @@ -67,4 +75,6 @@ public interface IMediaServerService { void delete(String id); MediaServerItem getDefaultMediaServer(); + + void updateMediaServerKeepalive(String zlmServerConfig, JSONObject data); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java index 69225385..618b8241 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; @@ -73,4 +74,19 @@ public interface IStreamProxyService { * @return */ StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId); + + + /** + * 新的节点加入 + * @param zlmServerConfig + * @return + */ + void zlmServerOnline(ZLMServerConfig zlmServerConfig); + + /** + * 节点离线 + * @param mediaServerId + * @return + */ + void zlmServerOffline(String mediaServerId); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index 7733254e..d8a4465f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service; import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; @@ -46,4 +47,18 @@ public interface IStreamPushService { */ boolean stop(String app, String streamId); + /** + * 新的节点加入 + * @param zlmServerConfig + * @return + */ + void zlmServerOnline(ZLMServerConfig zlmServerConfig); + + /** + * 节点离线 + * @param mediaServerId + * @return + */ + void zlmServerOffline(String mediaServerId); + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index b6bfc00c..e02bd3f9 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -97,6 +97,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR if (!redisUtil.hasKey(key)) { redisUtil.set(key, mediaServerItem); } + } } @@ -272,6 +273,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR WVPResult result = new WVPResult<>(); mediaServerItem.setCreateTime(this.format.format(System.currentTimeMillis())); mediaServerItem.setUpdateTime(this.format.format(System.currentTimeMillis())); + mediaServerItem.setHookAliveInterval(120); JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem); if (responseJSON != null) { JSONArray data = responseJSON.getJSONArray("data"); @@ -329,6 +331,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR logger.warn("[未注册的zlm] 拒接接入:来自{}:{}", zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() ); return; } + serverItem.setHookAliveInterval(zlmServerConfig.getHookAliveInterval()); if (serverItem.getHttpPort() == 0) { serverItem.setHttpPort(zlmServerConfig.getHttpPort()); } @@ -350,87 +353,31 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR if (serverItem.getRtpProxyPort() == 0) { serverItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort()); } + if (StringUtils.isEmpty(serverItem.getId())) { + serverItem.setId(zlmServerConfig.getGeneralMediaServerId()); + } + serverItem.setStatus(true); if (StringUtils.isEmpty(serverItem.getId())) { serverItem.setId(zlmServerConfig.getGeneralMediaServerId()); mediaServerMapper.updateByHostAndPort(serverItem); }else { mediaServerMapper.update(serverItem); } - if (redisUtil.get(VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + serverItem.getId()) == null) { + String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + serverItem.getId(); + if (redisUtil.get(key) == null) { SsrcConfig ssrcConfig = new SsrcConfig(serverItem.getId(), null, sipConfig.getDomain()); serverItem.setSsrcConfig(ssrcConfig); - redisUtil.set(VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + serverItem.getId(), serverItem); + redisUtil.set(key, serverItem); } - serverItem.setStatus(true); resetOnlineServerItem(serverItem); + updateMediaServerKeepalive(serverItem.getId(), null); setZLMConfig(serverItem); + } -// if (zlmServerConfig.getGeneralMediaServerId().equals(mediaConfig.getId()) -// || (zlmServerConfig.getIp().equals(mediaConfig.getIp()) && zlmServerConfig.getHttpPort() == mediaConfig.getHttpPort())) { -// // 配置文件的zlm -// // 如果是配置文件中的zlm。 也就是默认zlm。 一切以配置文件内容为准 -// // wvp互惠修改zlm的端口,需要自行配置。 -// MediaServerItem serverItemFromConfig = mediaConfig.getMediaSerItem(); -// serverItemFromConfig.setId(zlmServerConfig.getGeneralMediaServerId()); -// if (mediaConfig.getHttpPort() == 0) { -// serverItemFromConfig.setHttpPort(zlmServerConfig.getHttpPort()); -// } -// if (mediaConfig.getHttpSSlPort() == 0) { -// serverItemFromConfig.setHttpSSlPort(zlmServerConfig.getHttpSSLport()); -// } -// if (mediaConfig.getRtmpPort() == 0) { -// serverItemFromConfig.setRtmpPort(zlmServerConfig.getRtmpPort()); -// } -// if (mediaConfig.getRtmpSSlPort() == 0) { -// serverItemFromConfig.setRtmpSSlPort(zlmServerConfig.getRtmpSslPort()); -// } -// if (mediaConfig.getRtspPort() == 0) { -// serverItemFromConfig.setRtspPort(zlmServerConfig.getRtspPort()); -// } -// if (mediaConfig.getRtspSSLPort() == 0) { -// serverItemFromConfig.setRtspSSLPort(zlmServerConfig.getRtspSSlport()); -// } -// if (mediaConfig.getRtpProxyPort() == 0) { -// serverItemFromConfig.setRtpProxyPort(zlmServerConfig.getRtpProxyPort()); -// } -// if (serverItem != null){ -// mediaServerMapper.delDefault(); -// mediaServerMapper.add(serverItemFromConfig); -// String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + serverItemFromConfig.getId(); -// MediaServerItem serverItemInRedis = (MediaServerItem)redisUtil.get(key); -// if (serverItemInRedis != null) { -// serverItemFromConfig.setSsrcConfig(serverItemInRedis.getSsrcConfig()); -// }else { -// serverItemFromConfig.setSsrcConfig(new SsrcConfig(serverItemFromConfig.getId(), null, sipConfig.getDomain())); -// } -// redisUtil.set(key, serverItemFromConfig); -// }else { -// String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + serverItemFromConfig.getId(); -// serverItemFromConfig.setSsrcConfig(new SsrcConfig(serverItemFromConfig.getId(), null, sipConfig.getDomain())); -// redisUtil.set(key, serverItemFromConfig); -// mediaServerMapper.add(serverItemFromConfig); -// } -// resetOnlineServerItem(serverItemFromConfig); -// setZLMConfig(serverItemFromConfig); -// } - // 移除未添加的zlm的接入,所有的zlm必须先添加后才可以加入使用 -// else { -// String now = this.format.format(System.currentTimeMillis()); -// if (serverItem == null){ -// // 一个新的zlm接入wvp -// serverItem = new MediaServerItem(zlmServerConfig, sipConfig.getIp()); -// serverItem.setCreateTime(now); -// serverItem.setUpdateTime(now); -// String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + serverItem.getId(); -// serverItem.setSsrcConfig(new SsrcConfig(serverItem.getId(), null, sipConfig.getDomain())); -// redisUtil.set(key, serverItem); -// // 存入数据库 -// mediaServerMapper.add(serverItem); -// setZLMConfig(serverItem); -// } -// resetOnlineServerItem(serverItem); -// } + @Override + public void zlmServerOffline(String mediaServerId) { + delete(mediaServerId); } @Override @@ -611,9 +558,17 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR @Override public void delete(String id) { - redisUtil.zRemove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId() + "_", id); + redisUtil.zRemove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId(), id); String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + id; redisUtil.del(key); mediaServerMapper.delOne(id); } + + @Override + public void updateMediaServerKeepalive(String mediaServerId, JSONObject data) { + MediaServerItem mediaServerItem = getOne(mediaServerId); + String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetup.getServerId() + "_" + mediaServerId; + int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2; + redisUtil.set(key, data, hookAliveInterval); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 0541124f..907893d1 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; @@ -50,6 +51,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Autowired private StreamProxyMapper streamProxyMapper; + @Autowired + private IRedisCatchStorage redisCatchStorage; + @Autowired private GbStreamMapper gbStreamMapper; @@ -249,4 +253,20 @@ public class StreamProxyServiceImpl implements IStreamProxyService { public StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId) { return videoManagerStorager.getStreamProxyByAppAndStream(app, streamId); } + + @Override + public void zlmServerOnline(ZLMServerConfig zlmServerConfig) { + + } + + @Override + public void zlmServerOffline(String mediaServerId) { + // 移除开启了无人观看自动移除的流 + streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId); + // 其他的流设置未启用 + streamProxyMapper.updateStatus(false, mediaServerId); + // 移除redis内流的信息 + redisCatchStorage.removeStream(mediaServerId, "PULL"); + + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 7928d5ae..7c17c2a1 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; @@ -135,4 +136,18 @@ public class StreamPushServiceImpl implements IStreamPushService { return true; } + @Override + public void zlmServerOnline(ZLMServerConfig zlmServerConfig) { + // 似乎没啥需要做的 + } + + @Override + public void zlmServerOffline(String mediaServerId) { + // 移除没有serverId的推流 + streamPushMapper.deleteWithoutGBId(mediaServerId); + // 其他的流设置未启用 + gbStreamMapper.updateStatusByMediaServerId(mediaServerId, false); + // 移除redis内流的信息 + redisCatchStorage.removeStream(mediaServerId, "PUSH"); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 04589414..0803dd64 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -146,6 +146,13 @@ public interface IRedisCatchStorage { */ void removeStream(MediaServerItem mediaServerItem, String type, String app, String streamId); + + /** + * 移除流信息从redis + * @param mediaServerId + */ + void removeStream(String mediaServerId, String type); + /** * 开始下载录像时存入 * @param streamInfo diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java index 4c988d9a..18f30a55 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java @@ -422,4 +422,5 @@ public interface IVideoManagerStorager { * @return */ StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId); + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java index 84c04a16..f1599255 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java @@ -60,4 +60,9 @@ public interface GbStreamMapper { @Select("SELECT gs.*, pgs.platformId FROM gb_stream gs LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream WHERE mediaServerId=#{mediaServerId} ") List selectAllByMediaServerId(String mediaServerId); + + @Update("UPDATE gb_stream " + + "SET status=${status} " + + "WHERE mediaServerId=#{mediaServerId} ") + void updateStatusByMediaServerId(String mediaServerId, boolean status); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java index aaa18459..4e09e797 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java @@ -36,7 +36,8 @@ public interface MediaServerMapper { "recordAssistPort, " + "defaultServer, " + "createTime, " + - "updateTime" + + "updateTime, " + + "hookAliveInterval" + ") VALUES " + "(" + "'${id}', " + @@ -60,7 +61,8 @@ public interface MediaServerMapper { "${recordAssistPort}, " + "${defaultServer}, " + "'${createTime}', " + - "'${updateTime}')") + "'${updateTime}', " + + "${hookAliveInterval})") int add(MediaServerItem mediaServerItem); @Update(value = {" "}) int update(MediaServerItem mediaServerItem); @@ -108,6 +111,7 @@ public interface MediaServerMapper { ", sendRtpPortRange='${sendRtpPortRange}'" + ", secret='${secret}'" + ", recordAssistPort=${recordAssistPort}" + + ", hookAliveInterval=${hookAliveInterval}" + "WHERE ip='${ip}' and httpPort=${httpPort}"+ " "}) int updateByHostAndPort(MediaServerItem mediaServerItem); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java index 11753f7e..82520ec4 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java @@ -51,4 +51,17 @@ public interface StreamProxyMapper { "LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " + "WHERE st.enable=${enable} and st.mediaServerId = '${id}' order by st.createTime desc") List selectForEnableInMediaServer(String id, boolean enable); + + @Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st " + + "LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " + + "WHERE st.mediaServerId = '${id}' order by st.createTime desc") + List selectInMediaServer(String id); + + @Update("UPDATE stream_proxy " + + "SET enable=#{status} " + + "WHERE mediaServerId=#{mediaServerId}") + void updateStatus(boolean status, String mediaServerId); + + @Delete("DELETE FROM stream_proxy WHERE mediaServerId=#{mediaServerId}") + void deleteAutoRemoveItemByMediaServerId(String mediaServerId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java index 41e4c44e..9fe6ebf2 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java @@ -53,4 +53,7 @@ public interface StreamPushMapper { @Delete("DELETE FROM stream_push") void clear(); + @Delete("DELETE FROM stream_push WHERE mediaServerId=#{mediaServerId}") + void deleteWithoutGBId(String mediaServerId); + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 984b994b..6adc05df 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -333,17 +333,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, StreamInfo streamInfo) { - String key = VideoManagerConstants.WVP_SERVER_STREAM_PUSH_PREFIX + userSetup.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId(); + String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId(); redis.set(key, streamInfo); } @Override public void removeStream(MediaServerItem mediaServerItem, String type, String app, String streamId) { - String key = VideoManagerConstants.WVP_SERVER_STREAM_PUSH_PREFIX + userSetup.getServerId() + "_*_" + app + "_" + streamId + "_" + mediaServerItem.getId(); - List streams = redis.scan(key); - for (Object stream : streams) { - redis.del((String) stream); - } + String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId(); + redis.del(key); } @Override @@ -359,4 +356,13 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { JSONObject jsonObject = (JSONObject)redis.get(key); return JSONObject.toJavaObject(jsonObject, ThirdPartyGB.class); } + + @Override + public void removeStream(String mediaServerId, String type) { + String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_*_*_" + mediaServerId; + List streams = redis.scan(key); + for (Object stream : streams) { + redis.del((String) stream); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java index 35dda131..b7454a84 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java @@ -738,4 +738,5 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { public StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId) { return streamProxyMapper.selectOne(app, streamId); } + } diff --git a/src/main/resources/wvp.sqlite b/src/main/resources/wvp.sqlite index a1df1735628ecf8727738da22e638475b502962b..e6140f4e2fc2b4588c18105072cac0e99e82f108 100644 GIT binary patch delta 351 zcmZp8z|ru4V}i8ce+C8yWgzATVkQO#X8Vac#*F_rCd}t&G*~21uq=U5V6#F2y8tI6 zPyrVZb8Z$a_`pA@fMqkwhQINeOhF8S3|S2P>v+EL?Bt5$lIIlXxWK`{zMCzF&76&i z(U~EObw0~wrXv$4I&81@Vtm2K0yLy(;>5h|w(g8}%-au&Fm7j?oX=~!{hJIUx3Vs~ zEdwvZE(Uf1*8MDs%vTt8G0$e2&XmB&z%K{kspNG*_Fb3%j_gDr0x< zoml~0oe1>S8x>&Ix_xSb<_@rV4f4~z}~EjMJ+ delta 357 zcmZp8z|ru4V}i8c9|i^nWgzAPVkQO#X1$3z#*BY9Cd}t&lwBlHuvwvjU4WGlD9O25 zu;B~;6h=s3r40A`RQU_jB1SB(`~&N{aEGL#JyFg9~5Vln7+b` yQ5t0DZlImp8MmMHW3+B$;&xO?Kn?VzA}0}Ps{{EQ;oKfGrYZ~y?c?PkIN diff --git a/src/test/java/com/genersoft/iot/vmp/service/impl/RoleServiceImplTest.java b/src/test/java/com/genersoft/iot/vmp/service/impl/RoleServiceImplTest.java index 20209f25..13479fe2 100644 --- a/src/test/java/com/genersoft/iot/vmp/service/impl/RoleServiceImplTest.java +++ b/src/test/java/com/genersoft/iot/vmp/service/impl/RoleServiceImplTest.java @@ -25,7 +25,6 @@ class RoleServiceImplTest { void getAllUser() { List all = roleService.getAll(); Role roleById = roleService.getRoleById(1); - System.out.println(); } diff --git a/src/test/java/com/genersoft/iot/vmp/service/impl/UserServiceImplTest.java b/src/test/java/com/genersoft/iot/vmp/service/impl/UserServiceImplTest.java index 9e3c6724..41148706 100644 --- a/src/test/java/com/genersoft/iot/vmp/service/impl/UserServiceImplTest.java +++ b/src/test/java/com/genersoft/iot/vmp/service/impl/UserServiceImplTest.java @@ -27,10 +27,8 @@ class UserServiceImplTest { @org.junit.jupiter.api.Test void getAllUser() { List allUsers = userService.getAllUsers(); - System.out.println(userService.getAllUsers().size()); User admin = userService.getUser("admin", "21232f297a57a5a743894a0e4a801fc3"); User admin1 = userService.getUserByUsername("admin"); - System.out.println(12); }