diff --git a/README.md b/README.md index 89120451..26d07287 100644 --- a/README.md +++ b/README.md @@ -60,21 +60,24 @@ https://gitee.com/18010473990/wvp-GB28181.git 15. 支持订阅与通知方法 - [X] 移动位置订阅 - [X] 移动位置通知处理 - - [ ] 报警事件订阅 + - [X] 报警事件订阅 - [X] 报警事件通知处理 - [ ] 设备目录订阅 - [X] 设备目录通知处理 16. 移动位置查询和显示,可通过配置文件设置移动位置历史是否存储 # 2.0 支持特性 -- [ ] 国标通道向上级联 +- [X] 国标通道向上级联 - [X] WEB添加上级平台 - [X] 注册 - [X] 心跳保活 - [X] 通道选择 - [X] 通道推送 - - [ ] 点播 - - [ ] 云台控制 + - [X] 点播 + - [X] 云台控制 + - [X] 平台状态查询 + - [X] 平台信息查询 + - [X] 平台远程启动 - [ ] 添加RTSP视频 - [ ] 添加ONVIF探测局域网内的设备 - [ ] 添加RTMP视频 diff --git a/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java b/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java index 4a92e093..d237b165 100644 --- a/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java +++ b/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java @@ -4,10 +4,20 @@ import java.util.logging.LogManager; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.ConfigurableApplicationContext; @SpringBootApplication public class VManageBootstrap extends LogManager { + private static String[] args; + private static ConfigurableApplicationContext context; public static void main(String[] args) { - SpringApplication.run(VManageBootstrap.class, args); + VManageBootstrap.args = args; + VManageBootstrap.context = SpringApplication.run(VManageBootstrap.class, args); + } + // 项目重启 + public static void restart() { + context.close(); + VManageBootstrap.context = SpringApplication.run(VManageBootstrap.class, args); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java index c69faf96..65e1e5f6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java @@ -15,6 +15,7 @@ import org.springframework.stereotype.Component; public class VideoStreamSessionManager { private ConcurrentHashMap sessionMap = new ConcurrentHashMap<>(); + private ConcurrentHashMap ssrcMap = new ConcurrentHashMap<>(); public String createPlaySsrc(){ return SsrcUtil.getPlaySsrc(); @@ -24,16 +25,18 @@ public class VideoStreamSessionManager { return SsrcUtil.getPlayBackSsrc(); } - public void put(String ssrc,ClientTransaction transaction){ - sessionMap.put(ssrc, transaction); + public void put(String streamId,String ssrc,ClientTransaction transaction){ + sessionMap.put(streamId, transaction); + ssrcMap.put(streamId, ssrc); } - public ClientTransaction get(String ssrc){ - return sessionMap.get(ssrc); + public ClientTransaction get(String streamId){ + return sessionMap.get(streamId); } - public void remove(String ssrc) { - sessionMap.remove(ssrc); - SsrcUtil.releaseSsrc(ssrc); + public void remove(String streamId) { + sessionMap.remove(streamId); + SsrcUtil.releaseSsrc(ssrcMap.get(streamId)); + ssrcMap.remove(streamId); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java index b3b2fbaf..de9d8377 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java @@ -156,6 +156,7 @@ public class SIPProcessorFactory { processor.setRequestEvent(evt); processor.setRedisCatchStorage(redisCatchStorage); processor.setZlmrtpServerFactory(zlmrtpServerFactory); + processor.setSIPCommander(cmder); return processor; } else if (Request.CANCEL.equals(method)) { CancelRequestProcessor processor = new CancelRequestProcessor(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index 6fa4eca9..27000bb7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -77,6 +77,14 @@ public interface ISIPCommander { */ boolean frontEndCmd(Device device, String channelId, int cmdCode, int parameter1, int parameter2, int combineCode2); + /** + * 前端控制指令(用于转发上级指令) + * @param device 控制设备 + * @param channelId 预览通道 + * @param cmdString 前端控制指令串 + */ + boolean fronEndCmd(Device device, String channelId, String cmdString); + /** * 请求预览视频流 * diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index 5c4b8aff..1751ede0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -42,4 +42,23 @@ public interface ISIPCommanderForPlatform { * @return */ boolean catalogQuery(DeviceChannel channel, ParentPlatform parentPlatform, String sn, String fromTag, int size); + + /** + * 向上级回复DeviceInfo查询信息 + * @param parentPlatform 平台信息 + * @param sn + * @param fromTag + * @return + */ + boolean deviceInfoResponse(ParentPlatform parentPlatform, String sn, String fromTag); + + /** + * 向上级回复DeviceStatus查询信息 + * @param parentPlatform 平台信息 + * @param sn + * @param fromTag + * @return + */ + boolean deviceStatusResponse(ParentPlatform parentPlatform, String sn, String fromTag); + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 912189c9..e941a8ba 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -235,7 +235,7 @@ public class SIPCommander implements ISIPCommander { ptzXml.append("\r\n"); String tm = Long.toString(System.currentTimeMillis()); - Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "ViaPtzBranch", "FromPtz" + tm, null); + Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "z9hG4bK-ViaPtz-" + tm, "FromPtz" + tm, null); transmitRequest(device, request); return true; @@ -272,7 +272,7 @@ public class SIPCommander implements ISIPCommander { ptzXml.append("\r\n"); String tm = Long.toString(System.currentTimeMillis()); - Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "ViaPtzBranch", "FromPtz" + tm, null); + Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "z9hG4bK-ViaPtz-" + tm, "FromPtz" + tm, null); transmitRequest(device, request); return true; } catch (SipException | ParseException | InvalidArgumentException e) { @@ -282,6 +282,36 @@ public class SIPCommander implements ISIPCommander { } /** + * 前端控制指令(用于转发上级指令) + * @param device 控制设备 + * @param channelId 预览通道 + * @param cmdString 前端控制指令串 + */ + @Override + public boolean fronEndCmd(Device device, String channelId, String cmdString) { + try { + StringBuffer ptzXml = new StringBuffer(200); + ptzXml.append("\r\n"); + ptzXml.append("\r\n"); + ptzXml.append("DeviceControl\r\n"); + ptzXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); + ptzXml.append("" + channelId + "\r\n"); + ptzXml.append("" + cmdString + "\r\n"); + ptzXml.append("\r\n"); + ptzXml.append("\r\n"); + ptzXml.append("\r\n"); + + String tm = Long.toString(System.currentTimeMillis()); + Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "z9hG4bK-ViaPtz-" + tm, "FromPtz" + tm, null); + transmitRequest(device, request); + return true; + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + } + return false; + } + + /** * 请求预览视频流 * @param device 视频设备 * @param channelId 预览通道 @@ -387,9 +417,7 @@ public class SIPCommander implements ISIPCommander { Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), null, "FromInvt" + tm, null, ssrc); ClientTransaction transaction = transmitRequest(device, request, errorEvent); - streamSession.put(streamId, transaction); - - + streamSession.put(streamId,ssrc, transaction); } catch ( SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); @@ -487,7 +515,7 @@ public class SIPCommander implements ISIPCommander { Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null); ClientTransaction transaction = transmitRequest(device, request, errorEvent); - streamSession.put(streamId, transaction); + streamSession.put(streamId, ssrc, transaction); } catch ( SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); @@ -893,7 +921,7 @@ public class SIPCommander implements ISIPCommander { catalogXml.append("\r\n"); String tm = Long.toString(System.currentTimeMillis()); - Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaDeviceInfo" + tm, "FromDev" + tm, null); + Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaDeviceInfo-" + tm, "FromDev" + tm, null); transmitRequest(device, request); @@ -923,7 +951,7 @@ public class SIPCommander implements ISIPCommander { catalogXml.append("\r\n"); String tm = Long.toString(System.currentTimeMillis()); - Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaCatalog" + tm, "FromCat" + tm, null); + Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaCatalog-" + tm, "FromCat" + tm, null); transmitRequest(device, request, errorEvent); } catch (SipException | ParseException | InvalidArgumentException e) { @@ -958,7 +986,7 @@ public class SIPCommander implements ISIPCommander { recordInfoXml.append("\r\n"); String tm = Long.toString(System.currentTimeMillis()); - Request request = headerProvider.createMessageRequest(device, recordInfoXml.toString(), "ViaRecordInfoBranch", "fromRec" + tm, null); + Request request = headerProvider.createMessageRequest(device, recordInfoXml.toString(), "z9hG4bK-ViaRecordInfo-" + tm, "fromRec" + tm, null); transmitRequest(device, request); } catch (SipException | ParseException | InvalidArgumentException e) { @@ -1101,7 +1129,7 @@ public class SIPCommander implements ISIPCommander { mobilePostitionXml.append("\r\n"); String tm = Long.toString(System.currentTimeMillis()); - Request request = headerProvider.createMessageRequest(device, mobilePostitionXml.toString(), "viaTagPos" + tm, "fromTagPos" + tm, null); + Request request = headerProvider.createMessageRequest(device, mobilePostitionXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null); transmitRequest(device, request, errorEvent); @@ -1134,7 +1162,7 @@ public class SIPCommander implements ISIPCommander { subscribePostitionXml.append("\r\n"); String tm = Long.toString(System.currentTimeMillis()); - Request request = headerProvider.createSubscribeRequest(device, subscribePostitionXml.toString(), "viaTagPos" + tm, "fromTagPos" + tm, null, expires, "presence" ); //Position;id=" + tm.substring(tm.length() - 4)); + Request request = headerProvider.createSubscribeRequest(device, subscribePostitionXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, expires, "presence" ); //Position;id=" + tm.substring(tm.length() - 4)); transmitRequest(device, request); return true; @@ -1187,7 +1215,7 @@ public class SIPCommander implements ISIPCommander { cmdXml.append("\r\n"); String tm = Long.toString(System.currentTimeMillis()); - Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "viaTagPos" + tm, "fromTagPos" + tm, null, expires, "presence" ); + Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, expires, "presence" ); transmitRequest(device, request); return true; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 28761ed2..d5e380b2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -118,7 +118,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { try { StringBuffer keepaliveXml = new StringBuffer(200); - keepaliveXml.append("\r\n");//" encoding=\"GB2312\"?>\r\n"); + keepaliveXml.append("\r\n"); keepaliveXml.append("\r\n"); keepaliveXml.append("Keepalive\r\n"); keepaliveXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); @@ -217,4 +217,72 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } return true; } + + /** + * 向上级回复DeviceInfo查询信息 + * @param parentPlatform 平台信息 + * @param sn + * @param fromTag + * @return + */ + @Override + public boolean deviceInfoResponse(ParentPlatform parentPlatform, String sn, String fromTag) { + if (parentPlatform == null) { + return false; + } + try { + StringBuffer deviceInfoXml = new StringBuffer(600); + deviceInfoXml.append("\r\n"); + deviceInfoXml.append("\r\n"); + deviceInfoXml.append("DeviceInfo\r\n"); + deviceInfoXml.append("" +sn + "\r\n"); + deviceInfoXml.append("" + parentPlatform.getDeviceGBId() + "\r\n"); + deviceInfoXml.append("GB28181 Video Platform\r\n"); + deviceInfoXml.append("Manufacturer\r\n"); + deviceInfoXml.append("wvp-28181\r\n"); + deviceInfoXml.append("2.0.202103\r\n"); + deviceInfoXml.append("OK\r\n"); + deviceInfoXml.append("\r\n"); + Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, deviceInfoXml.toString(), fromTag); + transmitRequest(parentPlatform, request); + + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + return false; + } + return true; + } + + /** + * 向上级回复DeviceStatus查询信息 + * @param parentPlatform 平台信息 + * @param sn + * @param fromTag + * @return + */ + @Override + public boolean deviceStatusResponse(ParentPlatform parentPlatform, String sn, String fromTag) { + if (parentPlatform == null) { + return false; + } + try { + StringBuffer deviceStatusXml = new StringBuffer(600); + deviceStatusXml.append("\r\n"); + deviceStatusXml.append("\r\n"); + deviceStatusXml.append("DeviceStatus\r\n"); + deviceStatusXml.append("" +sn + "\r\n"); + deviceStatusXml.append("" + parentPlatform.getDeviceGBId() + "\r\n"); + deviceStatusXml.append("OK\r\n"); + deviceStatusXml.append("ONLINE\r\n"); + deviceStatusXml.append("OK\r\n"); + deviceStatusXml.append("\r\n"); + Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, deviceStatusXml.toString(), fromTag); + transmitRequest(parentPlatform, request); + + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + return false; + } + return true; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java index ad7b0708..72a4d8cb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java @@ -4,7 +4,10 @@ import java.util.HashMap; import java.util.Map; import javax.sip.*; -//import javax.sip.message.Request; +import javax.sip.address.SipURI; +import javax.sip.header.FromHeader; +import javax.sip.header.HeaderAddress; +import javax.sip.header.ToHeader; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; @@ -12,14 +15,11 @@ import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcesso import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import org.springframework.stereotype.Component; - /** * @Description:ACK请求处理器 * @author: swwheihei * @date: 2020年5月3日 下午5:31:45 */ -@Component public class AckRequestProcessor extends SIPRequestAbstractProcessor { private IRedisCatchStorage redisCatchStorage; @@ -38,10 +38,8 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor { if (dialog == null) return; //DialogState state = dialog.getState(); if (/*request.getMethod().equals(Request.INVITE) &&*/ dialog.getState()== DialogState.CONFIRMED) { - String remoteUri = dialog.getRemoteParty().getURI().toString(); - String localUri = dialog.getLocalParty().getURI().toString(); - String platformGbId = remoteUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@")); - String channelId = localUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@")); + String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); + String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; String deviceId = sendRtpItem.getDeviceId(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java index a14a4cc6..c96501d5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java @@ -1,13 +1,18 @@ package com.genersoft.iot.vmp.gb28181.transmit.request.impl; +import javax.sip.address.SipURI; import javax.sip.Dialog; import javax.sip.DialogState; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; +import javax.sip.header.FromHeader; +import javax.sip.header.HeaderAddress; +import javax.sip.header.ToHeader; import javax.sip.message.Response; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -18,12 +23,14 @@ import java.util.Map; /** * @Description: BYE请求处理器 - * @author: swwheihei - * @date: 2020年5月3日 下午5:32:05 + * @author: lawrencehj + * @date: 2021年3月9日 */ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { - private IRedisCatchStorage redisCatchStorage; + private ISIPCommander cmder; + + private IRedisCatchStorage redisCatchStorage; private ZLMRTPServerFactory zlmrtpServerFactory; @@ -38,10 +45,8 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { Dialog dialog = evt.getDialog(); if (dialog == null) return; if (dialog.getState().equals(DialogState.TERMINATED)) { - String remoteUri = dialog.getRemoteParty().getURI().toString(); - String localUri = dialog.getLocalParty().getURI().toString(); - String platformGbId = remoteUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@")); - String channelId = localUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@")); + String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); + String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); String streamId = sendRtpItem.getStreamId(); Map param = new HashMap<>(); @@ -50,6 +55,11 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { param.put("stream",streamId); System.out.println("停止向上级推流:" + streamId); zlmrtpServerFactory.stopSendRtpStream(param); + redisCatchStorage.deleteSendRTPServer(platformGbId, channelId); + if (zlmrtpServerFactory.totalReaderCount(streamId) == 0) { + System.out.println(streamId + "无其它观看者,通知设备停止推流"); + cmder.streamByeCmd(streamId); + } } } catch (SipException e) { e.printStackTrace(); @@ -58,8 +68,6 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { } catch (ParseException e) { e.printStackTrace(); } - // TODO 优先级99 Bye Request消息实现,此消息一般为级联消息,上级给下级发送视频停止指令 - } /*** @@ -89,4 +97,13 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) { this.zlmrtpServerFactory = zlmrtpServerFactory; } + + public ISIPCommander getSIPCommander() { + return cmder; + } + + public void setSIPCommander(ISIPCommander cmder) { + this.cmder = cmder; + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java index 0c3f127f..dc41fe71 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java @@ -75,20 +75,6 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor { SipURI sipURI = (SipURI) request.getRequestURI(); String channelId = sipURI.getUser(); String platformId = null; -// SubjectHeader subjectHeader = (SubjectHeader)request.getHeader(SubjectHeader.NAME); -// // 查询通道是否存在 不存在回复404 -// if (subjectHeader != null) { // 存在则从subjectHeader 获取平台信息 -// String subject = subjectHeader.getSubject(); -// if (subject != null) { -// String[] info1 = subject.split(","); -// if (info1 != null && info1 .length == 2) { -// String[] info2 = info1[1].split(":"); -// if (info2 != null && info2.length == 2) { -// platformId = info2[0]; -// } -// } -// } -// } FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME); AddressImpl address = (AddressImpl) fromHeader.getAddress(); @@ -224,7 +210,9 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor { e.printStackTrace(); } })); - playResult.getResult(); + if (logger.isDebugEnabled()) { + logger.debug(playResult.getResult().toString()); + } } catch (SipException | InvalidArgumentException | ParseException e) { e.printStackTrace(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java index a2b37099..e97629b8 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java @@ -4,14 +4,22 @@ import java.io.ByteArrayInputStream; import java.text.ParseException; import java.util.*; +import javax.sip.address.SipURI; + import javax.sip.header.FromHeader; +import javax.sip.header.HeaderAddress; +import javax.sip.header.ToHeader; import javax.sip.InvalidArgumentException; +import javax.sip.ListeningPoint; +import javax.sip.ObjectInUseException; import javax.sip.RequestEvent; import javax.sip.SipException; +import javax.sip.SipProvider; import javax.sip.message.Request; import javax.sip.message.Response; import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.VManageBootstrap; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.UserSetup; @@ -34,6 +42,7 @@ import com.genersoft.iot.vmp.utils.SpringBeanFactory; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.platform.bean.ChannelReduce; +import gov.nist.javax.sip.SipStackImpl; import gov.nist.javax.sip.address.AddressImpl; import gov.nist.javax.sip.address.SipUri; @@ -114,10 +123,10 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { logger.info("接收到Catalog消息"); processMessageCatalogList(evt); } else if (MESSAGE_DEVICE_INFO.equals(cmd)) { - logger.info("接收到DeviceInfo消息"); + //DeviceInfo消息处理 processMessageDeviceInfo(evt); } else if (MESSAGE_DEVICE_STATUS.equals(cmd)) { - logger.info("接收到DeviceStatus消息"); + // DeviceStatus消息处理 processMessageDeviceStatus(evt); } else if (MESSAGE_DEVICE_CONTROL.equals(cmd)) { logger.info("接收到DeviceControl消息"); @@ -211,27 +220,48 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { private void processMessageDeviceStatus(RequestEvent evt) { try { Element rootElement = getRootElement(evt); - String deviceId = XmlUtil.getText(rootElement, "DeviceID"); - // 检查设备是否存在, 不存在则不回复 - if (storager.exists(deviceId)) { - // 回复200 OK - responseAck(evt); - JSONObject json = new JSONObject(); - XmlUtil.node2Json(rootElement, json); - if (logger.isDebugEnabled()) { - logger.debug(json.toJSONString()); - } - RequestMessage msg = new RequestMessage(); - msg.setDeviceId(deviceId); - msg.setType(DeferredResultHolder.CALLBACK_CMD_DEVICESTATUS); - msg.setData(json); - deferredResultHolder.invokeResult(msg); + String name = rootElement.getName(); + Element deviceIdElement = rootElement.element("DeviceID"); + String deviceId = deviceIdElement.getText(); - if (offLineDetector.isOnline(deviceId)) { - publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE); + if (name.equalsIgnoreCase("Query")) { // 区分是Response——查询响应,还是Query——查询请求 + logger.info("接收到DeviceStatus查询消息"); + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String platformId = ((SipUri) fromHeader.getAddress().getURI()).getUser(); + if (platformId == null) { + response404Ack(evt); + return; } else { + // 回复200 OK + responseAck(evt); + String sn = rootElement.element("SN").getText(); + ParentPlatform parentPlatform = storager.queryParentPlatById(platformId); + cmderFroPlatform.deviceStatusResponse(parentPlatform, sn, fromHeader.getTag()); + } + } else { + logger.info("接收到DeviceStatus应答消息"); + // 检查设备是否存在, 不存在则不回复 + if (storager.exists(deviceId)) { + // 回复200 OK + responseAck(evt); + JSONObject json = new JSONObject(); + XmlUtil.node2Json(rootElement, json); + if (logger.isDebugEnabled()) { + logger.debug(json.toJSONString()); + } + RequestMessage msg = new RequestMessage(); + msg.setDeviceId(deviceId); + msg.setType(DeferredResultHolder.CALLBACK_CMD_DEVICESTATUS); + msg.setData(json); + deferredResultHolder.invokeResult(msg); + + if (offLineDetector.isOnline(deviceId)) { + publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE); + } else { + } } } + } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { e.printStackTrace(); } @@ -263,6 +293,51 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { deferredResultHolder.invokeResult(msg); } else { // 此处是上级发出的DeviceControl指令 + String platformId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); + String targetGBId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); + // 远程启动功能 + if (!XmlUtil.isEmpty(XmlUtil.getText(rootElement, "TeleBoot"))) { + if (deviceId.equals(targetGBId)) { + // 远程启动功能:需要在重新启动程序后先对SipStack解绑 + logger.info("执行远程启动本平台命令"); + ParentPlatform parentPlatform = storager.queryParentPlatById(platformId); + cmderFroPlatform.unregister(parentPlatform, null, null); + + Thread restartThread = new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(3000); + SipProvider up = (SipProvider) SpringBeanFactory.getBean("udpSipProvider"); + SipStackImpl stack = (SipStackImpl)up.getSipStack(); + stack.stop(); + Iterator listener = stack.getListeningPoints(); + while (listener.hasNext()) { + stack.deleteListeningPoint((ListeningPoint) listener.next()); + } + Iterator providers = stack.getSipProviders(); + while (providers.hasNext()) { + stack.deleteSipProvider((SipProvider) providers.next()); + } + VManageBootstrap.restart(); + } catch (InterruptedException ignored) { + } catch (ObjectInUseException e) { + e.printStackTrace(); + } + } + }); + + restartThread.setDaemon(false); + restartThread.start(); + } else { + // 远程启动指定设备 + } + } + if (!XmlUtil.isEmpty(XmlUtil.getText(rootElement,"PTZCmd")) && !deviceId.equals(targetGBId)) { + String cmdString = XmlUtil.getText(rootElement,"PTZCmd"); + Device device = storager.queryVideoDeviceByPlatformIdAndChannelId(platformId, deviceId); + cmder.fronEndCmd(device, deviceId, cmdString); + } } } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { e.printStackTrace(); @@ -374,9 +449,21 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { Element deviceIdElement = rootElement.element("DeviceID"); String deviceId = deviceIdElement.getTextTrim().toString(); if (requestName.equals("Query")) { - // 回复200 OK - responseAck(evt); + logger.info("接收到DeviceInfo查询消息"); + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String platformId = ((SipUri) fromHeader.getAddress().getURI()).getUser(); + if (platformId == null) { + response404Ack(evt); + return; + } else { + // 回复200 OK + responseAck(evt); + String sn = rootElement.element("SN").getText(); + ParentPlatform parentPlatform = storager.queryParentPlatById(platformId); + cmderFroPlatform.deviceInfoResponse(parentPlatform, sn, fromHeader.getTag()); + } } else { + logger.info("接收到DeviceInfo应答消息"); Device device = storager.queryVideoDevice(deviceId); if (device == null) { return; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/response/impl/RegisterResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/response/impl/RegisterResponseProcessor.java index aa6b5ab2..a3952ff2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/response/impl/RegisterResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/response/impl/RegisterResponseProcessor.java @@ -60,16 +60,17 @@ public class RegisterResponseProcessor implements ISIPResponseProcessor { logger.info(String.format("未找到callId: %s 的注册/注销平台id", callId )); return; } - logger.info(String.format("收到 %s 的注册/注销%S响应", platformGBId, response.getStatusCode() )); ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(platformGBId); if (parentPlatformCatch == null) { logger.warn(String.format("收到 %s 的注册/注销%S请求, 但是平台缓存信息未查询到!!!", platformGBId, response.getStatusCode())); return; } + String action = parentPlatformCatch.getParentPlatform().getExpires().equals("0") ? "注销" : "注册"; + logger.info(String.format("收到 %s %s的%S响应", platformGBId, action, response.getStatusCode() )); ParentPlatform parentPlatform = parentPlatformCatch.getParentPlatform(); if (parentPlatform == null) { - logger.warn(String.format("收到 %s 的注册/注销%S请求, 但是平台信息未查询到!!!", platformGBId, response.getStatusCode())); + logger.warn(String.format("收到 %s %s的%S请求, 但是平台信息未查询到!!!", platformGBId, action, response.getStatusCode())); return; } @@ -77,11 +78,16 @@ public class RegisterResponseProcessor implements ISIPResponseProcessor { WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME); sipCommanderForPlatform.register(parentPlatform, callId, www, null, null); }else if (response.getStatusCode() == 200){ - // 注册成功 - logger.info(String.format("%s 注册成功", platformGBId )); + // 注册/注销成功 + logger.info(String.format("%s %s成功", platformGBId, action)); redisCatchStorage.delPlatformRegisterInfo(callId); parentPlatform.setStatus(true); + // 取回Expires设置,避免注销过程中被置为0 + ParentPlatform parentPlatformTmp = storager.queryParentPlatById(platformGBId); + String expires = parentPlatformTmp.getExpires(); + parentPlatform.setExpires(expires); storager.updateParentPlatform(parentPlatform); + redisCatchStorage.updatePlatformRegister(parentPlatform); redisCatchStorage.updatePlatformKeepalive(parentPlatform); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 90b53695..51f61eff 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -267,20 +267,25 @@ public class ZLMHttpHookListener { } String streamId = json.getString("stream"); - - cmder.streamByeCmd(streamId); StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); - if (streamInfo!=null){ - redisCatchStorage.stopPlay(streamInfo); - storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); + + JSONObject ret = new JSONObject(); + ret.put("code", 0); + ret.put("close", true); + + if (streamInfo != null) { + if (redisCatchStorage.isChannelSendingRTP(streamInfo.getChannelId())) { + ret.put("close", false); + } else { + cmder.streamByeCmd(streamId); + redisCatchStorage.stopPlay(streamInfo); + storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); + } }else{ + cmder.streamByeCmd(streamId); streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); redisCatchStorage.stopPlayback(streamInfo); } - - JSONObject ret = new JSONObject(); - ret.put("code", 0); - ret.put("close", true); return new ResponseEntity(ret.toString(),HttpStatus.OK); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 00951ba2..1f1693df 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -152,6 +152,16 @@ public class ZLMRTPServerFactory { return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")); } + /** + * 查询转推的流是否有其它观看者 + * @param streamId + * @return + */ + public int totalReaderCount(String streamId) { + JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo("rtp", "rtmp", streamId); + return mediaInfo.getInteger("totalReaderCount"); + } + /** * 调用zlm RESTful API —— stopSendRtp */ diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index ca70620f..90611846 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -89,4 +89,17 @@ public interface IRedisCatchStorage { */ SendRtpItem querySendRTPServer(String platformGbId, String channelId); + /** + * 删除RTP推送信息缓存 + * @param platformGbId + * @param channelId + */ + void deleteSendRTPServer(String platformGbId, String channelId); + + /** + * 查询某个通道是否存在上级点播(RTP推送) + * @param channelId + */ + boolean isChannelSendingRTP(String channelId); + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 6153e5f7..3feb347e 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -225,4 +225,30 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { return (SendRtpItem)redis.get(key); } + /** + * 删除RTP推送信息缓存 + * @param platformGbId + * @param channelId + */ + @Override + public void deleteSendRTPServer(String platformGbId, String channelId) { + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + platformGbId + "_" + channelId; + redis.del(key); + } + + /** + * 查询某个通道是否存在上级点播(RTP推送) + * @param channelId + */ + @Override + public boolean isChannelSendingRTP(String channelId) { + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + "*_" + channelId; + List RtpStreams = redis.scan(key); + if (RtpStreams.size() > 0) { + return true; + } else { + return false; + } + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/platform/PlatformController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/platform/PlatformController.java index ef617a68..279a0310 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/platform/PlatformController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/platform/PlatformController.java @@ -60,7 +60,7 @@ public class PlatformController { public ResponseEntity savePlatform(@RequestBody ParentPlatform parentPlatform){ if (logger.isDebugEnabled()) { - logger.debug("查询所有上级设备API调用"); + logger.debug("保存上级平台信息API调用"); } if (StringUtils.isEmpty(parentPlatform.getName()) ||StringUtils.isEmpty(parentPlatform.getServerGBId()) @@ -87,13 +87,13 @@ public class PlatformController { if (parentPlatform.isEnable()) { // 只要保存就发送注册 commanderForPlatform.register(parentPlatform); - }else if (parentPlatformOld != null && parentPlatformOld.isEnable() && !parentPlatform.isEnable()){ // 关闭启用时注销 + } else if (parentPlatformOld != null && parentPlatformOld.isEnable() && !parentPlatform.isEnable()){ // 关闭启用时注销 commanderForPlatform.unregister(parentPlatform, null, null); } - + return new ResponseEntity<>("success", HttpStatus.OK); - }else { + } else { return new ResponseEntity<>("fail", HttpStatus.OK); } } @@ -103,7 +103,7 @@ public class PlatformController { public ResponseEntity deletePlatform(@RequestBody ParentPlatform parentPlatform){ if (logger.isDebugEnabled()) { - logger.debug("查询所有上级设备API调用"); + logger.debug("删除上级平台API调用"); } if (StringUtils.isEmpty(parentPlatform.getServerGBId()) ){ @@ -138,7 +138,7 @@ public class PlatformController { public ResponseEntity exitPlatform(@PathVariable String deviceGbId){ if (logger.isDebugEnabled()) { - logger.debug("查询所有上级设备API调用"); + logger.debug("查询上级平台是否存在API调用:" + deviceGbId); } ParentPlatform parentPlatform = storager.queryParentPlatById(deviceGbId); return new ResponseEntity<>(String.valueOf(parentPlatform != null), HttpStatus.OK); @@ -184,7 +184,7 @@ public class PlatformController { public ResponseEntity delChannelForGB(@RequestBody UpdateChannelParam param){ if (logger.isDebugEnabled()) { - logger.debug("给上级平台添加国标通道API调用"); + logger.debug("给上级平台删除国标通道API调用"); } int result = storager.delChannelForGB(param.getPlatformId(), param.getChannelReduces()); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java index db1e345c..376040a3 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java @@ -86,6 +86,9 @@ public class PlayServiceImpl implements IPlayService { msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); msg.setData(JSON.toJSONString(streamInfo)); resultHolder.invokeResult(msg); + if (hookEvent != null) { + hookEvent.response(JSONObject.parseObject(JSON.toJSONString(streamInfo))); + } } else { redisCatchStorage.stopPlay(streamInfo); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());