Browse Source

适配zlm的hook保活

pull/276/head
648540858 3 years ago
parent
commit
9b1af8ef13
  1. 1
      sql/mysql.sql
  2. 4
      src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
  3. 1
      src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java
  4. 2
      src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java
  5. 9
      src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
  6. 4
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
  7. 14
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
  8. 11
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java
  9. 11
      src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java
  10. 25
      src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMEventAbstract.java
  11. 10
      src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
  12. 16
      src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java
  13. 15
      src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
  14. 93
      src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
  15. 20
      src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java
  16. 15
      src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
  17. 7
      src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
  18. 1
      src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java
  19. 5
      src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java
  20. 8
      src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java
  21. 13
      src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java
  22. 3
      src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
  23. 18
      src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
  24. 1
      src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
  25. BIN
      src/main/resources/wvp.sqlite
  26. 1
      src/test/java/com/genersoft/iot/vmp/service/impl/RoleServiceImplTest.java
  27. 2
      src/test/java/com/genersoft/iot/vmp/service/impl/UserServiceImplTest.java

1
sql/mysql.sql

@ -148,6 +148,7 @@ create table media_server
defaultServer int not null, defaultServer int not null,
createTime varchar(50) not null, createTime varchar(50) not null,
updateTime varchar(50) not null, updateTime varchar(50) not null,
hookAliveInterval int not null,
constraint media_server_i constraint media_server_i
unique (ip, httpPort) unique (ip, httpPort)
); );

4
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java

