diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java index b3b2fbaf..de9d8377 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java @@ -156,6 +156,7 @@ public class SIPProcessorFactory { processor.setRequestEvent(evt); processor.setRedisCatchStorage(redisCatchStorage); processor.setZlmrtpServerFactory(zlmrtpServerFactory); + processor.setSIPCommander(cmder); return processor; } else if (Request.CANCEL.equals(method)) { CancelRequestProcessor processor = new CancelRequestProcessor(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java index a14a4cc6..c9ea567a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java @@ -1,29 +1,38 @@ package com.genersoft.iot.vmp.gb28181.transmit.request.impl; +import javax.sip.address.SipURI; import javax.sip.Dialog; import javax.sip.DialogState; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; +import javax.sip.header.FromHeader; +import javax.sip.header.HeaderAddress; +import javax.sip.header.ToHeader; import javax.sip.message.Response; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import org.apache.log4j.Logger; + import java.text.ParseException; import java.util.HashMap; import java.util.Map; /** * @Description: BYE请求处理器 - * @author: swwheihei - * @date: 2020年5月3日 下午5:32:05 + * @author: lawrencehj + * @date: 2021年3月9日 */ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { - private IRedisCatchStorage redisCatchStorage; + private ISIPCommander cmder; + + private IRedisCatchStorage redisCatchStorage; private ZLMRTPServerFactory zlmrtpServerFactory; @@ -38,10 +47,8 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { Dialog dialog = evt.getDialog(); if (dialog == null) return; if (dialog.getState().equals(DialogState.TERMINATED)) { - String remoteUri = dialog.getRemoteParty().getURI().toString(); - String localUri = dialog.getLocalParty().getURI().toString(); - String platformGbId = remoteUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@")); - String channelId = localUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@")); + String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); + String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); String streamId = sendRtpItem.getStreamId(); Map param = new HashMap<>(); @@ -50,6 +57,11 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { param.put("stream",streamId); System.out.println("停止向上级推流:" + streamId); zlmrtpServerFactory.stopSendRtpStream(param); + redisCatchStorage.deleteSendRTPServer(platformGbId, channelId); + if (zlmrtpServerFactory.totalReaderCount(streamId) == 0) { + System.out.println(streamId + "无其它观看者,通知设备停止推流"); + cmder.streamByeCmd(streamId); + } } } catch (SipException e) { e.printStackTrace(); @@ -58,8 +70,6 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { } catch (ParseException e) { e.printStackTrace(); } - // TODO 优先级99 Bye Request消息实现,此消息一般为级联消息,上级给下级发送视频停止指令 - } /*** @@ -89,4 +99,13 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) { this.zlmrtpServerFactory = zlmrtpServerFactory; } + + public ISIPCommander getSIPCommander() { + return cmder; + } + + public void setSIPCommander(ISIPCommander cmder) { + this.cmder = cmder; + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 90b53695..51f61eff 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -267,20 +267,25 @@ public class ZLMHttpHookListener { } String streamId = json.getString("stream"); - - cmder.streamByeCmd(streamId); StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); - if (streamInfo!=null){ - redisCatchStorage.stopPlay(streamInfo); - storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); + + JSONObject ret = new JSONObject(); + ret.put("code", 0); + ret.put("close", true); + + if (streamInfo != null) { + if (redisCatchStorage.isChannelSendingRTP(streamInfo.getChannelId())) { + ret.put("close", false); + } else { + cmder.streamByeCmd(streamId); + redisCatchStorage.stopPlay(streamInfo); + storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); + } }else{ + cmder.streamByeCmd(streamId); streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); redisCatchStorage.stopPlayback(streamInfo); } - - JSONObject ret = new JSONObject(); - ret.put("code", 0); - ret.put("close", true); return new ResponseEntity(ret.toString(),HttpStatus.OK); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 00951ba2..1f1693df 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -152,6 +152,16 @@ public class ZLMRTPServerFactory { return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")); } + /** + * 查询转推的流是否有其它观看者 + * @param streamId + * @return + */ + public int totalReaderCount(String streamId) { + JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo("rtp", "rtmp", streamId); + return mediaInfo.getInteger("totalReaderCount"); + } + /** * 调用zlm RESTful API —— stopSendRtp */ diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index ca70620f..90611846 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -89,4 +89,17 @@ public interface IRedisCatchStorage { */ SendRtpItem querySendRTPServer(String platformGbId, String channelId); + /** + * 删除RTP推送信息缓存 + * @param platformGbId + * @param channelId + */ + void deleteSendRTPServer(String platformGbId, String channelId); + + /** + * 查询某个通道是否存在上级点播(RTP推送) + * @param channelId + */ + boolean isChannelSendingRTP(String channelId); + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 6153e5f7..3feb347e 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -225,4 +225,30 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { return (SendRtpItem)redis.get(key); } + /** + * 删除RTP推送信息缓存 + * @param platformGbId + * @param channelId + */ + @Override + public void deleteSendRTPServer(String platformGbId, String channelId) { + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + platformGbId + "_" + channelId; + redis.del(key); + } + + /** + * 查询某个通道是否存在上级点播(RTP推送) + * @param channelId + */ + @Override + public boolean isChannelSendingRTP(String channelId) { + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + "*_" + channelId; + List RtpStreams = redis.scan(key); + if (RtpStreams.size() > 0) { + return true; + } else { + return false; + } + } + }