From d21322a93258206eb910d7ac3a70a4812fc48cbc Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 3 Mar 2022 18:23:52 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=9B=BD=E6=A0=87=E7=BA=A7?= =?UTF-8?q?=E8=81=94=E5=BD=95=E5=83=8F=E9=A2=84=E8=A7=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/SDPInfo.java | 14 ++ .../transmit/cmd/impl/SIPCommander.java | 2 + .../request/impl/AckRequestProcessor.java | 6 +- .../request/impl/ByeRequestProcessor.java | 31 +-- .../request/impl/InviteRequestProcessor.java | 188 ++++++++++-------- .../iot/vmp/service/IMediaServerService.java | 2 +- .../vmp/service/bean/PlayBackCallback.java | 3 +- .../iot/vmp/service/bean/PlayBackResult.java | 55 +++++ .../service/impl/MediaServerServiceImpl.java | 10 +- .../iot/vmp/service/impl/PlayServiceImpl.java | 36 +++- .../vmanager/gb28181/play/PlayController.java | 1 - .../gb28181/playback/PlaybackController.java | 4 +- 12 files changed, 227 insertions(+), 125 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/SDPInfo.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SDPInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SDPInfo.java new file mode 100644 index 00000000..39225b57 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SDPInfo.java @@ -0,0 +1,14 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import javax.sdp.SessionDescription; + +public class SDPInfo { + private byte[] source; + private SessionDescription sdpSource; + private String sessionName; + private Long startTime; + private Long stopTime; + private String username; + private String address; + private String ssrc; +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index ff871e3c..437c69d9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -453,6 +453,7 @@ public class SIPCommander implements ISIPCommander { subscribeKey.put("app", "rtp"); subscribeKey.put("stream", ssrcInfo.getStream()); subscribeKey.put("regist", true); + subscribeKey.put("schema", "rtmp"); subscribeKey.put("mediaServerId", mediaServerItem.getId()); logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, @@ -718,6 +719,7 @@ public class SIPCommander implements ISIPCommander { if (ssrcTransaction != null) { MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId()); mediaServerService.releaseSsrc(mediaServerItem, ssrcTransaction.getSsrc()); + mediaServerService.closeRTPServer(deviceId, channelId, ssrcTransaction.getStream()); streamSession.remove(deviceId, channelId, ssrcTransaction.getStream()); } } catch (SipException | ParseException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index cb03d4cf..d5bc99b7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -68,7 +68,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In */ @Override public void process(RequestEvent evt) { - logger.debug("ACK请求: {}", ((System.currentTimeMillis()))); + logger.info("ACK请求: {}", ((System.currentTimeMillis()))); Dialog dialog = evt.getDialog(); if (dialog == null) return; if (dialog.getState()== DialogState.CONFIRMED) { @@ -88,10 +88,6 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In streamInfo = new StreamInfo(); streamInfo.setApp(sendRtpItem.getApp()); streamInfo.setStream(sendRtpItem.getStreamId()); - }else { - streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); - sendRtpItem.setStreamId(streamInfo.getStream()); - streamInfo.setApp("rtp"); } redisCatchStorage.updateSendRTPSever(sendRtpItem); logger.info(platformGbId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 60ea11bb..deda7832 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; @@ -90,29 +91,31 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); if (totalReaderCount == 0) { logger.info(streamId + "无其它观看者,通知设备停止推流"); - cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId); + cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId); }else if (totalReaderCount == -1){ logger.warn(streamId + " 查找其它观看者失败"); } } // 可能是设备主动停止 Device device = storager.queryVideoDeviceByChannelId(platformGbId); - if (device != null) { - if (sendRtpItem.isPlay()) { - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); - if (streamInfo != null) { - redisCatchStorage.stopPlay(streamInfo); + if (device != null) { + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); + if (sendRtpItem != null) { + if (sendRtpItem.isPlay()) { + if (streamInfo != null) { + redisCatchStorage.stopPlay(streamInfo); + } + }else { + if (streamInfo != null) { + redisCatchStorage.stopPlayback(streamInfo); + } } - }else { - StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(device.getDeviceId(), channelId); - if (streamInfo != null) { - redisCatchStorage.stopPlayback(streamInfo); - } - } - storager.stopPlay(device.getDeviceId(), channelId); - mediaServerService.closeRTPServer(device, channelId, streamInfo.getStream()); + storager.stopPlay(device.getDeviceId(), channelId); + mediaServerService.closeRTPServer(device.getDeviceId(), channelId, streamInfo.getStream()); + } } + } } catch (SipException e) { e.printStackTrace(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index e08d1fb9..52859e6a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -101,19 +102,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Override public void process(RequestEvent evt) { // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令 - Long startTimeForInvite = System.currentTimeMillis(); try { Request request = evt.getRequest(); SipURI sipURI = (SipURI) request.getRequestURI(); String channelId = sipURI.getUser(); - String requesterId = null; - - FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME); + String requesterId = SipUtils.getUserIdFromFromHeader(request); CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); - AddressImpl address = (AddressImpl) fromHeader.getAddress(); - SipUri uri = (SipUri) address.getURI(); - requesterId = uri.getUser(); - if (requesterId == null || channelId == null) { logger.info("无法从FromHeader的Address中获取到平台id,返回400"); responseAck(evt, Response.BAD_REQUEST); // 参数不全, 发400,请求错误 @@ -122,7 +116,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 查询请求是否来自上级平台\设备 ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); - if (platform != null) { + if (platform == null) { + inviteFromDeviceHandle(evt, requesterId); + }else { // 查询平台下是否有该通道 DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); @@ -141,7 +137,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem == null) { logger.info("[ app={}, stream={} ]找不到zlm {},返回410",gbStream.getApp(), gbStream.getStream(), mediaServerId); - responseAck(evt, Response.GONE, "media server not found"); + responseAck(evt, Response.GONE); return; } Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); @@ -197,7 +193,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 查看是否支持PS 负载96 //String ip = null; int port = -1; - //boolean recvonly = false; boolean mediaTransmissionTCP = false; Boolean tcpActive = null; for (Object description : mediaDescriptions) { @@ -233,7 +228,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } String username = sdp.getOrigin().getUsername(); String addressStr = sdp.getOrigin().getAddress(); - //String sessionName = sdp.getSessionName().getValue(); logger.info("[上级点播]用户:{}, 地址:{}:{}, ssrc:{}", username, addressStr, port, ssrc); Device device = null; // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 @@ -271,8 +265,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements Long finalStartTime = startTime; Long finalStopTime = stopTime; ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{ - logger.info("[上级点播]收到下级开始点播订阅, {}/{}", sendRtpItem.getApp(), sendRtpItem.getStreamId()); - // if (sendRtpItem == null) return; + logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", sendRtpItem.getApp(), sendRtpItem.getStreamId()); + // * 0 等待设备推流上来 + // * 1 下级已经推流,等待上级平台回复ack + // * 2 推流中 sendRtpItem.setStatus(1); redisCatchStorage.updateSendRTPSever(sendRtpItem); @@ -301,9 +297,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } catch (ParseException e) { e.printStackTrace(); } - if ("Playback".equals(sessionName) && responseJSON != null) { - playService.onPublishHandlerForPlayBack(finalMediaServerItem, responseJSON, finalDevice.getDeviceId(), channelId, null); - } }; SipSubscribe.Event errorEvent = ((event) -> { // 未知错误。直接转发设备点播的错误 @@ -319,10 +312,29 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements }); if ("Playback".equals(sessionName)) { sendRtpItem.setPlay(false); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, sendRtpItem.getSsrc(), true); sendRtpItem.setStreamId(ssrc); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - commander.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, format.format(start), format.format(end), hookEvent, errorEvent); + playService.playBack(device.getDeviceId(), channelId, format.format(start), format.format(end),result -> { + if (result.getCode() != 0){ + logger.warn("录像回放失败"); + if (result.getEvent() != null) { + errorEvent.response(result.getEvent()); + } + try { + responseAck(evt, Response.REQUEST_TIMEOUT); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + }else { + if (result.getMediaServerItem() != null) { + hookEvent.response(result.getMediaServerItem(), result.getResponse()); + } + } + }); }else { sendRtpItem.setPlay(true); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); @@ -333,7 +345,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setPlay(false); playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent,errorEvent); }else { - sendRtpItem.setStreamId(streamInfo.getStreamId()); + sendRtpItem.setStreamId(streamInfo.getStream()); hookEvent.response(mediaServerItem, null); } } @@ -379,72 +391,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } - } else { - // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) - Device device = redisCatchStorage.getDevice(requesterId); - if (device != null) { - logger.info("收到设备" + requesterId + "的语音广播Invite请求"); - responseAck(evt, Response.TRYING); - - String contentString = new String(request.getRawContent()); - // jainSip不支持y=字段, 移除移除以解析。 - String substring = contentString; - String ssrc = "0000000404"; - int ssrcIndex = contentString.indexOf("y="); - if (ssrcIndex > 0) { - substring = contentString.substring(0, ssrcIndex); - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - } - ssrcIndex = substring.indexOf("f="); - if (ssrcIndex > 0) { - substring = contentString.substring(0, ssrcIndex); - } - SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); - - // 获取支持的格式 - Vector mediaDescriptions = sdp.getMediaDescriptions(true); - // 查看是否支持PS 负载96 - int port = -1; - //boolean recvonly = false; - boolean mediaTransmissionTCP = false; - Boolean tcpActive = null; - for (int i = 0; i < mediaDescriptions.size(); i++) { - MediaDescription mediaDescription = (MediaDescription)mediaDescriptions.get(i); - Media media = mediaDescription.getMedia(); - - Vector mediaFormats = media.getMediaFormats(false); - if (mediaFormats.contains("8")) { - port = media.getMediaPort(); - String protocol = media.getProtocol(); - // 区分TCP发流还是udp, 当前默认udp - if ("TCP/RTP/AVP".equals(protocol)) { - String setup = mediaDescription.getAttribute("setup"); - if (setup != null) { - mediaTransmissionTCP = true; - if ("active".equals(setup)) { - tcpActive = true; - } else if ("passive".equals(setup)) { - tcpActive = false; - } - } - } - break; - } - } - if (port == -1) { - logger.info("不支持的媒体格式,返回415"); - // 回复不支持的格式 - responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 - return; - } - String username = sdp.getOrigin().getUsername(); - String addressStr = sdp.getOrigin().getAddress(); - logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc); - - } else { - logger.warn("来自无效设备/平台的请求"); - responseAck(evt, Response.BAD_REQUEST); - } } } catch (SipException | InvalidArgumentException | ParseException e) { @@ -457,4 +403,74 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements e.printStackTrace(); } } + + public void inviteFromDeviceHandle(RequestEvent evt, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException { + + // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) + Device device = redisCatchStorage.getDevice(requesterId); + Request request = evt.getRequest(); + if (device != null) { + logger.info("收到设备" + requesterId + "的语音广播Invite请求"); + responseAck(evt, Response.TRYING); + + String contentString = new String(request.getRawContent()); + // jainSip不支持y=字段, 移除移除以解析。 + String substring = contentString; + String ssrc = "0000000404"; + int ssrcIndex = contentString.indexOf("y="); + if (ssrcIndex > 0) { + substring = contentString.substring(0, ssrcIndex); + ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + } + ssrcIndex = substring.indexOf("f="); + if (ssrcIndex > 0) { + substring = contentString.substring(0, ssrcIndex); + } + SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); + + // 获取支持的格式 + Vector mediaDescriptions = sdp.getMediaDescriptions(true); + // 查看是否支持PS 负载96 + int port = -1; + //boolean recvonly = false; + boolean mediaTransmissionTCP = false; + Boolean tcpActive = null; + for (int i = 0; i < mediaDescriptions.size(); i++) { + MediaDescription mediaDescription = (MediaDescription)mediaDescriptions.get(i); + Media media = mediaDescription.getMedia(); + + Vector mediaFormats = media.getMediaFormats(false); + if (mediaFormats.contains("8")) { + port = media.getMediaPort(); + String protocol = media.getProtocol(); + // 区分TCP发流还是udp, 当前默认udp + if ("TCP/RTP/AVP".equals(protocol)) { + String setup = mediaDescription.getAttribute("setup"); + if (setup != null) { + mediaTransmissionTCP = true; + if ("active".equals(setup)) { + tcpActive = true; + } else if ("passive".equals(setup)) { + tcpActive = false; + } + } + } + break; + } + } + if (port == -1) { + logger.info("不支持的媒体格式,返回415"); + // 回复不支持的格式 + responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 + return; + } + String username = sdp.getOrigin().getUsername(); + String addressStr = sdp.getOrigin().getAddress(); + logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc); + + } else { + logger.warn("来自无效设备/平台的请求"); + responseAck(evt, Response.BAD_REQUEST); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java index 2e8a68ee..8c12c787 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java @@ -48,7 +48,7 @@ public interface IMediaServerService { SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, boolean isPlayback); - void closeRTPServer(Device device, String channelId, String ssrc); + void closeRTPServer(String deviceId, String channelId, String ssrc); void clearRTPServer(MediaServerItem mediaServerItem); diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackCallback.java b/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackCallback.java index 089523f9..5ed6cf34 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackCallback.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackCallback.java @@ -1,9 +1,10 @@ package com.genersoft.iot.vmp.service.bean; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; public interface PlayBackCallback { - void call(RequestMessage msg); + void call(PlayBackResult msg); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java b/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java new file mode 100644 index 00000000..10a2759f --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/PlayBackResult.java @@ -0,0 +1,55 @@ +package com.genersoft.iot.vmp.service.bean; + +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; + +import javax.sip.RequestEvent; + +public class PlayBackResult { + private int code; + private T data; + private MediaServerItem mediaServerItem; + private JSONObject response; + private SipSubscribe.EventResult event; + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + public T getData() { + return data; + } + + public void setData(T data) { + this.data = data; + } + + public MediaServerItem getMediaServerItem() { + return mediaServerItem; + } + + public void setMediaServerItem(MediaServerItem mediaServerItem) { + this.mediaServerItem = mediaServerItem; + } + + public JSONObject getResponse() { + return response; + } + + public void setResponse(JSONObject response) { + this.response = response; + } + + public SipSubscribe.EventResult getEvent() { + return event; + } + + public void setEvent(SipSubscribe.EventResult event) { + this.event = event; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 4f08c99d..f226a37f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -160,16 +160,16 @@ public class MediaServerServiceImpl implements IMediaServerService { } @Override - public void closeRTPServer(Device device, String channelId, String stream) { - String mediaServerId = streamSession.getMediaServerId(device.getDeviceId(), channelId, stream); - String ssrc = streamSession.getSSRC(device.getDeviceId(), channelId, stream); + public void closeRTPServer(String deviceId, String channelId, String stream) { + String mediaServerId = streamSession.getMediaServerId(deviceId, channelId, stream); + String ssrc = streamSession.getSSRC(deviceId, channelId, stream); MediaServerItem mediaServerItem = this.getOne(mediaServerId); if (mediaServerItem != null) { - String streamId = String.format("%s_%s", device.getDeviceId(), channelId); + String streamId = String.format("%s_%s", deviceId, channelId); zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId); releaseSsrc(mediaServerItem, ssrc); } - streamSession.remove(device.getDeviceId(), channelId, stream); + streamSession.remove(deviceId, channelId, stream); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index de665082..0fefb0cc 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.PlayBackCallback; +import com.genersoft.iot.vmp.service.bean.PlayBackResult; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; @@ -115,11 +116,8 @@ public class PlayServiceImpl implements IPlayService { msg.setData(wvpResult); // 点播超时回复BYE cmder.streamByeCmd(device.getDeviceId(), channelId, streamInfo.getStream()); - // 释放rtpserver - mediaServerService.closeRTPServer(playResult.getDevice(), channelId, streamInfo.getStream()); // 回复之前所有的点播请求 resultHolder.invokeAllResult(msg); - // TODO 释放ssrc }); result.onCompletion(()->{ // 点播结束时调用截图接口 @@ -173,7 +171,10 @@ public class PlayServiceImpl implements IPlayService { WVPResult wvpResult = new WVPResult(); wvpResult.setCode(-1); // 点播返回sip错误 - mediaServerService.closeRTPServer(playResult.getDevice(), channelId, ssrcInfo.getStream()); + mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream()); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc()); + streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg)); msg.setData(wvpResult); resultHolder.invokeAllResult(msg); @@ -222,7 +223,10 @@ public class PlayServiceImpl implements IPlayService { logger.info("收到订阅消息: " + response.toJSONString()); onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid); }, (event) -> { - mediaServerService.closeRTPServer(playResult.getDevice(), channelId, ssrcInfo.getStream()); + mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream()); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc()); + streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); WVPResult wvpResult = new WVPResult(); wvpResult.setCode(-1); wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg)); @@ -240,7 +244,7 @@ public class PlayServiceImpl implements IPlayService { RequestMessage msg = new RequestMessage(); msg.setId(uuid); msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); - StreamInfo streamInfo = onPublishHandler(mediaServerItem, resonse, deviceId, channelId); + StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); if (streamInfo != null) { DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { @@ -298,9 +302,12 @@ public class PlayServiceImpl implements IPlayService { RequestMessage msg = new RequestMessage(); msg.setId(uuid); msg.setKey(key); + PlayBackResult playBackResult = new PlayBackResult<>(); result.onTimeout(()->{ msg.setData("回放超时"); - callback.call(msg); + playBackResult.setCode(-1); + playBackResult.setData(msg); + callback.call(playBackResult); }); cmder.playbackStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, (MediaServerItem mediaServerItem, JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); @@ -308,15 +315,24 @@ public class PlayServiceImpl implements IPlayService { if (streamInfo == null) { logger.warn("设备回放API调用失败!"); msg.setData("设备回放API调用失败!"); - callback.call(msg); + playBackResult.setCode(-1); + playBackResult.setData(msg); + callback.call(playBackResult); return; } redisCatchStorage.startPlayback(streamInfo); msg.setData(JSON.toJSONString(streamInfo)); - callback.call(msg); + playBackResult.setCode(0); + playBackResult.setData(msg); + playBackResult.setMediaServerItem(mediaServerItem); + playBackResult.setResponse(response); + callback.call(playBackResult); }, event -> { msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg)); - callback.call(msg); + playBackResult.setCode(-1); + playBackResult.setData(msg); + playBackResult.setEvent(event); + callback.call(playBackResult); }); return result; } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index 8350d293..fd70690e 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -129,7 +129,6 @@ public class PlayController { //Response response = event.getResponse(); msg.setData(String.format("success")); resultHolder.invokeAllResult(msg); - mediaServerService.closeRTPServer(device, channelId, streamInfo.getStream()); }); if (deviceId != null || channelId != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java index 3607a8d4..b864f466 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java @@ -77,8 +77,8 @@ public class PlaybackController { logger.debug(String.format("设备回放 API调用,deviceId:%s ,channelId:%s", deviceId, channelId)); } - DeferredResult> result = playService.playBack(deviceId, channelId, startTime, endTime, msg->{ - resultHolder.invokeResult(msg); + DeferredResult> result = playService.playBack(deviceId, channelId, startTime, endTime, wvpResult->{ + resultHolder.invokeResult(wvpResult.getData()); }); return result;