@ -10,10 +10,12 @@ public class VideoManagerConstants {
public static final String WVP_SERVER_PREFIX = "VMP_SIGNALLING_SERVER_INFO_"; public static final String WVP_SERVER_PREFIX = "VMP_SIGNALLING_SERVER_INFO_";
public static final String WVP_SERVER_STREAM_PUSH_PREFIX = "VMP_SIGNALLING_STREAM_"; public static final String WVP_SERVER_STREAM_PREFIX = "VMP_SIGNALLING_STREAM_";
public static final String MEDIA_SERVER_PREFIX = "VMP_MEDIA_SERVER_"; public static final String MEDIA_SERVER_PREFIX = "VMP_MEDIA_SERVER_";
public static final String MEDIA_SERVER_KEEPALIVE_PREFIX = "VMP_MEDIA_SERVER_KEEPALIVE_";
public static final String MEDIA_SERVERS_ONLINE_PREFIX = "VMP_MEDIA_ONLINE_SERVERS_"; public static final String MEDIA_SERVERS_ONLINE_PREFIX = "VMP_MEDIA_ONLINE_SERVERS_";
public static final String MEDIA_STREAM_PREFIX = "VMP_MEDIA_STREAM"; public static final String MEDIA_STREAM_PREFIX = "VMP_MEDIA_STREAM";

1
src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java

@ -203,6 +203,7 @@ public class MediaConfig{
mediaServerItem.setRtpPortRange(rtpPortRange); mediaServerItem.setRtpPortRange(rtpPortRange);
mediaServerItem.setSendRtpPortRange(sendRtpPortRange); mediaServerItem.setSendRtpPortRange(sendRtpPortRange);
mediaServerItem.setRecordAssistPort(recordAssistPort); mediaServerItem.setRecordAssistPort(recordAssistPort);
mediaServerItem.setHookAliveInterval(120);
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
mediaServerItem.setCreateTime(format.format(System.currentTimeMillis())); mediaServerItem.setCreateTime(format.format(System.currentTimeMillis()));

2
src/main/java/com/genersoft/iot/vmp/conf/SipConfig.java

@ -27,7 +27,7 @@ public class SipConfig {
Integer keepaliveTimeOut = 255; Integer keepaliveTimeOut = 255;
Integer registerTimeInterval = 60; Integer registerTimeInterval = 120;
private boolean alarm = false; private boolean alarm = false;

9
src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java

@ -1,15 +1,16 @@
package com.genersoft.iot.vmp.gb28181.event; package com.genersoft.iot.vmp.gb28181.event;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.event.offline.OfflineEvent;
import com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire.PlatformKeepaliveExpireEvent; import com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire.PlatformKeepaliveExpireEvent;
import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformNotRegisterEvent; import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformNotRegisterEvent;
import com.genersoft.iot.vmp.media.zlm.event.ZLMOfflineEvent;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEvent; import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEvent;
import com.genersoft.iot.vmp.gb28181.event.offline.OfflineEvent;
import com.genersoft.iot.vmp.gb28181.event.online.OnlineEvent; import com.genersoft.iot.vmp.gb28181.event.online.OnlineEvent;
/** /**
@ -66,5 +67,11 @@ public class EventPublisher {
alarmEvent.setAlarmInfo(deviceAlarm); alarmEvent.setAlarmInfo(deviceAlarm);
applicationEventPublisher.publishEvent(alarmEvent); applicationEventPublisher.publishEvent(alarmEvent);
} }
public void zlmOfflineEventPublish(String mediaServerId){
ZLMOfflineEvent outEvent = new ZLMOfflineEvent(this);
outEvent.setMediaServerId(mediaServerId);
applicationEventPublisher.publishEvent(outEvent);
}
} }

4
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@ -359,8 +359,8 @@ public class ZLMHttpHookListener {
type = "PULL"; type = "PULL";
} }
} }
zlmMediaListManager.removeMedia( app, streamId); zlmMediaListManager.removeMedia(app, streamId);
redisCatchStorage.removeStream(mediaServerItem, OriginType.values()[item.getOriginType()].getType(), app, streamId); redisCatchStorage.removeStream(mediaServerItem, type, app, streamId);
} }
// 发送流变化redis消息 // 发送流变化redis消息

14
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java

@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -39,6 +40,9 @@ public class ZLMRunner implements CommandLineRunner {
@Autowired @Autowired
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired @Autowired
private MediaConfig mediaConfig; private MediaConfig mediaConfig;
@ -70,8 +74,14 @@ public class ZLMRunner implements CommandLineRunner {
} }
}); });
// TODO 订阅 zlm保活事件, 当zlm离线时做业务的处理 // 订阅 zlm保活事件, 当zlm离线时做业务的处理
hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_keepalive,null,
(MediaServerItem mediaServerItem, JSONObject response)->{
String mediaServerId = response.getString("mediaServerId");
if (mediaServerId !=null ) {
mediaServerService.updateMediaServerKeepalive(mediaServerId, response.getJSONObject("data"));
}
});
// 获取zlm信息 // 获取zlm信息
logger.info("等待默认zlm接入..."); logger.info("等待默认zlm接入...");

11
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerConfig.java

@ -65,6 +65,9 @@ public class ZLMServerConfig {
@JSONField(name = "hook.admin_params") @JSONField(name = "hook.admin_params")
private String hookAdminParams; private String hookAdminParams;
@JSONField(name = "hook.alive_interval")
private int hookAliveInterval;
@JSONField(name = "hook.enable") @JSONField(name = "hook.enable")
private String hookEnable; private String hookEnable;
@ -791,4 +794,12 @@ public class ZLMServerConfig {
public void setShellPhell(String shellPhell) { public void setShellPhell(String shellPhell) {
this.shellPhell = shellPhell; this.shellPhell = shellPhell;
} }
public int getHookAliveInterval() {
return hookAliveInterval;
}
public void setHookAliveInterval(int hookAliveInterval) {
this.hookAliveInterval = hookAliveInterval;
}
} }

11
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaServerItem.java

@ -39,6 +39,8 @@ public class MediaServerItem{
private int streamNoneReaderDelayMS; private int streamNoneReaderDelayMS;
private int hookAliveInterval;
private boolean rtpEnable; private boolean rtpEnable;
private boolean status; private boolean status;
@ -87,6 +89,7 @@ public class MediaServerItem{
autoConfig = true; // 默认值true; autoConfig = true; // 默认值true;
secret = zlmServerConfig.getApiSecret(); secret = zlmServerConfig.getApiSecret();
streamNoneReaderDelayMS = zlmServerConfig.getGeneralStreamNoneReaderDelayMS(); streamNoneReaderDelayMS = zlmServerConfig.getGeneralStreamNoneReaderDelayMS();
hookAliveInterval = zlmServerConfig.getHookAliveInterval();
rtpEnable = false; // 默认使用单端口;直到用户自己设置开启多端口 rtpEnable = false; // 默认使用单端口;直到用户自己设置开启多端口
rtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号 rtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号
sendRtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号 sendRtpPortRange = "30000,30500"; // 默认使用30000,30500作为级联时发送流的端口号
@ -309,4 +312,12 @@ public class MediaServerItem{
public void setSendRtpPortRange(String sendRtpPortRange) { public void setSendRtpPortRange(String sendRtpPortRange) {
this.sendRtpPortRange = sendRtpPortRange; this.sendRtpPortRange = sendRtpPortRange;
} }
public int getHookAliveInterval() {
return hookAliveInterval;
}
public void setHookAliveInterval(int hookAliveInterval) {
this.hookAliveInterval = hookAliveInterval;
}
} }

25
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMEventAbstract.java

@ -0,0 +1,25 @@
package com.genersoft.iot.vmp.media.zlm.event;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import org.springframework.context.ApplicationEvent;
public abstract class ZLMEventAbstract extends ApplicationEvent {
private static final long serialVersionUID = 1L;
private String mediaServerId;
public ZLMEventAbstract(Object source) {
super(source);
}
public String getMediaServerId() {
return mediaServerId;
}
public void setMediaServerId(String mediaServerId) {
this.mediaServerId = mediaServerId;
}
}

10
src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java

@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.service; package com.genersoft.iot.vmp.service;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
@ -30,6 +31,13 @@ public interface IMediaServerService {
*/ */
void zlmServerOnline(ZLMServerConfig zlmServerConfig); void zlmServerOnline(ZLMServerConfig zlmServerConfig);
/**
* 节点离线
* @param mediaServerId
* @return
*/
void zlmServerOffline(String mediaServerId);
MediaServerItem getMediaServerForMinimumLoad(); MediaServerItem getMediaServerForMinimumLoad();
void setZLMConfig(MediaServerItem mediaServerItem); void setZLMConfig(MediaServerItem mediaServerItem);
@ -67,4 +75,6 @@ public interface IMediaServerService {
void delete(String id); void delete(String id);
MediaServerItem getDefaultMediaServer(); MediaServerItem getDefaultMediaServer();
void updateMediaServerKeepalive(String zlmServerConfig, JSONObject data);
} }

16
src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
@ -73,4 +74,19 @@ public interface IStreamProxyService {
* @return * @return
*/ */
StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId); StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId);
/**
* 新的节点加入
* @param zlmServerConfig
* @return
*/
void zlmServerOnline(ZLMServerConfig zlmServerConfig);
/**
* 节点离线
* @param mediaServerId
* @return
*/
void zlmServerOffline(String mediaServerId);
} }

