diff --git a/sql/mysql.sql b/sql/mysql.sql index 13e1ccdf..d45f6de5 100644 --- a/sql/mysql.sql +++ b/sql/mysql.sql @@ -415,6 +415,7 @@ DROP TABLE IF EXISTS `stream_proxy`; CREATE TABLE `stream_proxy` ( `id` int NOT NULL AUTO_INCREMENT, `type` varchar(50) COLLATE utf8mb4_general_ci NOT NULL, + `name` varchar(255) COLLATE utf8mb4_general_ci NOT NULL, `app` varchar(255) COLLATE utf8mb4_general_ci NOT NULL, `stream` varchar(255) COLLATE utf8mb4_general_ci NOT NULL, `url` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL, diff --git a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java index e16c1add..06263840 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java @@ -5,7 +5,7 @@ import com.alibaba.fastjson.JSONArray; public class StreamInfo { private String app; - private String streamId; + private String stream; private String deviceID; private String channelId; private String flv; @@ -153,12 +153,12 @@ public class StreamInfo { this.ws_ts = ws_ts; } - public String getStreamId() { - return streamId; + public String getStream() { + return stream; } - public void setStreamId(String streamId) { - this.streamId = streamId; + public void setStream(String stream) { + this.stream = stream; } public String getRtc() { diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 923e411e..0d638435 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -29,6 +29,7 @@ public class VideoManagerConstants { // 此处多了一个_,暂不修改 public static final String PLAYER_PREFIX = "VMP_PLAYER_"; public static final String PLAY_BLACK_PREFIX = "VMP_PLAYBACK_"; + public static final String PLAY_INFO_PREFIX = "VMP_PLAY_INFO_"; public static final String DOWNLOAD_PREFIX = "VMP_DOWNLOAD_"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SsrcTransaction.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SsrcTransaction.java index 9700f8a5..c2dedec1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SsrcTransaction.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SsrcTransaction.java @@ -4,11 +4,12 @@ public class SsrcTransaction { private String deviceId; private String channelId; - private String ssrc; - private String streamId; + private String callId; + private String stream; private byte[] transaction; private byte[] dialog; private String mediaServerId; + private String ssrc; public String getDeviceId() { return deviceId; @@ -26,20 +27,20 @@ public class SsrcTransaction { this.channelId = channelId; } - public String getSsrc() { - return ssrc; + public String getCallId() { + return callId; } - public void setSsrc(String ssrc) { - this.ssrc = ssrc; + public void setCallId(String callId) { + this.callId = callId; } - public String getStreamId() { - return streamId; + public String getStream() { + return stream; } - public void setStreamId(String streamId) { - this.streamId = streamId; + public void setStream(String stream) { + this.stream = stream; } public byte[] getTransaction() { @@ -65,4 +66,12 @@ public class SsrcTransaction { public void setMediaServerId(String mediaServerId) { this.mediaServerId = mediaServerId; } + + public String getSsrc() { + return ssrc; + } + + public void setSsrc(String ssrc) { + this.ssrc = ssrc; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java index a00ac630..49c52d5e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java @@ -23,24 +23,36 @@ public class SipSubscribe { private Map okSubscribes = new ConcurrentHashMap<>(); - private Map timeSubscribes = new ConcurrentHashMap<>(); + private Map okTimeSubscribes = new ConcurrentHashMap<>(); + private Map errorTimeSubscribes = new ConcurrentHashMap<>(); -// @Scheduled(cron="*/5 * * * * ?") //每五秒执行一次 + // @Scheduled(cron="*/5 * * * * ?") //每五秒执行一次 // @Scheduled(fixedRate= 100 * 60 * 60 ) - @Scheduled(cron="0 0 * * * ?") //每小时执行一次, 每个整点 + @Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次 public void execute(){ logger.info("[定时任务] 清理过期的订阅信息"); Calendar calendar = Calendar.getInstance(); calendar.setTime(new Date()); - calendar.set(Calendar.HOUR, calendar.get(Calendar.HOUR) - 1); - for (String key : timeSubscribes.keySet()) { - if (timeSubscribes.get(key).before(calendar.getTime())){ - logger.info("[定时任务] 清理过期的订阅信息: {}", key); - errorSubscribes.remove(key); + calendar.set(Calendar.MINUTE, calendar.get(Calendar.MINUTE) - 5); + + for (String key : okTimeSubscribes.keySet()) { + if (okTimeSubscribes.get(key).before(calendar.getTime())){ +// logger.info("[定时任务] 清理过期的订阅信息: {}", key); okSubscribes.remove(key); - timeSubscribes.remove(key); + okTimeSubscribes.remove(key); + } + } + for (String key : errorTimeSubscribes.keySet()) { + if (errorTimeSubscribes.get(key).before(calendar.getTime())){ +// logger.info("[定时任务] 清理过期的订阅信息: {}", key); + errorSubscribes.remove(key); + errorTimeSubscribes.remove(key); } } + logger.info("okTimeSubscribes.size:{}",okTimeSubscribes.size()); + logger.info("okSubscribes.size:{}",okSubscribes.size()); + logger.info("errorTimeSubscribes.size:{}",errorTimeSubscribes.size()); + logger.info("errorSubscribes.size:{}",errorSubscribes.size()); } public interface Event { @@ -105,12 +117,12 @@ public class SipSubscribe { public void addErrorSubscribe(String key, SipSubscribe.Event event) { errorSubscribes.put(key, event); - timeSubscribes.put(key, new Date()); + errorTimeSubscribes.put(key, new Date()); } public void addOkSubscribe(String key, SipSubscribe.Event event) { okSubscribes.put(key, event); - timeSubscribes.put(key, new Date()); + okTimeSubscribes.put(key, new Date()); } public SipSubscribe.Event getErrorSubscribe(String key) { @@ -119,7 +131,7 @@ public class SipSubscribe { public void removeErrorSubscribe(String key) { errorSubscribes.remove(key); - timeSubscribes.remove(key); + errorTimeSubscribes.remove(key); } public SipSubscribe.Event getOkSubscribe(String key) { @@ -128,7 +140,7 @@ public class SipSubscribe { public void removeOkSubscribe(String key) { okSubscribes.remove(key); - timeSubscribes.remove(key); + okTimeSubscribes.remove(key); } public int getErrorSubscribesSize(){ return errorSubscribes.size(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java index d41b04da..3e9f28aa 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java @@ -14,6 +14,7 @@ import com.genersoft.iot.vmp.utils.redis.RedisUtil; import gov.nist.javax.sip.stack.SIPDialog; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; /** * @description:视频流session管理器,管理视频预览、预览回放的通信句柄 @@ -29,39 +30,55 @@ public class VideoStreamSessionManager { @Autowired private UserSetup userSetup; - public void put(String deviceId, String channelId ,String ssrc, String streamId, String mediaServerId, ClientTransaction transaction){ + /** + * 添加一个点播/回放的事务信息 + * 后续可以通过流Id/callID + * @param deviceId 设备ID + * @param channelId 通道ID + * @param callId 一次请求的CallID + * @param stream 流名称 + * @param mediaServerId 所使用的流媒体ID + * @param transaction 事务 + */ + public void put(String deviceId, String channelId, String callId, String stream, String ssrc, String mediaServerId, ClientTransaction transaction){ SsrcTransaction ssrcTransaction = new SsrcTransaction(); ssrcTransaction.setDeviceId(deviceId); ssrcTransaction.setChannelId(channelId); - ssrcTransaction.setStreamId(streamId); + ssrcTransaction.setStream(stream); byte[] transactionByteArray = SerializeUtils.serialize(transaction); ssrcTransaction.setTransaction(transactionByteArray); + ssrcTransaction.setCallId(callId); ssrcTransaction.setSsrc(ssrc); ssrcTransaction.setMediaServerId(mediaServerId); - redisUtil.set(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetup.getServerId() + "_" + deviceId + "_" + channelId, ssrcTransaction); + redisUtil.set(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetup.getServerId() + + "_" + deviceId + "_" + channelId + "_" + callId + "_" + stream, ssrcTransaction); + redisUtil.set(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetup.getServerId() + + "_" + deviceId + "_" + channelId + "_" + callId + "_" + stream, ssrcTransaction); } - public void put(String deviceId, String channelId , Dialog dialog){ - SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId); + public void put(String deviceId, String channelId, String callId, Dialog dialog){ + SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, callId, null); if (ssrcTransaction != null) { byte[] dialogByteArray = SerializeUtils.serialize(dialog); ssrcTransaction.setDialog(dialogByteArray); } - redisUtil.set(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetup.getServerId() + "_" + deviceId + "_" + channelId, ssrcTransaction); + redisUtil.set(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetup.getServerId() + + "_" + deviceId + "_" + channelId + "_" + ssrcTransaction.getCallId() + "_" + + ssrcTransaction.getStream(), ssrcTransaction); } - public ClientTransaction getTransaction(String deviceId, String channelId){ - SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId); + public ClientTransaction getTransactionByStream(String deviceId, String channelId, String stream){ + SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, null, stream); if (ssrcTransaction == null) return null; byte[] transactionByteArray = ssrcTransaction.getTransaction(); ClientTransaction clientTransaction = (ClientTransaction)SerializeUtils.deSerialize(transactionByteArray); return clientTransaction; } - public SIPDialog getDialog(String deviceId, String channelId){ - SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId); + public SIPDialog getDialogByStream(String deviceId, String channelId, String stream){ + SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, null, stream); if (ssrcTransaction == null) return null; byte[] dialogByteArray = ssrcTransaction.getDialog(); if (dialogByteArray == null) return null; @@ -69,36 +86,37 @@ public class VideoStreamSessionManager { return dialog; } - public SsrcTransaction getSsrcTransaction(String deviceId, String channelId){ - SsrcTransaction ssrcTransaction = (SsrcTransaction)redisUtil.get(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetup.getServerId() + "_" + deviceId + "_" + channelId); - return ssrcTransaction; + public SsrcTransaction getSsrcTransaction(String deviceId, String channelId, String callId, String stream){ + if (StringUtils.isEmpty(callId)) callId ="*"; + if (StringUtils.isEmpty(stream)) stream ="*"; + String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetup.getServerId() + "_" + deviceId + "_" + channelId + "_" + callId+ "_" + stream; + List scanResult = redisUtil.scan(key); + if (scanResult.size() == 0) return null; + return (SsrcTransaction)redisUtil.get((String) scanResult.get(0)); } - public String getStreamId(String deviceId, String channelId){ - SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId); - if (ssrcTransaction == null) return null; - return ssrcTransaction.getStreamId(); - } - public String getMediaServerId(String deviceId, String channelId){ - SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId); + public String getMediaServerId(String deviceId, String channelId, String stream){ + SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, null, stream); if (ssrcTransaction == null) return null; return ssrcTransaction.getMediaServerId(); } - public String getSSRC(String deviceId, String channelId){ - SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId); + public String getSSRC(String deviceId, String channelId, String stream){ + SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, null, stream); if (ssrcTransaction == null) return null; return ssrcTransaction.getSsrc(); } - public void remove(String deviceId, String channelId) { - SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId); + public void remove(String deviceId, String channelId, String stream) { + SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, null, stream); if (ssrcTransaction == null) return; - redisUtil.del(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetup.getServerId() + "_" + deviceId + "_" + channelId); + redisUtil.del(VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetup.getServerId() + "_" + + deviceId + "_" + channelId + "_" + ssrcTransaction.getCallId() + "_" + ssrcTransaction.getStream()); } + public List getAllSsrc() { - List ssrcTransactionKeys = redisUtil.scan(String.format("%s_*_*", VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX+ userSetup.getServerId() + "_" )); + List ssrcTransactionKeys = redisUtil.scan(String.format("%s_*_*_*_*", VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX+ userSetup.getServerId() + "_" )); List result= new ArrayList<>(); for (int i = 0; i < ssrcTransactionKeys.size(); i++) { String key = (String)ssrcTransactionKeys.get(i); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index af2c2c2c..7b3f0b21 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -119,8 +119,8 @@ public interface ISIPCommander { /** * 视频流停止 */ - void streamByeCmd(String deviceId, String channelId, SipSubscribe.Event okEvent); - void streamByeCmd(String deviceId, String channelId); + void streamByeCmd(String deviceId, String channelId, String ssrc, SipSubscribe.Event okEvent); + void streamByeCmd(String deviceId, String channelId, String ssrc); /** * 回放暂停 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index e41db8b6..cd2d6274 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -18,7 +18,7 @@ public interface ISIPCommanderForPlatform { * @return */ boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent); - boolean register(ParentPlatform parentPlatform, String callId, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent); + boolean register(ParentPlatform parentPlatform, String callId, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain); /** * 向上级平台注销 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java index 947dc948..c0e72817 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java @@ -128,7 +128,15 @@ public class SIPRequestHeaderPlarformProvider { Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), fromTag, viaTag, callIdHeader); - + SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort()); + if (www == null) { + AuthorizationHeader authorizationHeader = sipFactory.createHeaderFactory().createAuthorizationHeader("Digest"); + authorizationHeader.setUsername(parentPlatform.getDeviceGBId()); + authorizationHeader.setURI(requestURI); + authorizationHeader.setAlgorithm("MD5"); + registerRequest.addHeader(authorizationHeader); + return registerRequest; + } String realm = www.getRealm(); String nonce = www.getNonce(); String scheme = www.getScheme(); @@ -139,7 +147,6 @@ public class SIPRequestHeaderPlarformProvider { callIdHeader.setCallId(callId); - SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort()); String cNonce = null; String nc = "00000001"; if (qop != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java index 2da4c466..f4a0ec40 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java @@ -226,7 +226,7 @@ public class SIPRequestHeaderProvider { throws PeerUnavailableException, ParseException, InvalidArgumentException { Request request = null; if (streamInfo == null) return null; - Dialog dialog = streamSession.getDialog(streamInfo.getDeviceID(), streamInfo.getChannelId()); + Dialog dialog = streamSession.getDialogByStream(streamInfo.getDeviceID(), streamInfo.getChannelId(), streamInfo.getStream()); SipURI requestLine = sipFactory.createAddressFactory().createSipURI(device.getDeviceId(), device.getHostAddress()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index da664dd9..ff871e3c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -331,7 +331,7 @@ public class SIPCommander implements ISIPCommander { */ @Override public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) { - String streamId = ssrcInfo.getStreamId(); + String streamId = ssrcInfo.getStream(); try { if (device == null) return; String streamMode = device.getStreamMode().toUpperCase(); @@ -407,6 +407,8 @@ public class SIPCommander implements ISIPCommander { } content.append("y="+ssrcInfo.getSsrc()+"\r\n");//ssrc + // f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率 +// content.append("f=v/2/5/25/1/4000a/1/8/1" + "\r\n"); // 未发现支持此特性的设备 String tm = Long.toString(System.currentTimeMillis()); @@ -415,14 +417,14 @@ public class SIPCommander implements ISIPCommander { Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), null, "FromInvt" + tm, null, ssrcInfo.getSsrc(), callIdHeader); - String finalStreamId = streamId; transmitRequest(device, request, (e -> { - streamSession.remove(device.getDeviceId(), channelId); + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc()); errorEvent.response(e); }), e ->{ - streamSession.put(device.getDeviceId(), channelId ,ssrcInfo.getSsrc(), finalStreamId, mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction()); - streamSession.put(device.getDeviceId(), channelId , e.dialog); + // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 + streamSession.put(device.getDeviceId(), channelId ,"play", streamId, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction()); + streamSession.put(device.getDeviceId(), channelId ,"play", e.dialog); }); @@ -444,12 +446,12 @@ public class SIPCommander implements ISIPCommander { , SipSubscribe.Event errorEvent) { try { - logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStreamId(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); + logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); // 添加订阅 JSONObject subscribeKey = new JSONObject(); subscribeKey.put("app", "rtp"); - subscribeKey.put("stream", ssrcInfo.getStreamId()); + subscribeKey.put("stream", ssrcInfo.getStream()); subscribeKey.put("regist", true); subscribeKey.put("mediaServerId", mediaServerItem.getId()); logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); @@ -530,8 +532,8 @@ public class SIPCommander implements ISIPCommander { transmitRequest(device, request, errorEvent, okEvent -> { ResponseEvent responseEvent = (ResponseEvent) okEvent.event; - streamSession.put(device.getDeviceId(), channelId, ssrcInfo.getSsrc(), ssrcInfo.getStreamId(), mediaServerItem.getId(), responseEvent.getClientTransaction()); - streamSession.put(device.getDeviceId(), channelId, okEvent.dialog); + streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), responseEvent.getClientTransaction()); + streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), okEvent.dialog); }); } catch ( SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); @@ -551,12 +553,12 @@ public class SIPCommander implements ISIPCommander { public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, String startTime, String endTime, String downloadSpeed, ZLMHttpHookSubscribe.Event event , SipSubscribe.Event errorEvent) { try { - logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStreamId(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); + logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); // 添加订阅 JSONObject subscribeKey = new JSONObject(); subscribeKey.put("app", "rtp"); - subscribeKey.put("stream", ssrcInfo.getStreamId()); + subscribeKey.put("stream", ssrcInfo.getStream()); subscribeKey.put("regist", true); subscribeKey.put("mediaServerId", mediaServerItem.getId()); logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); @@ -637,7 +639,8 @@ public class SIPCommander implements ISIPCommander { Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc()); ClientTransaction transaction = transmitRequest(device, request, errorEvent); - streamSession.put(device.getDeviceId(), channelId, ssrcInfo.getSsrc(), ssrcInfo.getStreamId(), mediaServerItem.getId(), transaction); + streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), transaction); + streamSession.put(device.getDeviceId(), channelId, callIdHeader.getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), transaction); } catch ( SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); @@ -648,17 +651,17 @@ public class SIPCommander implements ISIPCommander { * 视频流停止, 不使用回调 */ @Override - public void streamByeCmd(String deviceId, String channelId) { - streamByeCmd(deviceId, channelId, null); + public void streamByeCmd(String deviceId, String channelId, String stream) { + streamByeCmd(deviceId, channelId, stream, null); } /** * 视频流停止 */ @Override - public void streamByeCmd(String deviceId, String channelId, SipSubscribe.Event okEvent) { + public void streamByeCmd(String deviceId, String channelId, String stream, SipSubscribe.Event okEvent) { try { - ClientTransaction transaction = streamSession.getTransaction(deviceId, channelId); + ClientTransaction transaction = streamSession.getTransactionByStream(deviceId, channelId, stream); if (transaction == null) { logger.warn("[ {} -> {}]停止视频流的时候发现事务已丢失", deviceId, channelId); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult<>(); @@ -667,7 +670,7 @@ public class SIPCommander implements ISIPCommander { } return; } - SIPDialog dialog = streamSession.getDialog(deviceId, channelId); + SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, stream); if (dialog == null) { logger.warn("[ {} -> {}]停止视频流的时候发现对话已丢失", deviceId, channelId); return; @@ -711,11 +714,11 @@ public class SIPCommander implements ISIPCommander { dialog.sendRequest(clientTransaction); - SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId); + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, callIdHeader.getCallId(), null); if (ssrcTransaction != null) { MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId()); mediaServerService.releaseSsrc(mediaServerItem, ssrcTransaction.getSsrc()); - streamSession.remove(deviceId, channelId); + streamSession.remove(deviceId, channelId, ssrcTransaction.getStream()); } } catch (SipException | ParseException e) { e.printStackTrace(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 1bf06010..b57a5e44 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -52,7 +52,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @Override public boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) { - return register(parentPlatform, null, null, errorEvent, okEvent); + return register(parentPlatform, null, null, errorEvent, okEvent, false); } @Override @@ -64,15 +64,16 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); } - return register(parentPlatform, null, null, errorEvent, okEvent); + return register(parentPlatform, null, null, errorEvent, okEvent, false); } @Override - public boolean register(ParentPlatform parentPlatform, @Nullable String callId, @Nullable WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) { + public boolean register(ParentPlatform parentPlatform, @Nullable String callId, @Nullable WWWAuthenticateHeader www, + SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain) { try { Request request = null; String tm = Long.toString(System.currentTimeMillis()); - if (www == null ) { + if (!registerAgain ) { // //callid CallIdHeader callIdHeader = null; if(parentPlatform.getTransport().equals("TCP")) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 1e99c0b9..cb03d4cf 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -87,7 +87,11 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In if (streamInfo == null) { streamInfo = new StreamInfo(); streamInfo.setApp(sendRtpItem.getApp()); - streamInfo.setStreamId(sendRtpItem.getStreamId()); + streamInfo.setStream(sendRtpItem.getStreamId()); + }else { + streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); + sendRtpItem.setStreamId(streamInfo.getStream()); + streamInfo.setApp("rtp"); } redisCatchStorage.updateSendRTPSever(sendRtpItem); logger.info(platformGbId); @@ -95,7 +99,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In Map param = new HashMap<>(); param.put("vhost","__defaultVhost__"); param.put("app",streamInfo.getApp()); - param.put("stream",streamInfo.getStreamId()); + param.put("stream",streamInfo.getStream()); param.put("ssrc", sendRtpItem.getSsrc()); param.put("dst_url",sendRtpItem.getIp()); param.put("dst_port", sendRtpItem.getPort()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 9b6e7276..60ea11bb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -111,7 +111,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In } storager.stopPlay(device.getDeviceId(), channelId); - mediaServerService.closeRTPServer(device, channelId); + mediaServerService.closeRTPServer(device, channelId, streamInfo.getStream()); } } } catch (SipException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index 228ce10f..edd9fe86 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -68,6 +68,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp } if (device.getPort() != rPort) { device.setPort(rPort); + device.setHostAddress(received.concat(":").concat(String.valueOf(rPort))); videoManagerStorager.updateDevice(device); redisCatchStorage.updateDevice(device); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java index c61b7279..3c83ec29 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java @@ -62,7 +62,7 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(device.getDeviceId(), "*"); if (streamInfo != null) { redisCatchStorage.stopPlayback(streamInfo); - cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId()); + cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId(), streamInfo.getStream()); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java index a5dced37..b6040aad 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java @@ -78,7 +78,7 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { if (response.getStatusCode() == 401) { WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME); - sipCommanderForPlatform.register(parentPlatform, callId, www, null, null); + sipCommanderForPlatform.register(parentPlatform, callId, www, null, null, true); }else if (response.getStatusCode() == 200){ // 注册/注销成功 logger.info(String.format("%s %s成功", platformGBId, action)); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 6664fe83..14705bc0 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -360,6 +360,7 @@ public class ZLMHttpHookListener { StreamPushItem streamPushItem = null; StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks); item.setStreamInfo(streamInfoByAppAndStream); + redisCatchStorage.addStream(mediaServerItem, type, app, streamId, item); if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() @@ -438,14 +439,16 @@ public class ZLMHttpHookListener { if (redisCatchStorage.isChannelSendingRTP(streamInfoForPlayCatch.getChannelId())) { ret.put("close", false); } else { - cmder.streamByeCmd(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId()); + cmder.streamByeCmd(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId(), + streamInfoForPlayCatch.getStream()); redisCatchStorage.stopPlay(streamInfoForPlayCatch); storager.stopPlay(streamInfoForPlayCatch.getDeviceID(), streamInfoForPlayCatch.getChannelId()); } }else{ StreamInfo streamInfoForPlayBackCatch = redisCatchStorage.queryPlaybackByStreamId(streamId); if (streamInfoForPlayBackCatch != null) { - cmder.streamByeCmd(streamInfoForPlayBackCatch.getDeviceID(), streamInfoForPlayBackCatch.getChannelId()); + cmder.streamByeCmd(streamInfoForPlayBackCatch.getDeviceID(), + streamInfoForPlayBackCatch.getChannelId(), streamInfoForPlayBackCatch.getStream()); redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch); }else { StreamInfo streamInfoForDownload = redisCatchStorage.queryDownloadByStreamId(streamId); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java index ec2714fd..3a4c3ee2 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java @@ -46,7 +46,7 @@ public interface IMediaServerService { SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, boolean isPlayback); - void closeRTPServer(Device device, String channelId); + void closeRTPServer(Device device, String channelId, String ssrc); void clearRTPServer(MediaServerItem mediaServerItem); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 8a7437cd..12bb8fa9 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -5,14 +5,16 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.bean.PlayBackCallback; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; +import org.springframework.http.ResponseEntity; +import org.springframework.web.context.request.async.DeferredResult; /** * 点播处理 */ public interface IPlayService { - void onPublishHandlerForPlayBack(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid); void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid); PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent); @@ -20,4 +22,6 @@ public interface IPlayService { MediaServerItem getNewMediaServerItem(Device device); void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String toString); + + DeferredResult> playBack(String deviceId, String channelId, String startTime, String endTime, PlayBackCallback errorCallBack); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackCallback.java b/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackCallback.java new file mode 100644 index 00000000..089523f9 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackCallback.java @@ -0,0 +1,9 @@ +package com.genersoft.iot.vmp.service.bean; + +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; + +public interface PlayBackCallback { + + void call(RequestMessage msg); + +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java b/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java index faab1c80..1723bc59 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java @@ -4,12 +4,12 @@ public class SSRCInfo { private int port; private String ssrc; - private String StreamId; + private String Stream; - public SSRCInfo(int port, String ssrc, String streamId) { + public SSRCInfo(int port, String ssrc, String stream) { this.port = port; this.ssrc = ssrc; - StreamId = streamId; + Stream = stream; } public int getPort() { @@ -28,11 +28,11 @@ public class SSRCInfo { this.ssrc = ssrc; } - public String getStreamId() { - return StreamId; + public String getStream() { + return Stream; } - public void setStreamId(String streamId) { - StreamId = streamId; + public void setStream(String stream) { + Stream = stream; } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 159cd055..6a5642e1 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -162,15 +162,16 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR } @Override - public void closeRTPServer(Device device, String channelId) { - String mediaServerId = streamSession.getMediaServerId(device.getDeviceId(), channelId); + public void closeRTPServer(Device device, String channelId, String stream) { + String mediaServerId = streamSession.getMediaServerId(device.getDeviceId(), channelId, stream); + String ssrc = streamSession.getSSRC(device.getDeviceId(), channelId, stream); MediaServerItem mediaServerItem = this.getOne(mediaServerId); if (mediaServerItem != null) { String streamId = String.format("%s_%s", device.getDeviceId(), channelId); zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId); - releaseSsrc(mediaServerItem, streamSession.getSSRC(device.getDeviceId(), channelId)); + releaseSsrc(mediaServerItem, ssrc); } - streamSession.remove(device.getDeviceId(), channelId); + streamSession.remove(device.getDeviceId(), channelId, stream); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index 6e3debaa..3c776b49 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -74,7 +74,7 @@ public class MediaServiceImpl implements IMediaService { @Override public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr) { StreamInfo streamInfoResult = new StreamInfo(); - streamInfoResult.setStreamId(stream); + streamInfoResult.setStream(stream); streamInfoResult.setApp(app); if (addr == null) { addr = mediaInfo.getStreamIp(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index d3f6976c..de665082 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -16,6 +16,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.bean.PlayBackCallback; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; @@ -104,19 +105,21 @@ public class PlayServiceImpl implements IPlayService { logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId)); WVPResult wvpResult = new WVPResult(); wvpResult.setCode(-1); - SIPDialog dialog = streamSession.getDialog(deviceId, channelId); + SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, streamInfo.getStream()); if (dialog != null) { wvpResult.setMsg("收流超时,请稍候重试"); }else { wvpResult.setMsg("点播超时,请稍候重试"); } + msg.setData(wvpResult); // 点播超时回复BYE - cmder.streamByeCmd(device.getDeviceId(), channelId); + cmder.streamByeCmd(device.getDeviceId(), channelId, streamInfo.getStream()); // 释放rtpserver - mediaServerService.closeRTPServer(playResult.getDevice(), channelId); + mediaServerService.closeRTPServer(playResult.getDevice(), channelId, streamInfo.getStream()); // 回复之前所有的点播请求 resultHolder.invokeAllResult(msg); + // TODO 释放ssrc }); result.onCompletion(()->{ // 点播结束时调用截图接口 @@ -153,14 +156,12 @@ public class PlayServiceImpl implements IPlayService { } }); if (streamInfo == null) { - SSRCInfo ssrcInfo; String streamId = null; if (mediaServerItem.isRtpEnable()) { streamId = String.format("%s_%s", device.getDeviceId(), channelId); } - ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId); - + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId); // 发送点播消息 cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); @@ -172,7 +173,7 @@ public class PlayServiceImpl implements IPlayService { WVPResult wvpResult = new WVPResult(); wvpResult.setCode(-1); // 点播返回sip错误 - mediaServerService.closeRTPServer(playResult.getDevice(), channelId); + mediaServerService.closeRTPServer(playResult.getDevice(), channelId, ssrcInfo.getStream()); wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg)); msg.setData(wvpResult); resultHolder.invokeAllResult(msg); @@ -183,7 +184,7 @@ public class PlayServiceImpl implements IPlayService { }); } else { - String streamId = streamInfo.getStreamId(); + String streamId = streamInfo.getStream(); if (streamId == null) { WVPResult wvpResult = new WVPResult(); wvpResult.setCode(-1); @@ -212,18 +213,16 @@ public class PlayServiceImpl implements IPlayService { // TODO 点播前是否重置状态 redisCatchStorage.stopPlay(streamInfo); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); - SSRCInfo ssrcInfo; String streamId2 = null; if (mediaServerItem.isRtpEnable()) { streamId2 = String.format("%s_%s", device.getDeviceId(), channelId); } - ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId2); - + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId2); cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid); }, (event) -> { - mediaServerService.closeRTPServer(playResult.getDevice(), channelId); + mediaServerService.closeRTPServer(playResult.getDevice(), channelId, ssrcInfo.getStream()); WVPResult wvpResult = new WVPResult(); wvpResult.setCode(-1); wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg)); @@ -241,12 +240,12 @@ public class PlayServiceImpl implements IPlayService { RequestMessage msg = new RequestMessage(); msg.setId(uuid); msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); - StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId, uuid); + StreamInfo streamInfo = onPublishHandler(mediaServerItem, resonse, deviceId, channelId); if (streamInfo != null) { DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { - deviceChannel.setStreamId(streamInfo.getStreamId()); - storager.startPlay(deviceId, channelId, streamInfo.getStreamId()); + deviceChannel.setStreamId(streamInfo.getStream()); + storager.startPlay(deviceId, channelId, streamInfo.getStream()); } redisCatchStorage.startPlay(streamInfo); msg.setData(JSON.toJSONString(streamInfo)); @@ -283,29 +282,53 @@ public class PlayServiceImpl implements IPlayService { @Override - public void onPublishHandlerForPlayBack(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) { + public DeferredResult> playBack(String deviceId, String channelId, String startTime, String endTime, PlayBackCallback callback) { + String uuid = UUID.randomUUID().toString(); + String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId; + DeferredResult> result = new DeferredResult<>(30000L); + Device device = storager.queryVideoDevice(deviceId); + if (device == null) { + result.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); + return result; + } + + MediaServerItem newMediaServerItem = getNewMediaServerItem(device); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true); + resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId, uuid, result); RequestMessage msg = new RequestMessage(); - msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId); msg.setId(uuid); - StreamInfo streamInfo = onPublishHandler(mediaServerItem, resonse, deviceId, channelId, uuid); - if (streamInfo != null) { + msg.setKey(key); + result.onTimeout(()->{ + msg.setData("回放超时"); + callback.call(msg); + }); + cmder.playbackStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, (MediaServerItem mediaServerItem, JSONObject response) -> { + logger.info("收到订阅消息: " + response.toJSONString()); + StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); + if (streamInfo == null) { + logger.warn("设备回放API调用失败!"); + msg.setData("设备回放API调用失败!"); + callback.call(msg); + return; + } redisCatchStorage.startPlayback(streamInfo); msg.setData(JSON.toJSONString(streamInfo)); - resultHolder.invokeResult(msg); - } else { - logger.warn("设备回放API调用失败!"); - msg.setData("设备回放API调用失败!"); - resultHolder.invokeResult(msg); - } + callback.call(msg); + }, event -> { + msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg)); + callback.call(msg); + }); + return result; } + @Override public void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) { RequestMessage msg = new RequestMessage(); msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId); msg.setId(uuid); - StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId, uuid); + StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); if (streamInfo != null) { redisCatchStorage.startDownload(streamInfo); msg.setData(JSON.toJSONString(streamInfo)); @@ -318,7 +341,7 @@ public class PlayServiceImpl implements IPlayService { } - public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) { + public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) { String streamId = resonse.getString("stream"); JSONArray tracks = resonse.getJSONArray("tracks"); StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem,"rtp", streamId, tracks); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index afac6eba..564deb55 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -132,7 +132,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { }else { streamLive = true; StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream( - mediaInfo, param.getApp(), param.getStream(), null); + mediaInfo, param.getApp(), param.getStream(), null, null); wvpResult.setData(streamInfo); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index edd6cbc0..1a939020 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.ThirdPartyGB; import java.util.List; @@ -220,4 +221,5 @@ public interface IRedisCatchStorage { void addMemInfo(double memInfo); void addNetInfo(Map networkInterfaces); + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java index 63cd425d..2447ad98 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamProxyMapper.java @@ -10,15 +10,16 @@ import java.util.List; @Repository public interface StreamProxyMapper { - @Insert("INSERT INTO stream_proxy (type, app, stream,mediaServerId, url, src_url, dst_url, " + + @Insert("INSERT INTO stream_proxy (type, name, app, stream,mediaServerId, url, src_url, dst_url, " + "timeout_ms, ffmpeg_cmd_key, rtp_type, enable_hls, enable_mp4, enable, status, enable_remove_none_reader, createTime) VALUES" + - "('${type}','${app}', '${stream}', '${mediaServerId}','${url}', '${src_url}', '${dst_url}', " + + "('${type}','${name}', '${app}', '${stream}', '${mediaServerId}','${url}', '${src_url}', '${dst_url}', " + "'${timeout_ms}', '${ffmpeg_cmd_key}', '${rtp_type}', ${enable_hls}, ${enable_mp4}, ${enable}, ${status}, " + "${enable_remove_none_reader}, '${createTime}' )") int add(StreamProxyItem streamProxyDto); @Update("UPDATE stream_proxy " + "SET type=#{type}, " + + "name=#{name}," + "app=#{app}," + "stream=#{stream}," + "url=#{url}, " + diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 1baefbe8..b5a3aba0 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.ThirdPartyGB; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; @@ -91,7 +92,8 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { */ @Override public boolean startPlay(StreamInfo stream) { - return redis.set(String.format("%S_%S_%s_%s_%s", VideoManagerConstants.PLAYER_PREFIX, userSetup.getServerId(), stream.getStreamId(),stream.getDeviceID(), stream.getChannelId()), + return redis.set(String.format("%S_%S_%s_%s_%s", VideoManagerConstants.PLAYER_PREFIX, userSetup.getServerId(), + stream.getStream(), stream.getDeviceID(), stream.getChannelId()), stream); } @@ -105,7 +107,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { if (streamInfo == null) return false; return redis.del(String.format("%S_%s_%s_%s_%s", VideoManagerConstants.PLAYER_PREFIX, userSetup.getServerId(), - streamInfo.getStreamId(), + streamInfo.getStream(), streamInfo.getDeviceID(), streamInfo.getChannelId())); } @@ -119,7 +121,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { return (StreamInfo)redis.get(String.format("%S_%s_%s_%s_%s", VideoManagerConstants.PLAYER_PREFIX, userSetup.getServerId(), - streamInfo.getStreamId(), + streamInfo.getStream(), streamInfo.getDeviceID(), streamInfo.getChannelId())); } @@ -164,14 +166,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public boolean startPlayback(StreamInfo stream) { - return redis.set(String.format("%S_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, userSetup.getServerId(),stream.getStreamId(), - stream.getDeviceID(), stream.getChannelId()), stream); + return redis.set(String.format("%S_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, + userSetup.getServerId(), stream.getStream(), stream.getDeviceID(), stream.getChannelId()), stream); } @Override public boolean startDownload(StreamInfo streamInfo) { - return redis.set(String.format("%S_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX, userSetup.getServerId(),streamInfo.getStreamId(), - streamInfo.getDeviceID(), streamInfo.getChannelId()), streamInfo); + return redis.set(String.format("%S_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX, userSetup.getServerId(), + streamInfo.getStream(), streamInfo.getDeviceID(), streamInfo.getChannelId()), streamInfo); } @Override @@ -185,7 +187,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } return redis.del(String.format("%S_%s_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, userSetup.getServerId(), - streamInfo.getStreamId(), + streamInfo.getStream(), streamInfo.getDeviceID(), streamInfo.getChannelId())); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java index 6a515415..9af6e772 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.storager.impl; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; @@ -157,7 +158,10 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { public synchronized void updateChannel(String deviceId, DeviceChannel channel) { String channelId = channel.getChannelId(); channel.setDeviceId(deviceId); - channel.setStreamId(streamSession.getStreamId(deviceId, channel.getChannelId())); + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); + if (streamInfo != null) { + channel.setStreamId(streamInfo.getStream()); + } String now = this.format.format(System.currentTimeMillis()); channel.setUpdateTime(now); DeviceChannel deviceChannel = deviceChannelMapper.queryChannel(deviceId, channelId); @@ -179,7 +183,10 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { if (channelList.size() == 0) { for (DeviceChannel channel : channels) { channel.setDeviceId(deviceId); - channel.setStreamId(streamSession.getStreamId(deviceId, channel.getChannelId())); + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channel.getChannelId()); + if (streamInfo != null) { + channel.setStreamId(streamInfo.getStream()); + } String now = this.format.format(System.currentTimeMillis()); channel.setUpdateTime(now); channel.setCreateTime(now); @@ -190,9 +197,11 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { channelsInStore.put(deviceChannel.getChannelId(), deviceChannel); } for (DeviceChannel channel : channels) { - String channelId = channel.getChannelId(); channel.setDeviceId(deviceId); - channel.setStreamId(streamSession.getStreamId(deviceId, channel.getChannelId())); + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channel.getChannelId()); + if (streamInfo != null) { + channel.setStreamId(streamInfo.getStream()); + } String now = this.format.format(System.currentTimeMillis()); channel.setUpdateTime(now); if (channelsInStore.get(channel.getChannelId()) != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index c22a5584..8350d293 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -110,26 +110,26 @@ public class PlayController { String key = DeferredResultHolder.CALLBACK_CMD_STOP + deviceId + channelId; resultHolder.put(key, uuid, result); Device device = storager.queryVideoDevice(deviceId); - cmder.streamByeCmd(deviceId, channelId, (event) -> { - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); - if (streamInfo == null) { - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData("点播未找到"); - resultHolder.invokeAllResult(msg); - storager.stopPlay(deviceId, channelId); - }else { - redisCatchStorage.stopPlay(streamInfo); - storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - //Response response = event.getResponse(); - msg.setData(String.format("success")); - resultHolder.invokeAllResult(msg); - } - mediaServerService.closeRTPServer(device, channelId); + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); + if (streamInfo == null) { + RequestMessage msg = new RequestMessage(); + msg.setId(uuid); + msg.setKey(key); + msg.setData("点播未找到"); + resultHolder.invokeAllResult(msg); + storager.stopPlay(deviceId, channelId); + return result; + } + cmder.streamByeCmd(deviceId, channelId, streamInfo.getStream(), (event) -> { + redisCatchStorage.stopPlay(streamInfo); + storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); + RequestMessage msg = new RequestMessage(); + msg.setId(uuid); + msg.setKey(key); + //Response response = event.getResponse(); + msg.setData(String.format("success")); + resultHolder.invokeAllResult(msg); + mediaServerService.closeRTPServer(device, channelId, streamInfo.getStream()); }); if (deviceId != null || channelId != null) { @@ -329,7 +329,7 @@ public class PlayController { jsonObject.put("deviceId", transaction.getDeviceId()); jsonObject.put("channelId", transaction.getChannelId()); jsonObject.put("ssrc", transaction.getSsrc()); - jsonObject.put("streamId", transaction.getStreamId()); + jsonObject.put("streamId", transaction.getStream()); objects.add(jsonObject); } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/DownloadController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/DownloadController.java index 3f846c60..c2053e62 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/DownloadController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/DownloadController.java @@ -96,7 +96,7 @@ public class DownloadController { StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId); if (streamInfo != null) { // 停止之前的下载 - cmder.streamByeCmd(deviceId, channelId); + cmder.streamByeCmd(deviceId, channelId, streamInfo.getStream()); } MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device); @@ -114,7 +114,7 @@ public class DownloadController { cmder.downloadStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, (MediaServerItem mediaServerItem, JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); - playService.onPublishHandlerForDownload(mediaServerItem, response, deviceId, channelId, uuid.toString()); + playService.onPublishHandlerForDownload(mediaServerItem, response, deviceId, channelId, uuid); }, event -> { RequestMessage msg = new RequestMessage(); msg.setId(uuid); @@ -130,11 +130,12 @@ public class DownloadController { @ApiImplicitParams({ @ApiImplicitParam(name = "deviceId", value = "设备ID", dataTypeClass = String.class), @ApiImplicitParam(name = "channelId", value = "通道ID", dataTypeClass = String.class), + @ApiImplicitParam(name = "stream", value = "流ID", dataTypeClass = String.class), }) - @GetMapping("/stop/{deviceId}/{channelId}") - public ResponseEntity playStop(@PathVariable String deviceId, @PathVariable String channelId) { + @GetMapping("/stop/{deviceId}/{channelId}/{stream}") + public ResponseEntity playStop(@PathVariable String deviceId, @PathVariable String channelId, @PathVariable String stream) { - cmder.streamByeCmd(deviceId, channelId); + cmder.streamByeCmd(deviceId, channelId, stream); if (logger.isDebugEnabled()) { logger.debug(String.format("设备历史媒体下载停止 API调用,deviceId/channelId:%s_%s", deviceId, channelId)); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java index 8f83e7c4..3607a8d4 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java @@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.CrossOrigin; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; @@ -75,52 +76,8 @@ public class PlaybackController { if (logger.isDebugEnabled()) { logger.debug(String.format("设备回放 API调用,deviceId:%s ,channelId:%s", deviceId, channelId)); } - String uuid = UUID.randomUUID().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId; - DeferredResult> result = new DeferredResult>(30000L); - Device device = storager.queryVideoDevice(deviceId); - if (device == null) { - result.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST)); - return result; - } - MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true); - - // 超时处理 - result.onTimeout(()->{ - logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId)); - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData("Timeout"); - resultHolder.invokeResult(msg); - }); - - StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId); - if (streamInfo != null) { - // 停止之前的回放 - cmder.streamByeCmd(deviceId, channelId); - } - resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId, uuid, result); - - if (newMediaServerItem == null) { - logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId)); - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData("Timeout"); - resultHolder.invokeResult(msg); - return result; - } - cmder.playbackStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, (MediaServerItem mediaServerItem, JSONObject response) -> { - logger.info("收到订阅消息: " + response.toJSONString()); - playService.onPublishHandlerForPlayBack(mediaServerItem, response, deviceId, channelId, uuid.toString()); - }, event -> { - RequestMessage msg = new RequestMessage(); - msg.setId(uuid); - msg.setKey(key); - msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg)); + DeferredResult> result = playService.playBack(deviceId, channelId, startTime, endTime, msg->{ resultHolder.invokeResult(msg); }); @@ -131,24 +88,31 @@ public class PlaybackController { @ApiImplicitParams({ @ApiImplicitParam(name = "deviceId", value = "设备ID", dataTypeClass = String.class), @ApiImplicitParam(name = "channelId", value = "通道ID", dataTypeClass = String.class), + @ApiImplicitParam(name = "stream", value = "流ID", dataTypeClass = String.class), }) - @GetMapping("/stop/{deviceId}/{channelId}") - public ResponseEntity playStop(@PathVariable String deviceId, @PathVariable String channelId) { + @GetMapping("/stop/{deviceId}/{channelId}/{stream}") + public ResponseEntity playStop( + @PathVariable String deviceId, + @PathVariable String channelId, + @PathVariable String stream) { - cmder.streamByeCmd(deviceId, channelId); + cmder.streamByeCmd(deviceId, channelId, stream); if (logger.isDebugEnabled()) { logger.debug(String.format("设备录像回放停止 API调用,deviceId/channelId:%s/%s", deviceId, channelId)); } + if (StringUtils.isEmpty(deviceId) || StringUtils.isEmpty(channelId) || StringUtils.isEmpty(stream)) { + return new ResponseEntity<>(HttpStatus.BAD_REQUEST); + } if (deviceId != null && channelId != null) { JSONObject json = new JSONObject(); json.put("deviceId", deviceId); json.put("channelId", channelId); - return new ResponseEntity(json.toString(), HttpStatus.OK); + return new ResponseEntity<>(json.toString(), HttpStatus.OK); } else { logger.warn("设备录像回放停止API调用失败!"); - return new ResponseEntity(HttpStatus.INTERNAL_SERVER_ERROR); + return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR); } } diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java index 70f98114..853ec562 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java @@ -103,7 +103,7 @@ public class ApiStreamController { PlayResult play = playService.play(newMediaServerItem, serial, code, (mediaServerItem, response)->{ StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(serial, code); JSONObject result = new JSONObject(); - result.put("StreamID", streamInfo.getStreamId()); + result.put("StreamID", streamInfo.getStream()); result.put("DeviceID", device.getDeviceId()); result.put("ChannelID", code); result.put("ChannelName", deviceChannel.getName()); @@ -177,7 +177,7 @@ public class ApiStreamController { result.put("error","未找到流信息"); return result; } - cmder.streamByeCmd(serial, code); + cmder.streamByeCmd(serial, code, streamInfo.getStream()); redisCatchStorage.stopPlay(streamInfo); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); return null; diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index 9968cf81..f3f1fb33 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -186,7 +186,7 @@ user-settings: # 是否将日志存储进数据库 logInDatebase: true # 第三方匹配,用于从stream钟获取有效信息 - thirdPartyGBIdReg: [\s\S]* + thirdPartyGBIdReg: "[\\s\\S]*" # 在线文档: swagger-ui(生产环境建议关闭) swagger-ui: diff --git a/src/main/resources/logback-spring-local.xml b/src/main/resources/logback-spring-local.xml index 90fe9408..9951c696 100644 --- a/src/main/resources/logback-spring-local.xml +++ b/src/main/resources/logback-spring-local.xml @@ -83,7 +83,7 @@ - + diff --git a/web_src/src/components/dialog/changePassword.vue b/web_src/src/components/dialog/changePassword.vue index a95736f2..23d761e1 100644 --- a/web_src/src/components/dialog/changePassword.vue +++ b/web_src/src/components/dialog/changePassword.vue @@ -75,7 +75,10 @@ export default { isLoging: false, rules: { oldPassword: [{ required: true, validator: validatePass0, trigger: "blur" }], - newPassword: [{ required: true, validator: validatePass1, trigger: "blur" }], + newPassword: [{ required: true, validator: validatePass1, trigger: "blur" }, { + pattern: /^(?=.*[a-zA-Z])(?=.*\d)(?=.*[~!@#$%^&*()_+`\-={}:";'<>?,.\/]).{8,20}$/, + message: "密码长度在8-20位之间,由字母+数字+特殊字符组成", + },], confirmPassword: [{ required: true, validator: validatePass2, trigger: "blur" }], }, }; diff --git a/web_src/src/components/dialog/devicePlayer.vue b/web_src/src/components/dialog/devicePlayer.vue index 74c14c81..5a080060 100644 --- a/web_src/src/components/dialog/devicePlayer.vue +++ b/web_src/src/components/dialog/devicePlayer.vue @@ -307,7 +307,7 @@ export default { this.isLoging = false; // this.videoUrl = streamInfo.rtc; this.videoUrl = this.getUrlByStreamInfo(streamInfo); - this.streamId = streamInfo.streamId; + this.streamId = streamInfo.stream; this.app = streamInfo.app; this.mediaServerId = streamInfo.mediaServerId; this.playFromStreamInfo(false, streamInfo) @@ -485,8 +485,9 @@ export default { }).then(function (res) { var streamInfo = res.data; that.app = streamInfo.app; - that.streamId = streamInfo.streamId; + that.streamId = streamInfo.stream; that.mediaServerId = streamInfo.mediaServerId; + that.ssrc = streamInfo.ssrc; that.videoUrl = that.getUrlByStreamInfo(streamInfo); that.recordPlay = true; }); @@ -497,7 +498,7 @@ export default { this.videoUrl = ''; this.$axios({ method: 'get', - url: '/api/playback/stop/' + this.deviceId + "/" + this.channelId + url: '/api/playback/stop/' + this.deviceId + "/" + this.channelId + "/" + this.streamId }).then(function (res) { if (callback) callback() }); @@ -517,7 +518,7 @@ export default { }).then(function (res) { var streamInfo = res.data; that.app = streamInfo.app; - that.streamId = streamInfo.streamId; + that.streamId = streamInfo.stream; that.mediaServerId = streamInfo.mediaServerId; that.videoUrl = that.getUrlByStreamInfo(streamInfo); that.recordPlay = true; @@ -529,7 +530,7 @@ export default { this.videoUrl = ''; this.$axios({ method: 'get', - url: '/api/download/stop/' + this.deviceId + "/" + this.channelId + url: '/api/download/stop/' + this.deviceId + "/" + this.channelId+ "/" + this.streamId }).then(function (res) { if (callback) callback() }); @@ -539,8 +540,6 @@ export default { let that = this; this.$axios({ method: 'post', - // url: '/api/ptz/' + this.deviceId + '/' + this.channelId + '?leftRight=' + leftRight + '&upDown=' + upDown + - // '&inOut=' + zoom + '&moveSpeed=50&zoomSpeed=50' url: '/api/ptz/control/' + this.deviceId + '/' + this.channelId + '?command=' + command + '&horizonSpeed=' + this.controSpeed + '&verticalSpeed=' + this.controSpeed + '&zoomSpeed=' + this.controSpeed }).then(function (res) {}); },