From e94b99d11c46246532edc93cd25cbf8c0b88f03f Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Sun, 27 Feb 2022 20:01:31 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=9B=BD=E6=A0=87=E5=BD=95?= =?UTF-8?q?=E5=83=8F=E7=BA=A7=E8=81=94=E6=92=AD=E6=94=BE=EF=BC=8C=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E7=82=B9=E6=92=AD=E6=B5=81=E7=A8=8B=EF=BC=8C=E5=8A=A0?= =?UTF-8?q?=E5=BF=AB=E7=82=B9=E6=92=AD=E9=80=9F=E5=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/SendRtpItem.java | 26 ++++++ .../iot/vmp/gb28181/session/SsrcConfig.java | 1 - .../vmp/gb28181/task/GPSSubscribeTask.java | 1 - .../transmit/SIPProcessorObserver.java | 2 - .../transmit/cmd/impl/SIPCommander.java | 16 ++-- .../cmd/impl/SIPCommanderFroPlatform.java | 3 - .../request/impl/AckRequestProcessor.java | 84 ++++++++++------- .../request/impl/ByeRequestProcessor.java | 19 +++- .../request/impl/InviteRequestProcessor.java | 90 +++++++++++++++---- .../impl/InviteResponseProcessor.java | 3 - .../vmp/media/zlm/ZLMHttpHookListener.java | 2 +- .../vmp/media/zlm/ZLMHttpHookSubscribe.java | 30 ++++--- .../iot/vmp/media/zlm/ZLMRESTfulUtils.java | 1 - .../vmp/media/zlm/ZLMRTPServerFactory.java | 9 ++ .../iot/vmp/service/impl/PlayServiceImpl.java | 5 +- .../storager/dao/ParentPlatformMapper.java | 2 +- .../storager/dao/PlatformChannelMapper.java | 8 +- .../storager/impl/RedisCatchStorageImpl.java | 1 - .../impl/DeviceAlarmServiceImplTest.java | 7 -- 19 files changed, 209 insertions(+), 101 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index 2c9c494c..3e5d222a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -71,6 +71,16 @@ public class SendRtpItem { */ private String mediaServerId; + /** + * invite的callId + */ + private String CallId; + + /** + * 是否是play, false是playback + */ + private boolean isPlay; + public String getIp() { return ip; } @@ -174,4 +184,20 @@ public class SendRtpItem { public void setMediaServerId(String mediaServerId) { this.mediaServerId = mediaServerId; } + + public String getCallId() { + return CallId; + } + + public void setCallId(String callId) { + CallId = callId; + } + + public boolean isPlay() { + return isPlay; + } + + public void setPlay(boolean play) { + isPlay = play; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SsrcConfig.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SsrcConfig.java index e96e6a5b..ac54c2d9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SsrcConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SsrcConfig.java @@ -81,7 +81,6 @@ public class SsrcConfig { isUsed.remove(sn); notUsed.add(sn); }catch (NullPointerException e){ - System.out.printf("11111"); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java index 0d56bd58..f0d90336 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java @@ -36,7 +36,6 @@ public class GPSSubscribeTask implements Runnable{ SubscribeInfo subscribe = redisCatchStorage.getSubscribe(key); if (subscribe != null) { - System.out.println("发送GPS消息"); ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); if (parentPlatform == null || parentPlatform.isStatus()) { // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java index 71025c00..d352bb2b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java @@ -141,7 +141,6 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { */ @Override public void processTimeout(TimeoutEvent timeoutEvent) { - System.out.println("processTimeout"); if(timeoutProcessor != null) { timeoutProcessor.process(timeoutEvent); } @@ -173,7 +172,6 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { @Override public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) { - System.out.println("processDialogTerminated"); CallIdHeader callId = dialogTerminatedEvent.getDialog().getCallId(); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index b0593bd4..da664dd9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -346,8 +346,11 @@ public class SIPCommander implements ISIPCommander { subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return; - event.response(mediaServerItemInUse, json); - subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); + if (event != null) { + event.response(mediaServerItemInUse, json); + } + +// subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); }); // StringBuffer content = new StringBuffer(200); @@ -452,9 +455,11 @@ public class SIPCommander implements ISIPCommander { logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + System.out.println(344444); if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return; - event.response(mediaServerItemInUse, json); - subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); + if (event != null) { + event.response(mediaServerItemInUse, json); + } }); StringBuffer content = new StringBuffer(200); @@ -466,8 +471,6 @@ public class SIPCommander implements ISIPCommander { content.append("t="+DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime)+" " +DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime) +"\r\n"); - - String streamMode = device.getStreamMode().toUpperCase(); if (userSetup.isSeniorSdp()) { @@ -1202,7 +1205,6 @@ public class SIPCommander implements ISIPCommander { if (type == null) { type = "all"; } - try { StringBuffer recordInfoXml = new StringBuffer(200); recordInfoXml.append("\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index bd41ddb6..637381f7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -508,9 +508,6 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { // callid CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); - System.out.println( - recordXml.toString() - ); Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, recordXml.toString(), fromTag, callIdHeader); transmitRequest(parentPlatform, request); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 127ef29a..1e99c0b9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -1,10 +1,13 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; @@ -24,6 +27,8 @@ import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; import java.util.HashMap; import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; /** * SIP命令类型: ACK请求 @@ -52,6 +57,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private IMediaServerService mediaServerService; + @Autowired + private ZLMHttpHookSubscribe subscribe; + /** * 处理 ACK请求 @@ -60,6 +68,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In */ @Override public void process(RequestEvent evt) { + logger.debug("ACK请求: {}", ((System.currentTimeMillis()))); Dialog dialog = evt.getDialog(); if (dialog == null) return; if (dialog.getState()== DialogState.CONFIRMED) { @@ -69,16 +78,17 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; String deviceId = sendRtpItem.getDeviceId(); StreamInfo streamInfo = null; - if (deviceId == null) { + if (sendRtpItem.isPlay()) { + streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); + }else { + streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId); + } + System.out.println(JSON.toJSON(streamInfo)); + if (streamInfo == null) { streamInfo = new StreamInfo(); streamInfo.setApp(sendRtpItem.getApp()); streamInfo.setStreamId(sendRtpItem.getStreamId()); - }else { - streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); - sendRtpItem.setStreamId(streamInfo.getStreamId()); - streamInfo.setApp("rtp"); } - redisCatchStorage.updateSendRTPSever(sendRtpItem); logger.info(platformGbId); logger.info(channelId); @@ -90,34 +100,42 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In param.put("dst_url",sendRtpItem.getIp()); param.put("dst_port", sendRtpItem.getPort()); param.put("is_udp", is_Udp); - //param.put ("src_port", sendRtpItem.getLocalPort()); // 设备推流查询,成功后才能转推 - boolean rtpPushed = false; - long startTime = System.currentTimeMillis(); - while (!rtpPushed) { - try { - if (System.currentTimeMillis() - startTime < 30 * 1000) { - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) { - rtpPushed = true; - logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]", - streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort()); - zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - } else { - logger.info("等待设备推流[{}/{}].......", - streamInfo.getApp() ,streamInfo.getStreamId()); - Thread.sleep(1000); - continue; - } - } else { - rtpPushed = true; - logger.info("设备推流[{}/{}]超时,终止向上级推流", - streamInfo.getApp() ,streamInfo.getStreamId()); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - } + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); +// if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) { +// logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]", +// streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort()); +// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); +// } else { +// // 对hook进行订阅 +// logger.info("等待设备推流[{}/{}].......", +// streamInfo.getApp(), streamInfo.getStreamId()); +// Timer timer = new Timer(); +// timer.schedule(new TimerTask() { +// @Override +// public void run() { +// logger.info("设备推流[{}/{}]超时,终止向上级推流", +// finalStreamInfo.getApp() , finalStreamInfo.getStreamId()); +// +// } +// }, 30*1000L); +// // 添加订阅 +// JSONObject subscribeKey = new JSONObject(); +// subscribeKey.put("app", "rtp"); +// subscribeKey.put("stream", streamInfo.getStreamId()); +// subscribeKey.put("mediaServerId", streamInfo.getMediaServerId()); +// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, subscribeKey, +// (MediaServerItem mediaServerItemInUse, JSONObject json) -> { +// logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]", +// finalStreamInfo.getApp(), finalStreamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort()); +// timer.cancel(); +// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); +// subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); +// }); +// } + + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index feb44c54..9b6e7276 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -87,18 +87,29 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); redisCatchStorage.deleteSendRTPServer(platformGbId, channelId); - if (zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId) == 0) { + int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); + if (totalReaderCount == 0) { logger.info(streamId + "无其它观看者,通知设备停止推流"); cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId); + }else if (totalReaderCount == -1){ + logger.warn(streamId + " 查找其它观看者失败"); } } // 可能是设备主动停止 Device device = storager.queryVideoDeviceByChannelId(platformGbId); if (device != null) { - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); - if (streamInfo != null) { - redisCatchStorage.stopPlay(streamInfo); + if (sendRtpItem.isPlay()) { + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); + if (streamInfo != null) { + redisCatchStorage.stopPlay(streamInfo); + } + }else { + StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(device.getDeviceId(), channelId); + if (streamInfo != null) { + redisCatchStorage.stopPlayback(streamInfo); + } } + storager.stopPlay(device.getDeviceId(), channelId); mediaServerService.closeRTPServer(device, channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index ae2819ca..a157a5c3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -1,18 +1,28 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; +import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; +import gov.nist.javax.sdp.TimeDescriptionImpl; +import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sip.address.AddressImpl; import gov.nist.javax.sip.address.SipUri; import org.slf4j.Logger; @@ -27,10 +37,13 @@ import javax.sip.RequestEvent; import javax.sip.ServerTransaction; import javax.sip.SipException; import javax.sip.address.SipURI; +import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.List; import java.util.Vector; @@ -60,6 +73,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private IPlayService playService; + @Autowired + private ISIPCommander commander; + @Autowired private ZLMRTPServerFactory zlmrtpServerFactory; @@ -69,6 +85,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private SIPProcessorObserver sipProcessorObserver; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -84,6 +101,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Override public void process(RequestEvent evt) { // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令 + Long startTimeForInvite = System.currentTimeMillis(); try { Request request = evt.getRequest(); SipURI sipURI = (SipURI) request.getRequestURI(); @@ -91,6 +109,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements String requesterId = null; FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME); + CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); AddressImpl address = (AddressImpl) fromHeader.getAddress(); SipUri uri = (SipUri) address.getURI(); requesterId = uri.getUser(); @@ -101,7 +120,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements return; } - // 查询请求方是否上级平台 + // 查询请求是否来自上级平台\设备 ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); if (platform != null) { // 查询平台下是否有该通道 @@ -158,7 +177,21 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements ssrc = ssrcDefault; sdp = SdpFactory.getInstance().createSessionDescription(contentString); } - + String sessionName = sdp.getSessionName().getValue(); + + Long startTime = null; + Long stopTime = null; + Date start = null; + Date end = null; + if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) { + TimeDescriptionImpl timeDescription = (TimeDescriptionImpl)(sdp.getTimeDescriptions(false).get(0)); + TimeField startTimeFiled = (TimeField)timeDescription.getTime(); + startTime = startTimeFiled.getStartTime(); + stopTime = startTimeFiled.getStopTime(); + + start = new Date(startTime*1000); + end = new Date(stopTime*1000); + } // 获取支持的格式 Vector mediaDescriptions = sdp.getMediaDescriptions(true); // 查看是否支持PS 负载96 @@ -228,23 +261,31 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(evt, Response.BUSY_HERE); return; } - + sendRtpItem.setCallId(callIdHeader.getCallId()); + sendRtpItem.setPlay("Play".equals(sessionName)); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); - // 通知下级推流, - PlayResult playResult = playService.play(mediaServerItem,device.getDeviceId(), channelId, (mediaServerItemInUSe, responseJSON)->{ - // 收到推流, 回复200OK, 等待ack + + Device finalDevice = device; + MediaServerItem finalMediaServerItem = mediaServerItem; + Long finalStartTime = startTime; + Long finalStopTime = stopTime; + ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{ + logger.info("[上级点播]收到下级开始点播订阅, {}/{}", sendRtpItem.getApp(), sendRtpItem.getStreamId()); // if (sendRtpItem == null) return; sendRtpItem.setStatus(1); redisCatchStorage.updateSendRTPSever(sendRtpItem); - // TODO 添加对tcp的支持 StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n"); - content.append("s=Play\r\n"); + content.append("s=" + sessionName+"\r\n"); content.append("c=IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n"); - content.append("t=0 0\r\n"); + if ("Playback".equals(sessionName)) { + content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); + }else { + content.append("t=0 0\r\n"); + } content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n"); content.append("a=sendonly\r\n"); content.append("a=rtpmap:96 PS/90000\r\n"); @@ -260,7 +301,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } catch (ParseException e) { e.printStackTrace(); } - } ,((event) -> { + if ("Playback".equals(sessionName) && responseJSON != null) { + playService.onPublishHandlerForPlayBack(finalMediaServerItem, responseJSON, finalDevice.getDeviceId(), channelId, null); + } + }; + SipSubscribe.Event errorEvent = ((event) -> { // 未知错误。直接转发设备点播的错误 Response response = null; try { @@ -271,11 +316,27 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } catch (ParseException | SipException | InvalidArgumentException e) { e.printStackTrace(); } - })); - if (logger.isDebugEnabled()) { - logger.debug(playResult.getResult().toString()); + }); + if ("Playback".equals(sessionName)) { + sendRtpItem.setPlay(false); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, sendRtpItem.getSsrc(), true); + sendRtpItem.setStreamId(ssrc); + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + commander.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, format.format(start), format.format(end), hookEvent, errorEvent); + }else { + sendRtpItem.setPlay(true); + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); + if (streamInfo == null) { + if (mediaServerItem.isRtpEnable()) { + sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId)); + } + sendRtpItem.setPlay(false); + playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent,errorEvent); + }else { + sendRtpItem.setStreamId(streamInfo.getStreamId()); + hookEvent.response(mediaServerItem, null); + } } - }else if (gbStream != null) { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, gbStream.getApp(), gbStream.getStream(), channelId, @@ -295,7 +356,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setStatus(1); redisCatchStorage.updateSendRTPSever(sendRtpItem); - // TODO 添加对tcp的支持 StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java index 5446a902..1b5081b1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java @@ -82,9 +82,6 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract { requestURI.setPort(event.getRemotePort()); reqAck.setRequestURI(requestURI); logger.info("向 " + event.getRemoteIpAddress() + ":" + event.getRemotePort() + "回复ack"); - SipURI sipURI = (SipURI)dialog.getRemoteParty().getURI(); - String deviceId = requestURI.getUser(); - String channelId = sipURI.getUser(); dialog.sendAck(reqAck); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 4f458135..e2c83ea9 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -181,7 +181,7 @@ public class ZLMHttpHookListener { @PostMapping(value = "/on_publish", produces = "application/json;charset=UTF-8") public ResponseEntity onPublish(@RequestBody JSONObject json) { - logger.debug("[ ZLM HOOK ]on_publish API调用,参数:" + json.toString()); + logger.info("[ ZLM HOOK ]on_publish API调用,参数:" + json.toString()); JSONObject ret = new JSONObject(); ret.put("code", 0); ret.put("msg", "success"); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java index c8cca53e..84b36e3e 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java @@ -77,21 +77,23 @@ public class ZLMHttpHookSubscribe { if (eventMap == null) { return; } - Iterator> iterator = eventMap.entrySet().iterator(); - while (iterator.hasNext()){ - Map.Entry next = iterator.next(); - JSONObject key = next.getKey(); - Boolean result = null; - for (String s : key.keySet()) { - if (result == null) { - result = key.getString(s).equals(hookResponse.getString(s)); - }else { - if (key.getString(s) == null) continue; - result = result && key.getString(s).equals(hookResponse.getString(s)); + + Set> entries = eventMap.entrySet(); + if (entries.size() > 0) { + for (Map.Entry entry : entries) { + JSONObject key = entry.getKey(); + Boolean result = null; + for (String s : key.keySet()) { + if (result == null) { + result = key.getString(s).equals(hookResponse.getString(s)); + }else { + if (key.getString(s) == null) continue; + result = result && key.getString(s).equals(hookResponse.getString(s)); + } + } + if (null != result && result){ + entries.remove(entry); } - } - if (null != result && result){ - iterator.remove(); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 05ecd3fb..d0b1cb2d 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -72,7 +72,6 @@ public class ZLMRESTfulUtils { ResponseBody responseBody = response.body(); if (responseBody != null) { String responseStr = responseBody.string(); - System.out.println(responseStr); responseJSON = JSON.parseObject(responseStr); } }else { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 30a15096..76bab9c0 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -242,9 +242,18 @@ public class ZLMRTPServerFactory { */ public int totalReaderCount(MediaServerItem mediaServerItem, String app, String streamId) { JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtmp", streamId); + Integer code = mediaInfo.getInteger("code"); if (mediaInfo == null) { return 0; } + if ( code < 0) { + logger.warn("查询流({}/{})是否有其它观看者时得到: {}", app, streamId, mediaInfo.getString("msg")); + return -1; + } + if ( code == 0 && ! mediaInfo.getBoolean("online")) { + logger.warn("查询流({}/{})是否有其它观看者时得到: {}", app, streamId, mediaInfo.getString("msg")); + return -1; + } return mediaInfo.getInteger("totalReaderCount"); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index cf30a794..d3f6976c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -122,7 +122,6 @@ public class PlayServiceImpl implements IPlayService { // 点播结束时调用截图接口 try { String classPath = ResourceUtils.getURL("classpath:").getPath(); - // System.out.println(classPath); // 兼容打包为jar的class路径 if(classPath.contains("jar")) { classPath = classPath.substring(0, classPath.lastIndexOf(".")); @@ -238,11 +237,11 @@ public class PlayServiceImpl implements IPlayService { } @Override - public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) { + public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) { RequestMessage msg = new RequestMessage(); msg.setId(uuid); msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); - StreamInfo streamInfo = onPublishHandler(mediaServerItem, resonse, deviceId, channelId, uuid); + StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId, uuid); if (streamInfo != null) { DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId); if (deviceChannel != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java index d10dde51..f74b6d4d 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java @@ -88,7 +88,7 @@ public interface ParentPlatformMapper { ""}) int setDefaultCatalog(String platformId, String catalogId); - @Select("select 'channel' as name, count(pgc.platformId) count from platform_gb_channel pgc where pgc.platformId=#{platformId} and pgc.channelId =#{gbId} " + + @Select("select 'channel' as name, count(pgc.platformId) count from platform_gb_channel pgc left join device_channel dc on dc.id = pgc.deviceChannelId where pgc.platformId=#{platformId} and dc.channelId =#{gbId} " + "union " + "select 'stream' as name, count(pgs.platformId) count from platform_gb_stream pgs left join gb_stream gs on pgs.gbStreamId = gs.gbStreamId where pgs.platformId=#{platformId} and gs.gbId = #{gbId}") List getChannelSource(String platformId, String gbId); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java index 3df38fc3..0abff270 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java @@ -52,8 +52,8 @@ public interface PlatformChannelMapper { int cleanChannelForGB(String platformId); - @Select("SELECT * FROM device_channel WHERE deviceId = (SELECT deviceId FROM platform_gb_channel WHERE " + - "platformId='${platformId}' AND channelId='${channelId}' ) AND channelId='${channelId}'") + @Select("SELECT dc.* FROM platform_gb_channel pgc left join device_channel dc on dc.id = pgc.deviceChannelId WHERE " + + "pgc.platformId=#{platformId} AND dc.channelId=#{channelId}") DeviceChannel queryChannelInParentPlatform(String platformId, String channelId); @@ -62,7 +62,7 @@ public interface PlatformChannelMapper { "where pgc.platformId=#{platformId} and pgc.catalogId=#{catalogId}") List queryChannelInParentPlatformAndCatalog(String platformId, String catalogId); - @Select("SELECT * FROM device WHERE deviceId = (SELECT deviceId FROM platform_gb_channel WHERE platformId='${platformId}' AND channelId='${channelId}')") + @Select("SELECT * FROM device WHERE deviceId = (SELECT deviceId FROM platform_gb_channel pgc left join device_channel dc on dc.id = pgc.deviceChannelId WHERE pgc.platformId='${platformId}' AND dc.channelId='${channelId}')") Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId); @Delete("") int delByCatalogIdAndChannelIdAndPlatformId(PlatformCatalog platformCatalog); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 92fdf6c5..1baefbe8 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -139,7 +139,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public StreamInfo queryPlayByDevice(String deviceId, String channelId) { -// List playLeys = redis.keys(String.format("%S_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX, List playLeys = redis.scan(String.format("%S_%s_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX, userSetup.getServerId(), deviceId, diff --git a/src/test/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImplTest.java b/src/test/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImplTest.java index 3cb9aa56..23b9f6b2 100644 --- a/src/test/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImplTest.java +++ b/src/test/java/com/genersoft/iot/vmp/service/impl/DeviceAlarmServiceImplTest.java @@ -50,14 +50,7 @@ class DeviceAlarmServiceImplTest { // System.out.println(deviceAlarmService.getAllAlarm(0, 10000, "11111111111111111111", null, "1", null, // null, null).getSize()); - System.out.println(deviceAlarmService.getAllAlarm(0, 10000, "11111111111111111111", null, null, null, - "2021-01-01 00:00:00", null).getSize()); - System.out.println(deviceAlarmService.getAllAlarm(0, 10000, "11111111111111111111", null, null, null, - null, "2021-04-01 09:00:00").getSize()); - - System.out.println(deviceAlarmService.getAllAlarm(0, 10000, "11111111111111111111", null, null, null, - "2021-02-01 01:00:00", "2021-04-01 04:00:00").getSize()); }