15
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.service; package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
@ -46,4 +47,18 @@ public interface IStreamPushService {
*/ */
boolean stop(String app, String streamId); boolean stop(String app, String streamId);
/**
* 新的节点加入
* @param zlmServerConfig
* @return
*/
void zlmServerOnline(ZLMServerConfig zlmServerConfig);
/**
* 节点离线
* @param mediaServerId
* @return
*/
void zlmServerOffline(String mediaServerId);
} }

93
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java

@ -97,6 +97,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
if (!redisUtil.hasKey(key)) { if (!redisUtil.hasKey(key)) {
redisUtil.set(key, mediaServerItem); redisUtil.set(key, mediaServerItem);
} }
} }
} }
@ -272,6 +273,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
WVPResult<String> result = new WVPResult<>(); WVPResult<String> result = new WVPResult<>();
mediaServerItem.setCreateTime(this.format.format(System.currentTimeMillis())); mediaServerItem.setCreateTime(this.format.format(System.currentTimeMillis()));
mediaServerItem.setUpdateTime(this.format.format(System.currentTimeMillis())); mediaServerItem.setUpdateTime(this.format.format(System.currentTimeMillis()));
mediaServerItem.setHookAliveInterval(120);
JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem); JSONObject responseJSON = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
if (responseJSON != null) { if (responseJSON != null) {
JSONArray data = responseJSON.getJSONArray("data"); JSONArray data = responseJSON.getJSONArray("data");
@ -329,6 +331,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
logger.warn("[未注册的zlm] 拒接接入:来自{}:{}", zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() ); logger.warn("[未注册的zlm] 拒接接入:来自{}:{}", zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() );
return; return;
} }
serverItem.setHookAliveInterval(zlmServerConfig.getHookAliveInterval());
if (serverItem.getHttpPort() == 0) { if (serverItem.getHttpPort() == 0) {
serverItem.setHttpPort(zlmServerConfig.getHttpPort()); serverItem.setHttpPort(zlmServerConfig.getHttpPort());
} }
@ -350,87 +353,31 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
if (serverItem.getRtpProxyPort() == 0) { if (serverItem.getRtpProxyPort() == 0) {
serverItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort()); serverItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort());
} }
if (StringUtils.isEmpty(serverItem.getId())) {
serverItem.setId(zlmServerConfig.getGeneralMediaServerId());
}
serverItem.setStatus(true);
if (StringUtils.isEmpty(serverItem.getId())) { if (StringUtils.isEmpty(serverItem.getId())) {
serverItem.setId(zlmServerConfig.getGeneralMediaServerId()); serverItem.setId(zlmServerConfig.getGeneralMediaServerId());
mediaServerMapper.updateByHostAndPort(serverItem); mediaServerMapper.updateByHostAndPort(serverItem);
}else { }else {
mediaServerMapper.update(serverItem); mediaServerMapper.update(serverItem);
} }
if (redisUtil.get(VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + serverItem.getId()) == null) { String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + serverItem.getId();
if (redisUtil.get(key) == null) {
SsrcConfig ssrcConfig = new SsrcConfig(serverItem.getId(), null, sipConfig.getDomain()); SsrcConfig ssrcConfig = new SsrcConfig(serverItem.getId(), null, sipConfig.getDomain());
serverItem.setSsrcConfig(ssrcConfig); serverItem.setSsrcConfig(ssrcConfig);
redisUtil.set(VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + serverItem.getId(), serverItem); redisUtil.set(key, serverItem);
} }
serverItem.setStatus(true);
resetOnlineServerItem(serverItem); resetOnlineServerItem(serverItem);
updateMediaServerKeepalive(serverItem.getId(), null);
setZLMConfig(serverItem); setZLMConfig(serverItem);
}
// if (zlmServerConfig.getGeneralMediaServerId().equals(mediaConfig.getId()) @Override
// || (zlmServerConfig.getIp().equals(mediaConfig.getIp()) && zlmServerConfig.getHttpPort() == mediaConfig.getHttpPort())) { public void zlmServerOffline(String mediaServerId) {
// // 配置文件的zlm delete(mediaServerId);
// // 如果是配置文件中的zlm。 也就是默认zlm。 一切以配置文件内容为准
// // wvp互惠修改zlm的端口,需要自行配置。
// MediaServerItem serverItemFromConfig = mediaConfig.getMediaSerItem();
// serverItemFromConfig.setId(zlmServerConfig.getGeneralMediaServerId());
// if (mediaConfig.getHttpPort() == 0) {
// serverItemFromConfig.setHttpPort(zlmServerConfig.getHttpPort());
// }
// if (mediaConfig.getHttpSSlPort() == 0) {
// serverItemFromConfig.setHttpSSlPort(zlmServerConfig.getHttpSSLport());
// }
// if (mediaConfig.getRtmpPort() == 0) {
// serverItemFromConfig.setRtmpPort(zlmServerConfig.getRtmpPort());
// }
// if (mediaConfig.getRtmpSSlPort() == 0) {
// serverItemFromConfig.setRtmpSSlPort(zlmServerConfig.getRtmpSslPort());
// }
// if (mediaConfig.getRtspPort() == 0) {
// serverItemFromConfig.setRtspPort(zlmServerConfig.getRtspPort());
// }
// if (mediaConfig.getRtspSSLPort() == 0) {
// serverItemFromConfig.setRtspSSLPort(zlmServerConfig.getRtspSSlport());
// }
// if (mediaConfig.getRtpProxyPort() == 0) {
// serverItemFromConfig.setRtpProxyPort(zlmServerConfig.getRtpProxyPort());
// }
// if (serverItem != null){
// mediaServerMapper.delDefault();
// mediaServerMapper.add(serverItemFromConfig);
// String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + serverItemFromConfig.getId();
// MediaServerItem serverItemInRedis = (MediaServerItem)redisUtil.get(key);
// if (serverItemInRedis != null) {
// serverItemFromConfig.setSsrcConfig(serverItemInRedis.getSsrcConfig());
// }else {
// serverItemFromConfig.setSsrcConfig(new SsrcConfig(serverItemFromConfig.getId(), null, sipConfig.getDomain()));
// }
// redisUtil.set(key, serverItemFromConfig);
// }else {
// String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + serverItemFromConfig.getId();
// serverItemFromConfig.setSsrcConfig(new SsrcConfig(serverItemFromConfig.getId(), null, sipConfig.getDomain()));
// redisUtil.set(key, serverItemFromConfig);
// mediaServerMapper.add(serverItemFromConfig);
// }
// resetOnlineServerItem(serverItemFromConfig);
// setZLMConfig(serverItemFromConfig);
// }
// 移除未添加的zlm的接入,所有的zlm必须先添加后才可以加入使用
// else {
// String now = this.format.format(System.currentTimeMillis());
// if (serverItem == null){
// // 一个新的zlm接入wvp
// serverItem = new MediaServerItem(zlmServerConfig, sipConfig.getIp());
// serverItem.setCreateTime(now);
// serverItem.setUpdateTime(now);
// String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + serverItem.getId();
// serverItem.setSsrcConfig(new SsrcConfig(serverItem.getId(), null, sipConfig.getDomain()));
// redisUtil.set(key, serverItem);
// // 存入数据库
// mediaServerMapper.add(serverItem);
// setZLMConfig(serverItem);
// }
// resetOnlineServerItem(serverItem);
// }
} }
@Override @Override
@ -611,9 +558,17 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
@Override @Override
public void delete(String id) { public void delete(String id) {
redisUtil.zRemove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId() + "_", id); redisUtil.zRemove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId(), id);
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + id; String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + id;
redisUtil.del(key); redisUtil.del(key);
mediaServerMapper.delOne(id); mediaServerMapper.delOne(id);
} }
@Override
public void updateMediaServerKeepalive(String mediaServerId, JSONObject data) {
MediaServerItem mediaServerItem = getOne(mediaServerId);
String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetup.getServerId() + "_" + mediaServerId;
int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2;
redisUtil.set(key, data, hookAliveInterval);
}
} }

