diff --git a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java index c5e2a24a..e16c1add 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java @@ -30,7 +30,7 @@ public class StreamInfo { private String rtsps; private String rtc; private String mediaServerId; - private JSONArray tracks; + private Object tracks; public static class TransactionInfo{ public String callId; @@ -105,11 +105,11 @@ public class StreamInfo { this.rtsp = rtsp; } - public JSONArray getTracks() { + public Object getTracks() { return tracks; } - public void setTracks(JSONArray tracks) { + public void setTracks(Object tracks) { this.tracks = tracks; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index e35f0592..16802cbb 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -3,11 +3,13 @@ package com.genersoft.iot.vmp.media.zlm; import java.util.List; import java.util.UUID; +import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaService; @@ -258,12 +260,13 @@ public class ZLMHttpHookListener { */ @ResponseBody @PostMapping(value = "/on_stream_changed", produces = "application/json;charset=UTF-8") - public ResponseEntity onStreamChanged(@RequestBody JSONObject json){ + public ResponseEntity onStreamChanged(@RequestBody MediaItem item){ if (logger.isDebugEnabled()) { - logger.debug("ZLM HOOK on_stream_changed API调用,参数:" + json.toString()); + logger.debug("ZLM HOOK on_stream_changed API调用,参数:" + JSONObject.toJSONString(item)); } - String mediaServerId = json.getString("mediaServerId"); + String mediaServerId = item.getMediaServerId(); + JSONObject json = (JSONObject) JSON.toJSON(item); ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json); if (subscribe != null ) { MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); @@ -272,13 +275,12 @@ public class ZLMHttpHookListener { } } - // 流消失移除redis play - String app = json.getString("app"); - String streamId = json.getString("stream"); - String schema = json.getString("schema"); - JSONArray tracks = json.getJSONArray("tracks"); - boolean regist = json.getBoolean("regist"); + String app = item.getApp(); + String streamId = item.getStream(); + String schema = item.getSchema(); + List tracks = item.getTracks(); + boolean regist = item.isRegist(); if (tracks != null) { logger.info("[stream: " + streamId + "] on_stream_changed->>" + schema); } @@ -298,24 +300,34 @@ public class ZLMHttpHookListener { redisCatchStorage.stopPlayback(streamInfo); } }else { - if (!"rtp".equals(app) ){ - // 发送流变化redis消息 - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetup.getServerId()); - jsonObject.put("app", app); - jsonObject.put("stream", streamId); - jsonObject.put("register", regist); - jsonObject.put("mediaServerId", mediaServerId); - redisCatchStorage.sendStreamChangeMsg(jsonObject); + if (!"rtp".equals(app)){ + + boolean pushChange = false; MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); if (regist) { - zlmMediaListManager.addMedia(mediaServerItem, app, streamId); - StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks); - redisCatchStorage.addStream(mediaServerItem, app, streamId, streamInfo); + if ((item.getOriginType() == 1 || item.getOriginType() == 2 || item.getOriginType() == 8)) { + pushChange = true; + zlmMediaListManager.addMedia(item); + StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks); + redisCatchStorage.addPushStream(mediaServerItem, app, streamId, streamInfo); + } }else { - zlmMediaListManager.removeMedia( app, streamId); - redisCatchStorage.removeStream(mediaServerItem, app, streamId); + int result = zlmMediaListManager.removeMedia( app, streamId); + redisCatchStorage.removePushStream(mediaServerItem, app, streamId); + if (result > 0) { + pushChange = true; + } + } + if(pushChange) { + // 发送流变化redis消息 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetup.getServerId()); + jsonObject.put("app", app); + jsonObject.put("stream", streamId); + jsonObject.put("register", regist); + jsonObject.put("mediaServerId", mediaServerId); + redisCatchStorage.sendStreamChangeMsg(jsonObject); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index ea1123cb..49fe098e 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.media.zlm; import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; @@ -87,6 +88,10 @@ public class ZLMMediaListManager { updateMedia(mediaServerItem, app, streamId); } + public void addMedia(MediaItem mediaItem) { + storager.updateMedia(streamPushService.transform(mediaItem)); + } + public void updateMedia(MediaServerItem mediaServerItem, String app, String streamId) { //使用异步更新推流 @@ -113,14 +118,16 @@ public class ZLMMediaListManager { } - public void removeMedia(String app, String streamId) { + public int removeMedia(String app, String streamId) { // 查找是否关联了国标, 关联了不删除, 置为离线 StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(app, streamId); + int result = 0; if (streamProxyItem == null) { - storager.removeMedia(app, streamId); + result = storager.removeMedia(app, streamId); }else { - storager.mediaOutline(app, streamId); + result =storager.mediaOutline(app, streamId); } + return result; } // public void clearAllSessions() { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java index 6d9ceee9..4685d1fd 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java @@ -4,6 +4,11 @@ import java.util.List; public class MediaItem { + /** + * 注册/注销 + */ + private boolean regist; + /** * 应用名 */ @@ -53,6 +58,11 @@ public class MediaItem { */ private String originUrl; + /** + * 服务器id + */ + private String mediaServerId; + /** * GMT unix系统时间戳,单位秒 */ @@ -78,6 +88,14 @@ public class MediaItem { */ private String vhost; + public boolean isRegist() { + return regist; + } + + public void setRegist(boolean regist) { + this.regist = regist; + } + /** * 是否是docker部署, docker部署不会自动更新zlm使用的端口,需要自己手动修改 */ @@ -376,4 +394,12 @@ public class MediaItem { public void setDocker(boolean docker) { this.docker = docker; } + + public String getMediaServerId() { + return mediaServerId; + } + + public void setMediaServerId(String mediaServerId) { + this.mediaServerId = mediaServerId; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java index 40ba215f..38e44a98 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java @@ -17,6 +17,7 @@ public class StreamProxyItem extends GbStream { private boolean enable; private boolean enable_hls; private boolean enable_mp4; + private boolean enable_remove_none_reader; // 无人观看时删除 private String platformGbId; private String createTime; @@ -142,4 +143,12 @@ public class StreamProxyItem extends GbStream { public void setCreateTime(String createTime) { this.createTime = createTime; } + + public boolean isEnable_remove_none_reader() { + return enable_remove_none_reader; + } + + public void setEnable_remove_none_reader(boolean enable_remove_none_reader) { + this.enable_remove_none_reader = enable_remove_none_reader; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java index 54f8315e..8c05b85f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaService.java @@ -32,7 +32,7 @@ public interface IMediaService { * @param stream * @return */ - StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaServerItem, String app, String stream, JSONArray tracks); + StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaServerItem, String app, String stream, Object tracks); /** * 根据应用名和流ID获取播放地址, 只是地址拼接,返回的ip使用远程访问ip,适用与zlm与wvp在一台主机的情况 @@ -40,5 +40,5 @@ public interface IMediaService { * @param stream * @return */ - StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks, String addr); + StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index 899da98b..94e7d691 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service; import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.github.pagehelper.PageInfo; @@ -32,4 +33,6 @@ public interface IStreamPushService { * @return */ PageInfo getPushList(Integer page, Integer count); + + StreamPushItem transform(MediaItem item); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index a261d247..9e5221bc 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -31,7 +32,7 @@ public class MediaServiceImpl implements IMediaService { @Override - public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks) { + public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks) { return getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, null); } @@ -69,7 +70,7 @@ public class MediaServiceImpl implements IMediaService { } @Override - public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks, String addr) { + public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr) { StreamInfo streamInfoResult = new StreamInfo(); streamInfoResult.setStreamId(stream); streamInfoResult.setApp(app); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index aabf35f3..28207212 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -51,33 +51,38 @@ public class StreamPushServiceImpl implements IStreamPushService { for (MediaItem item : mediaItems) { // 不保存国标推理以及拉流代理的流 - if (item.getOriginType() == 3 || item.getOriginType() == 4 || item.getOriginType() == 5) { - continue; - } - String key = item.getApp() + "_" + item.getStream(); - StreamPushItem streamPushItem = result.get(key); - if (streamPushItem == null) { - streamPushItem = new StreamPushItem(); - streamPushItem.setApp(item.getApp()); - streamPushItem.setMediaServerId(mediaServerItem.getId()); - streamPushItem.setStream(item.getStream()); - streamPushItem.setAliveSecond(item.getAliveSecond()); - streamPushItem.setCreateStamp(item.getCreateStamp()); - streamPushItem.setOriginSock(item.getOriginSock()); - streamPushItem.setTotalReaderCount(item.getTotalReaderCount()); - streamPushItem.setOriginType(item.getOriginType()); - streamPushItem.setOriginTypeStr(item.getOriginTypeStr()); - streamPushItem.setOriginUrl(item.getOriginUrl()); - streamPushItem.setCreateStamp(item.getCreateStamp()); - streamPushItem.setAliveSecond(item.getAliveSecond()); - streamPushItem.setStatus(true); - streamPushItem.setVhost(item.getVhost()); - result.put(key, streamPushItem); + if (item.getOriginType() == 1 || item.getOriginType() == 2 || item.getOriginType() == 8) { + String key = item.getApp() + "_" + item.getStream(); + StreamPushItem streamPushItem = result.get(key); + if (streamPushItem == null) { + streamPushItem = transform(item); + result.put(key, streamPushItem); + } } + } return new ArrayList<>(result.values()); } + @Override + public StreamPushItem transform(MediaItem item) { + StreamPushItem streamPushItem = new StreamPushItem(); + streamPushItem.setApp(item.getApp()); + streamPushItem.setMediaServerId(item.getMediaServerId()); + streamPushItem.setStream(item.getStream()); + streamPushItem.setAliveSecond(item.getAliveSecond()); + streamPushItem.setCreateStamp(item.getCreateStamp()); + streamPushItem.setOriginSock(item.getOriginSock()); + streamPushItem.setTotalReaderCount(item.getTotalReaderCount()); + streamPushItem.setOriginType(item.getOriginType()); + streamPushItem.setOriginTypeStr(item.getOriginTypeStr()); + streamPushItem.setOriginUrl(item.getOriginUrl()); + streamPushItem.setCreateStamp(item.getCreateStamp()); + streamPushItem.setAliveSecond(item.getAliveSecond()); + streamPushItem.setStatus(true); + streamPushItem.setVhost(item.getVhost()); + return streamPushItem; + } @Override public PageInfo getPushList(Integer page, Integer count) { diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 69bcd84a..25743e13 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -135,7 +135,7 @@ public interface IRedisCatchStorage { * @param app * @param streamId */ - void addStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo); + void addPushStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo); /** * 移除流信息从redis @@ -143,5 +143,5 @@ public interface IRedisCatchStorage { * @param app * @param streamId */ - void removeStream(MediaServerItem mediaServerItem, String app, String streamId); + void removePushStream(MediaServerItem mediaServerItem, String app, String streamId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java index 931530ae..3f4b73fa 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java @@ -353,7 +353,7 @@ public interface IVideoManagerStorager { * @param app * @param stream */ - void removeMedia(String app, String stream); + int removeMedia(String app, String stream); /** @@ -366,7 +366,7 @@ public interface IVideoManagerStorager { * @param app * @param streamId */ - void mediaOutline(String app, String streamId); + int mediaOutline(String app, String streamId); /** * 设置平台在线/离线 diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java index 230afbc9..1fdc6d85 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java @@ -53,7 +53,7 @@ public interface GbStreamMapper { @Update("UPDATE gb_stream " + "SET status=${status} " + "WHERE app=#{app} AND stream=#{stream}") - void setStatus(String app, String stream, boolean status); + int setStatus(String app, String stream, boolean status); @Select("SELECT gs.*, pgs.platformId FROM gb_stream gs LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream WHERE mediaServerId=#{mediaServerId} ") List selectAllByMediaServerId(String mediaServerId); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 0d0ede7d..0d5b98d6 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -308,13 +308,13 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } @Override - public void addStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo) { + public void addPushStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo) { String key = VideoManagerConstants.WVP_SERVER_STREAM_PUSH_PREFIX + app + "_" + streamId + "_" + mediaServerItem.getId(); redis.set(key, streamInfo); } @Override - public void removeStream(MediaServerItem mediaServerItem, String app, String streamId) { + public void removePushStream(MediaServerItem mediaServerItem, String app, String streamId) { String key = VideoManagerConstants.WVP_SERVER_STREAM_PUSH_PREFIX + app + "_" + streamId + "_" + mediaServerItem.getId(); redis.del(key); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java index 0e24942d..37905eca 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java @@ -605,8 +605,8 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { } @Override - public void removeMedia(String app, String stream) { - streamPushMapper.del(app, stream); + public int removeMedia(String app, String stream) { + return streamPushMapper.del(app, stream); } @Override @@ -615,8 +615,8 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { } @Override - public void mediaOutline(String app, String streamId) { - gbStreamMapper.setStatus(app, streamId, false); + public int mediaOutline(String app, String streamId) { + return gbStreamMapper.setStatus(app, streamId, false); } @Override