diff --git a/sql/mysql.sql b/sql/mysql.sql index 65693750..26ea595b 100644 --- a/sql/mysql.sql +++ b/sql/mysql.sql @@ -444,6 +444,7 @@ CREATE TABLE `stream_proxy` ( `enable_hls` bit(1) DEFAULT NULL, `enable_mp4` bit(1) DEFAULT NULL, `enable` bit(1) NOT NULL, + `status` bit(1) NOT NULL, `enable_remove_none_reader` bit(1) NOT NULL, `createTime` varchar(50) NOT NULL, PRIMARY KEY (`app`,`stream`) diff --git a/sql/update.sql b/sql/update.sql new file mode 100644 index 00000000..d6386ea0 --- /dev/null +++ b/sql/update.sql @@ -0,0 +1 @@ +ALTER TABLE stream_proxy ADD status bit(1) not null; \ No newline at end of file diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java index d65e6305..ee237c0c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java @@ -225,6 +225,7 @@ public class SIPRequestHeaderProvider { public Request createInfoRequest(Device device, StreamInfo streamInfo, String content, Long cseq) throws PeerUnavailableException, ParseException, InvalidArgumentException { Request request = null; + if (streamInfo == null) return null; Dialog dialog = streamSession.getDialog(streamInfo.getDeviceID(), streamInfo.getChannelId()); SipURI requestLine = sipFactory.createAddressFactory().createSipURI(device.getDeviceId(), diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index cb4cbd74..29f8c058 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1589,6 +1589,9 @@ public class SIPCommander implements ISIPCommander { content.append("CSeq: " + cseq + "\r\n"); content.append("PauseTime: now\r\n"); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq); + if (request == null) { + return; + } logger.info(request.toString()); ClientTransaction clientTransaction = null; if ("TCP".equals(device.getTransport())) { @@ -1617,6 +1620,7 @@ public class SIPCommander implements ISIPCommander { content.append("CSeq: " + cseq + "\r\n"); content.append("Range: npt=now-\r\n"); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq); + if (request == null) return; logger.info(request.toString()); ClientTransaction clientTransaction = null; if ("TCP".equals(device.getTransport())) { @@ -1645,6 +1649,7 @@ public class SIPCommander implements ISIPCommander { content.append("Range: npt=" + Math.abs(seekTime) + "-\r\n"); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq); + if (request == null) return; logger.info(request.toString()); ClientTransaction clientTransaction = null; if ("TCP".equals(device.getTransport())) { @@ -1672,6 +1677,7 @@ public class SIPCommander implements ISIPCommander { content.append("CSeq: " + cseq + "\r\n"); content.append("Scale: " + String.format("%.1f",speed) + "\r\n"); Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq); + if (request == null) return; logger.info(request.toString()); ClientTransaction clientTransaction = null; if ("TCP".equals(device.getTransport())) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 5919619f..6e6b7b50 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -332,6 +332,11 @@ public class ZLMHttpHookListener { }else { mediaServerService.removeCount(mediaServerId); } + if (item.getOriginType() == OriginType.PULL.ordinal() + || item.getOriginType() == OriginType.FFMPEG_PULL.ordinal()) { + // 设置拉流代理上线/离线 + streamProxyService.updateStatus(regist, app, streamId); + } if ("rtp".equals(app) && !regist ) { StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); if (streamInfo!=null){ @@ -355,6 +360,7 @@ public class ZLMHttpHookListener { || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { streamPushItem = zlmMediaListManager.addPush(item); } + List gbStreams = new ArrayList<>(); if (streamPushItem == null || streamPushItem.getGbId() == null) { GbStream gbStream = storager.getGbStream(app, streamId); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java index 38e44a98..39685b0b 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java @@ -151,4 +151,5 @@ public class StreamProxyItem extends GbStream { public void setEnable_remove_none_reader(boolean enable_remove_none_reader) { this.enable_remove_none_reader = enable_remove_none_reader; } + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java index 40b2c9a6..ac100006 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamProxyService.java @@ -55,6 +55,16 @@ public interface IStreamProxyService { */ boolean start(String app, String stream); + /** + * 更新状态 + * @param status 状态 + * @param app + * @param stream + */ + int updateStatus(boolean status, String app, String stream); + + + /** * 停用用视频代理 * @param app diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index cd5f8ab0..ccb2520a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -14,8 +14,11 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.storager.dao.MediaServerMapper; import com.genersoft.iot.vmp.utils.redis.JedisUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil; @@ -70,6 +73,12 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR @Autowired private RedisUtil redisUtil; + @Autowired + private IVideoManagerStorager storager; + + @Autowired + private IStreamProxyService streamProxyService; + @Autowired private EventPublisher publisher; @@ -231,6 +240,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR public List getAllOnline() { String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId(); Set mediaServerIdSet = redisUtil.zRevRange(key, 0, -1); + List result = new ArrayList<>(); if (mediaServerIdSet != null && mediaServerIdSet.size() > 0) { for (String mediaServerId : mediaServerIdSet) { @@ -238,6 +248,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR result.add((MediaServerItem) redisUtil.get(serverKey)); } } + Collections.reverse(result); return result; } @@ -374,6 +385,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR resetOnlineServerItem(serverItem); updateMediaServerKeepalive(serverItem.getId(), null); setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable())); + publisher.zlmOnlineEventPublish(serverItem.getId()); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 13277c21..afac6eba 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -58,6 +58,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private IVideoManagerStorager storager; + @Autowired private UserSetup userSetup; @@ -278,7 +281,27 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Override public void zlmServerOnline(String mediaServerId) { - zlmServerOffline(mediaServerId); + // 移除开启了无人观看自动移除的流 + List streamProxyItemList = streamProxyMapper.selecAutoRemoveItemByMediaServerId(mediaServerId); + if (streamProxyItemList.size() > 0) { + gbStreamMapper.batchDel(streamProxyItemList); + } + streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId); + + // 恢复流代理, 只查找这个这个流媒体 + List streamProxyListForEnable = storager.getStreamProxyListForEnableInMediaServer( + mediaServerId, true, false); + for (StreamProxyItem streamProxyDto : streamProxyListForEnable) { + logger.info("恢复流代理," + streamProxyDto.getApp() + "/" + streamProxyDto.getStream()); + JSONObject jsonObject = addStreamProxyToZlm(streamProxyDto); + if (jsonObject == null) { + // 设置为离线 + logger.info("恢复流代理失败" + streamProxyDto.getApp() + "/" + streamProxyDto.getStream()); + updateStatus(false, streamProxyDto.getApp(), streamProxyDto.getStream()); + }else { + updateStatus(true, streamProxyDto.getApp(), streamProxyDto.getStream()); + } + } } @Override @@ -289,8 +312,8 @@ public class StreamProxyServiceImpl implements IStreamProxyService { gbStreamMapper.batchDel(streamProxyItemList); } streamProxyMapper.deleteAutoRemoveItemByMediaServerId(mediaServerId); - // 其他的流设置未启用 - streamProxyMapper.updateStatus(false, mediaServerId); + // 其他的流设置离线 + streamProxyMapper.updateStatusByMediaServerId(false, mediaServerId); String type = "PULL"; // 发送redis消息 @@ -314,4 +337,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService { public void clean() { } + + @Override + public int updateStatus(boolean status, String app, String stream) { + return streamProxyMapper.updateStatus(status, app, stream); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java index 038fe2b8..5e2745a6 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java @@ -398,10 +398,11 @@ public interface IVideoManagerStorager { /** * 根据媒体ID获取启用/不启用的代理列表 * @param id 媒体ID - * @param b 启用/不启用 + * @param enable 启用/不启用 + * @param status 状态 * @return */ - List getStreamProxyListForEnableInMediaServer(String id, boolean b); + List getStreamProxyListForEnableInMediaServer(String id, boolean enable, boolean status); /** * 根据通道ID获取其所在设备 diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java index b6e1ba1a..63cd425d 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java @@ -11,9 +11,9 @@ import java.util.List; public interface StreamProxyMapper { @Insert("INSERT INTO stream_proxy (type, app, stream,mediaServerId, url, src_url, dst_url, " + - "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_hls, enable_mp4, enable, enable_remove_none_reader, createTime) VALUES" + + "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_hls, enable_mp4, enable, status, enable_remove_none_reader, createTime) VALUES" + "('${type}','${app}', '${stream}', '${mediaServerId}','${url}', '${src_url}', '${dst_url}', " + - "'${timeout_ms}', '${ffmpeg_cmd_key}', '${rtp_type}', ${enable_hls}, ${enable_mp4}, ${enable}, " + + "'${timeout_ms}', '${ffmpeg_cmd_key}', '${rtp_type}', ${enable_hls}, ${enable_mp4}, ${enable}, ${status}, " + "${enable_remove_none_reader}, '${createTime}' )") int add(StreamProxyItem streamProxyDto); @@ -30,6 +30,7 @@ public interface StreamProxyMapper { "rtp_type=#{rtp_type}, " + "enable_hls=#{enable_hls}, " + "enable=#{enable}, " + + "status=#{status}, " + "enable_remove_none_reader=#{enable_remove_none_reader}, " + "enable_mp4=#{enable_mp4} " + "WHERE app=#{app} AND stream=#{stream}") @@ -49,8 +50,8 @@ public interface StreamProxyMapper { @Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st " + "LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " + - "WHERE st.enable=${enable} and st.mediaServerId = '${id}' order by st.createTime desc") - List selectForEnableInMediaServer(String id, boolean enable); + "WHERE st.enable=${enable} and st.status=${status} and st.mediaServerId = '${id}' order by st.createTime desc") + List selectForEnableInMediaServer(String id, boolean enable, boolean status); @Select("SELECT st.*, pgs.gbId, pgs.name, pgs.longitude, pgs.latitude FROM stream_proxy st " + "LEFT JOIN gb_stream pgs on st.app = pgs.app AND st.stream = pgs.stream " + @@ -58,9 +59,14 @@ public interface StreamProxyMapper { List selectInMediaServer(String id); @Update("UPDATE stream_proxy " + - "SET enable=#{status} " + + "SET status=#{status} " + "WHERE mediaServerId=#{mediaServerId}") - void updateStatus(boolean status, String mediaServerId); + void updateStatusByMediaServerId(boolean status, String mediaServerId); + + @Update("UPDATE stream_proxy " + + "SET status=${status} " + + "WHERE app=#{app} AND stream=#{stream}") + int updateStatus(boolean status, String app, String stream); @Delete("DELETE FROM stream_proxy WHERE enable_remove_none_reader=true AND mediaServerId=#{mediaServerId}") void deleteAutoRemoveItemByMediaServerId(String mediaServerId); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java index 816a749d..f43f92f6 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java @@ -860,8 +860,8 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { } @Override - public List getStreamProxyListForEnableInMediaServer(String id, boolean enable) { - return streamProxyMapper.selectForEnableInMediaServer(id, enable); + public List getStreamProxyListForEnableInMediaServer(String id, boolean enable, boolean status) { + return streamProxyMapper.selectForEnableInMediaServer(id, enable, status); } @@ -1021,7 +1021,8 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { deviceChannel.setParental(1); deviceChannel.setParentId(catalog.getParentId()); deviceChannel.setRegisterWay(1); - deviceChannel.setCivilCode(sipConfig.getDomain()); + // 行政区划应该是Domain的前八位 + deviceChannel.setCivilCode(sipConfig.getDomain().substring(0, sipConfig.getDomain().length() - 2)); deviceChannel.setModel("live"); deviceChannel.setOwner("wvp-pro"); deviceChannel.setSecrecy("0"); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/ptz/PtzController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/ptz/PtzController.java index 68acde33..95d1aefa 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/ptz/PtzController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/ptz/PtzController.java @@ -152,10 +152,10 @@ public class PtzController { msg.setData("获取设备预置位超时"); resultHolder.invokeResult(msg); }); - resultHolder.put(key, uuid, result); if (resultHolder.exist(key, null)) { return result; } + resultHolder.put(key, uuid, result); cmder.presetQuery(device, channelId, event -> { RequestMessage msg = new RequestMessage(); msg.setId(uuid); diff --git a/web_src/package-lock.json b/web_src/package-lock.json index 9a0ce7f6..dffa57e8 100644 --- a/web_src/package-lock.json +++ b/web_src/package-lock.json @@ -261,8 +261,8 @@ }, "async-validator": { "version": "1.8.5", - "resolved": "https://registry.nlark.com/async-validator/download/async-validator-1.8.5.tgz", - "integrity": "sha1-3D4I7B/Q3dtn5ghC8CwM0c7G1/A=", + "resolved": "https://registry.npmjs.org/async-validator/-/async-validator-1.8.5.tgz", + "integrity": "sha512-tXBM+1m056MAX0E8TL2iCjg8WvSyXu0Zc8LNtYqrVeyoL3+esHRZ4SieE9fKQyyU09uONjnMEjrNBMqT0mbvmA==", "requires": { "babel-runtime": "6.x" } @@ -3091,8 +3091,8 @@ }, "deepmerge": { "version": "1.5.2", - "resolved": "https://registry.npm.taobao.org/deepmerge/download/deepmerge-1.5.2.tgz?cache=0&sync_timestamp=1572279556265&other_urls=https%3A%2F%2Fregistry.npm.taobao.org%2Fdeepmerge%2Fdownload%2Fdeepmerge-1.5.2.tgz", - "integrity": "sha1-EEmdhohEza1P7ghC34x/bwyVp1M=" + "resolved": "https://registry.npmjs.org/deepmerge/-/deepmerge-1.5.2.tgz", + "integrity": "sha512-95k0GDqvBjZavkuvzx/YqVLv/6YYa17fz6ILMSf7neqQITCPbnfEnQvEgMPNjH4kgobe7+WIL0yJEHku+H3qtQ==" }, "define-properties": { "version": "1.1.3", @@ -3381,9 +3381,9 @@ "dev": true }, "element-ui": { - "version": "2.15.1", - "resolved": "https://registry.npm.taobao.org/element-ui/download/element-ui-2.15.1.tgz?cache=0&sync_timestamp=1614082623756&other_urls=https%3A%2F%2Fregistry.npm.taobao.org%2Felement-ui%2Fdownload%2Felement-ui-2.15.1.tgz", - "integrity": "sha1-raAKpuMsAndKLndWPdhGaPgTzf8=", + "version": "2.15.6", + "resolved": "https://registry.npmjs.org/element-ui/-/element-ui-2.15.6.tgz", + "integrity": "sha512-rcYXEKd/j2G0AgficAOk1Zd1AsnHRkhmrK4yLHmNOiimU2JfsywgfKUjMoFuT6pQx0luhovj8lFjpE4Fnt58Iw==", "requires": { "async-validator": "~1.8.1", "babel-helper-vue-jsx-merge-props": "^2.0.0", @@ -5990,7 +5990,7 @@ }, "normalize-wheel": { "version": "1.0.1", - "resolved": "https://registry.npm.taobao.org/normalize-wheel/download/normalize-wheel-1.0.1.tgz", + "resolved": "https://registry.npmjs.org/normalize-wheel/-/normalize-wheel-1.0.1.tgz", "integrity": "sha1-rsiGr/2wRQcNhWRH32Ls+GFG7EU=" }, "npm-run-path": { @@ -9380,8 +9380,8 @@ }, "resize-observer-polyfill": { "version": "1.5.1", - "resolved": "https://registry.nlark.com/resize-observer-polyfill/download/resize-observer-polyfill-1.5.1.tgz", - "integrity": "sha1-DpAg3T0hAkRY1OvSfiPkAmmBBGQ=" + "resolved": "https://registry.npmjs.org/resize-observer-polyfill/-/resize-observer-polyfill-1.5.1.tgz", + "integrity": "sha512-LwZrotdHOo12nQuZlHEmtuXdqGoOD0OhaxopaNFxWzInpEgaLWoVuAMbTzixuosCx2nEG58ngzW3vxdWoxIgdg==" }, "resolve": { "version": "1.17.0", @@ -10316,8 +10316,8 @@ }, "throttle-debounce": { "version": "1.1.0", - "resolved": "https://registry.npm.taobao.org/throttle-debounce/download/throttle-debounce-1.1.0.tgz", - "integrity": "sha1-UYU9o3vmihVctugns1FKPEIuic0=" + "resolved": "https://registry.npmjs.org/throttle-debounce/-/throttle-debounce-1.1.0.tgz", + "integrity": "sha512-XH8UiPCQcWNuk2LYePibW/4qL97+ZQ1AN3FNXwZRBNPPowo/NRU5fAlDCSNBJIYCKbioZfuYtMhG4quqoJhVzg==" }, "through2": { "version": "2.0.5", diff --git a/web_src/package.json b/web_src/package.json index 50252668..20fc3a2b 100644 --- a/web_src/package.json +++ b/web_src/package.json @@ -13,7 +13,7 @@ "axios": "^0.24.0", "core-js": "^2.6.5", "echarts": "^4.9.0", - "element-ui": "^2.15.1", + "element-ui": "^2.15.6", "fingerprintjs2": "^2.1.2", "moment": "^2.29.1", "postcss-pxtorem": "^5.1.1", diff --git a/web_src/src/components/StreamProxyList.vue b/web_src/src/components/StreamProxyList.vue index 64e994a8..9ecb9330 100644 --- a/web_src/src/components/StreamProxyList.vue +++ b/web_src/src/components/StreamProxyList.vue @@ -42,6 +42,14 @@ + + +