20
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java

@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
@ -50,6 +51,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Autowired @Autowired
private StreamProxyMapper streamProxyMapper; private StreamProxyMapper streamProxyMapper;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired @Autowired
private GbStreamMapper gbStreamMapper; private GbStreamMapper gbStreamMapper;
@ -249,4 +253,20 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
public StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId) { public StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId) {
return videoManagerStorager.getStreamProxyByAppAndStream(app, streamId); return videoManagerStorager.getStreamProxyByAppAndStream(app, streamId);
} }
@Override
public void zlmServerOnline(ZLMServerConfig zlmServerConfig) {
}
@Override
public void zlmServerOffline(String mediaServerId) {
// 移除开启了无人观看自动移除的流
streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId);
// 其他的流设置未启用
streamProxyMapper.updateStatus(false, mediaServerId);
// 移除redis内流的信息
redisCatchStorage.removeStream(mediaServerId, "PULL");
}
} }

15
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java

@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.TypeReference;
import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
@ -135,4 +136,18 @@ public class StreamPushServiceImpl implements IStreamPushService {
return true; return true;
} }
@Override
public void zlmServerOnline(ZLMServerConfig zlmServerConfig) {
// 似乎没啥需要做的
}
@Override
public void zlmServerOffline(String mediaServerId) {
// 移除没有serverId的推流
streamPushMapper.deleteWithoutGBId(mediaServerId);
// 其他的流设置未启用
gbStreamMapper.updateStatusByMediaServerId(mediaServerId, false);
// 移除redis内流的信息
redisCatchStorage.removeStream(mediaServerId, "PUSH");
}
} }

7
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java

@ -146,6 +146,13 @@ public interface IRedisCatchStorage {
*/ */
void removeStream(MediaServerItem mediaServerItem, String type, String app, String streamId); void removeStream(MediaServerItem mediaServerItem, String type, String app, String streamId);
/**
* 移除流信息从redis
* @param mediaServerId
*/
void removeStream(String mediaServerId, String type);
/** /**
* 开始下载录像时存入 * 开始下载录像时存入
* @param streamInfo * @param streamInfo

1
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java

@ -422,4 +422,5 @@ public interface IVideoManagerStorager {
* @return * @return
*/ */
StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId); StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId);
} }

5
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java

@ -60,4 +60,9 @@ public interface GbStreamMapper {
@Select("SELECT gs.*, pgs.platformId FROM gb_stream gs LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream WHERE mediaServerId=#{mediaServerId} ") @Select("SELECT gs.*, pgs.platformId FROM gb_stream gs LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream WHERE mediaServerId=#{mediaServerId} ")
List<GbStream> selectAllByMediaServerId(String mediaServerId); List<GbStream> selectAllByMediaServerId(String mediaServerId);
@Update("UPDATE gb_stream " +
"SET status=${status} " +
"WHERE mediaServerId=#{mediaServerId} ")
void updateStatusByMediaServerId(String mediaServerId, boolean status);
} }

8
src/main/java/com/genersoft/iot/vmp/storager/dao/MediaServerMapper.java

@ -36,7 +36,8 @@ public interface MediaServerMapper {
"recordAssistPort, " + "recordAssistPort, " +
"defaultServer, " + "defaultServer, " +
"createTime, " + "createTime, " +
"updateTime" + "updateTime, " +
"hookAliveInterval" +
") VALUES " + ") VALUES " +
"(" + "(" +
"'${id}', " + "'${id}', " +
@ -60,7 +61,8 @@ public interface MediaServerMapper {
"${recordAssistPort}, " + "${recordAssistPort}, " +
"${defaultServer}, " + "${defaultServer}, " +
"'${createTime}', " + "'${createTime}', " +
"'${updateTime}')") "'${updateTime}', " +
"${hookAliveInterval})")
int add(MediaServerItem mediaServerItem); int add(MediaServerItem mediaServerItem);
@Update(value = {" <script>" + @Update(value = {" <script>" +
@ -84,6 +86,7 @@ public interface MediaServerMapper {
"<if test=\"sendRtpPortRange != null\">, sendRtpPortRange='${sendRtpPortRange}'</if>" + "<if test=\"sendRtpPortRange != null\">, sendRtpPortRange='${sendRtpPortRange}'</if>" +
"<if test=\"secret != null\">, secret='${secret}'</if>" + "<if test=\"secret != null\">, secret='${secret}'</if>" +
"<if test=\"recordAssistPort != null\">, recordAssistPort=${recordAssistPort}</if>" + "<if test=\"recordAssistPort != null\">, recordAssistPort=${recordAssistPort}</if>" +
"<if test=\"hookAliveInterval != null\">, hookAliveInterval=${hookAliveInterval}</if>" +
"WHERE id='${id}'"+ "WHERE id='${id}'"+
" </script>"}) " </script>"})
int update(MediaServerItem mediaServerItem); int update(MediaServerItem mediaServerItem);
@ -108,6 +111,7 @@ public interface MediaServerMapper {
"<if test=\"sendRtpPortRange != null\">, sendRtpPortRange='${sendRtpPortRange}'</if>" + "<if test=\"sendRtpPortRange != null\">, sendRtpPortRange='${sendRtpPortRange}'</if>" +
"<if test=\"secret != null\">, secret='${secret}'</if>" + "<if test=\"secret != null\">, secret='${secret}'</if>" +
"<if test=\"recordAssistPort != null\">, recordAssistPort=${recordAssistPort}</if>" + "<if test=\"recordAssistPort != null\">, recordAssistPort=${recordAssistPort}</if>" +
"<if test=\"hookAliveInterval != null\">, hookAliveInterval=${hookAliveInterval}</if>" +
"WHERE ip='${ip}' and httpPort=${httpPort}"+ "WHERE ip='${ip}' and httpPort=${httpPort}"+
" </script>"}) " </script>"})
int updateByHostAndPort(MediaServerItem mediaServerItem); int updateByHostAndPort(MediaServerItem mediaServerItem);

13
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java

@ -51,4 +51,17 @@ public interface StreamProxyMapper {
"LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " + "LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " +
"WHERE st.enable=${enable} and st.mediaServerId = '${id}' order by st.createTime desc") "WHERE st.enable=${enable} and st.mediaServerId = '${id}' order by st.createTime desc")
List<StreamProxyItem> selectForEnableInMediaServer(String id, boolean enable); List<StreamProxyItem> selectForEnableInMediaServer(String id, boolean enable);
@Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st " +
"LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " +
"WHERE st.mediaServerId = '${id}' order by st.createTime desc")
List<StreamProxyItem> selectInMediaServer(String id);
@Update("UPDATE stream_proxy " +
"SET enable=#{status} " +
"WHERE mediaServerId=#{mediaServerId}")
void updateStatus(boolean status, String mediaServerId);
@Delete("DELETE FROM stream_proxy WHERE mediaServerId=#{mediaServerId}")
void deleteAutoRemoveItemByMediaServerId(String mediaServerId);
} }

3
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java

@ -53,4 +53,7 @@ public interface StreamPushMapper {
@Delete("DELETE FROM stream_push") @Delete("DELETE FROM stream_push")
void clear(); void clear();
@Delete("DELETE FROM stream_push WHERE mediaServerId=#{mediaServerId}")
void deleteWithoutGBId(String mediaServerId);
} }

18
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@ -333,17 +333,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Override @Override
public void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, StreamInfo streamInfo) { public void addStream(MediaServerItem mediaServerItem, String type, String app, String streamId, StreamInfo streamInfo) {
String key = VideoManagerConstants.WVP_SERVER_STREAM_PUSH_PREFIX + userSetup.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId(); String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId();
redis.set(key, streamInfo); redis.set(key, streamInfo);
} }
@Override @Override
public void removeStream(MediaServerItem mediaServerItem, String type, String app, String streamId) { public void removeStream(MediaServerItem mediaServerItem, String type, String app, String streamId) {
String key = VideoManagerConstants.WVP_SERVER_STREAM_PUSH_PREFIX + userSetup.getServerId() + "_*_" + app + "_" + streamId + "_" + mediaServerItem.getId(); String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_" + app + "_" + streamId + "_" + mediaServerItem.getId();
List<Object> streams = redis.scan(key); redis.del(key);
for (Object stream : streams) {
redis.del((String) stream);
}
} }
@Override @Override
@ -359,4 +356,13 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
JSONObject jsonObject = (JSONObject)redis.get(key); JSONObject jsonObject = (JSONObject)redis.get(key);
return JSONObject.toJavaObject(jsonObject, ThirdPartyGB.class); return JSONObject.toJavaObject(jsonObject, ThirdPartyGB.class);
} }
@Override
public void removeStream(String mediaServerId, String type) {
String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetup.getServerId() + "_" + type + "_*_*_" + mediaServerId;
List<Object> streams = redis.scan(key);
for (Object stream : streams) {
redis.del((String) stream);
}
}
} }

1
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java

@ -738,4 +738,5 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
public StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId) { public StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId) {
return streamProxyMapper.selectOne(app, streamId); return streamProxyMapper.selectOne(app, streamId);
} }
} }

BIN
src/main/resources/wvp.sqlite

Binary file not shown.

1
src/test/java/com/genersoft/iot/vmp/service/impl/RoleServiceImplTest.java

@ -25,7 +25,6 @@ class RoleServiceImplTest {
void getAllUser() { void getAllUser() {
List<Role> all = roleService.getAll(); List<Role> all = roleService.getAll();
Role roleById = roleService.getRoleById(1); Role roleById = roleService.getRoleById(1);
System.out.println();
} }

2
src/test/java/com/genersoft/iot/vmp/service/impl/UserServiceImplTest.java

@ -27,10 +27,8 @@ class UserServiceImplTest {
@org.junit.jupiter.api.Test @org.junit.jupiter.api.Test
void getAllUser() { void getAllUser() {
List<User> allUsers = userService.getAllUsers(); List<User> allUsers = userService.getAllUsers();
System.out.println(userService.getAllUsers().size());
User admin = userService.getUser("admin", "21232f297a57a5a743894a0e4a801fc3"); User admin = userService.getUser("admin", "21232f297a57a5a743894a0e4a801fc3");
User admin1 = userService.getUserByUsername("admin"); User admin1 = userService.getUserByUsername("admin");
System.out.println(12);
} }

Loading…
Cancel
Save