Browse Source

Merge pull request #67 from lawrencehj/wvp-28181-2.0

增加上级平台信令功能实现,解决上级点播的一些问题
pull/74/head
648540858 4 years ago
committed by GitHub
parent
commit
bb22908cf7
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      README.md
  2. 12
      src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java
  3. 17
      src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java
  4. 1
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
  5. 8
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
  6. 19
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java
  7. 52
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
  8. 70
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
  9. 14
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java
  10. 35
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java
  11. 18
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java
  12. 129
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java
  13. 14
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/response/impl/RegisterResponseProcessor.java
  14. 23
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
  15. 10
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
  16. 13
      src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
  17. 26
      src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
  18. 14
      src/main/java/com/genersoft/iot/vmp/vmanager/platform/PlatformController.java
  19. 3
      src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java

11
README.md

@ -60,21 +60,24 @@ https://gitee.com/18010473990/wvp-GB28181.git
15. 支持订阅与通知方法
- [X] 移动位置订阅
- [X] 移动位置通知处理
- [ ] 报警事件订阅
- [X] 报警事件订阅
- [X] 报警事件通知处理
- [ ] 设备目录订阅
- [X] 设备目录通知处理
16. 移动位置查询和显示,可通过配置文件设置移动位置历史是否存储
# 2.0 支持特性
- [ ] 国标通道向上级联
- [X] 国标通道向上级联
- [X] WEB添加上级平台
- [X] 注册
- [X] 心跳保活
- [X] 通道选择
- [X] 通道推送
- [ ] 点播
- [ ] 云台控制
- [X] 点播
- [X] 云台控制
- [X] 平台状态查询
- [X] 平台信息查询
- [X] 平台远程启动
- [ ] 添加RTSP视频
- [ ] 添加ONVIF探测局域网内的设备
- [ ] 添加RTMP视频

12
src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java

@ -4,10 +4,20 @@ import java.util.logging.LogManager;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class VManageBootstrap extends LogManager {
private static String[] args;
private static ConfigurableApplicationContext context;
public static void main(String[] args) {
SpringApplication.run(VManageBootstrap.class, args);
VManageBootstrap.args = args;
VManageBootstrap.context = SpringApplication.run(VManageBootstrap.class, args);
}
// 项目重启
public static void restart() {
context.close();
VManageBootstrap.context = SpringApplication.run(VManageBootstrap.class, args);
}
}

17
src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java

@ -15,6 +15,7 @@ import org.springframework.stereotype.Component;
public class VideoStreamSessionManager {
private ConcurrentHashMap<String, ClientTransaction> sessionMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, String> ssrcMap = new ConcurrentHashMap<>();
public String createPlaySsrc(){
return SsrcUtil.getPlaySsrc();
@ -24,16 +25,18 @@ public class VideoStreamSessionManager {
return SsrcUtil.getPlayBackSsrc();
}
public void put(String ssrc,ClientTransaction transaction){
sessionMap.put(ssrc, transaction);
public void put(String streamId,String ssrc,ClientTransaction transaction){
sessionMap.put(streamId, transaction);
ssrcMap.put(streamId, ssrc);
}
public ClientTransaction get(String ssrc){
return sessionMap.get(ssrc);
public ClientTransaction get(String streamId){
return sessionMap.get(streamId);
}
public void remove(String ssrc) {
sessionMap.remove(ssrc);
SsrcUtil.releaseSsrc(ssrc);
public void remove(String streamId) {
sessionMap.remove(streamId);
SsrcUtil.releaseSsrc(ssrcMap.get(streamId));
ssrcMap.remove(streamId);
}
}

1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java

@ -156,6 +156,7 @@ public class SIPProcessorFactory {
processor.setRequestEvent(evt);
processor.setRedisCatchStorage(redisCatchStorage);
processor.setZlmrtpServerFactory(zlmrtpServerFactory);
processor.setSIPCommander(cmder);
return processor;
} else if (Request.CANCEL.equals(method)) {
CancelRequestProcessor processor = new CancelRequestProcessor();

8
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java

@ -77,6 +77,14 @@ public interface ISIPCommander {
*/
boolean frontEndCmd(Device device, String channelId, int cmdCode, int parameter1, int parameter2, int combineCode2);
/**
* 前端控制指令用于转发上级指令
* @param device 控制设备
* @param channelId 预览通道
* @param cmdString 前端控制指令串
*/
boolean fronEndCmd(Device device, String channelId, String cmdString);
/**
* 请求预览视频流
*

19
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java

@ -42,4 +42,23 @@ public interface ISIPCommanderForPlatform {
* @return
*/
boolean catalogQuery(DeviceChannel channel, ParentPlatform parentPlatform, String sn, String fromTag, int size);
/**
* 向上级回复DeviceInfo查询信息
* @param parentPlatform 平台信息
* @param sn
* @param fromTag
* @return
*/
boolean deviceInfoResponse(ParentPlatform parentPlatform, String sn, String fromTag);
/**
* 向上级回复DeviceStatus查询信息
* @param parentPlatform 平台信息
* @param sn
* @param fromTag
* @return
*/
boolean deviceStatusResponse(ParentPlatform parentPlatform, String sn, String fromTag);
}

52
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

@ -235,7 +235,7 @@ public class SIPCommander implements ISIPCommander {
ptzXml.append("</Control>\r\n");
String tm = Long.toString(System.currentTimeMillis());
Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "ViaPtzBranch", "FromPtz" + tm, null);
Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "z9hG4bK-ViaPtz-" + tm, "FromPtz" + tm, null);
transmitRequest(device, request);
return true;
@ -272,7 +272,7 @@ public class SIPCommander implements ISIPCommander {
ptzXml.append("</Control>\r\n");
String tm = Long.toString(System.currentTimeMillis());
Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "ViaPtzBranch", "FromPtz" + tm, null);
Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "z9hG4bK-ViaPtz-" + tm, "FromPtz" + tm, null);
transmitRequest(device, request);
return true;
} catch (SipException | ParseException | InvalidArgumentException e) {
@ -282,6 +282,36 @@ public class SIPCommander implements ISIPCommander {
}
/**
* 前端控制指令用于转发上级指令
* @param device 控制设备
* @param channelId 预览通道
* @param cmdString 前端控制指令串
*/
@Override
public boolean fronEndCmd(Device device, String channelId, String cmdString) {
try {
StringBuffer ptzXml = new StringBuffer(200);
ptzXml.append("<?xml version=\"1.0\" ?>\r\n");
ptzXml.append("<Control>\r\n");
ptzXml.append("<CmdType>DeviceControl</CmdType>\r\n");
ptzXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
ptzXml.append("<DeviceID>" + channelId + "</DeviceID>\r\n");
ptzXml.append("<PTZCmd>" + cmdString + "</PTZCmd>\r\n");
ptzXml.append("<Info>\r\n");
ptzXml.append("</Info>\r\n");
ptzXml.append("</Control>\r\n");
String tm = Long.toString(System.currentTimeMillis());
Request request = headerProvider.createMessageRequest(device, ptzXml.toString(), "z9hG4bK-ViaPtz-" + tm, "FromPtz" + tm, null);
transmitRequest(device, request);
return true;
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
}
return false;
}
/**
* 请求预览视频流
* @param device 视频设备
* @param channelId 预览通道
@ -387,9 +417,7 @@ public class SIPCommander implements ISIPCommander {
Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), null, "FromInvt" + tm, null, ssrc);
ClientTransaction transaction = transmitRequest(device, request, errorEvent);
streamSession.put(streamId, transaction);
streamSession.put(streamId,ssrc, transaction);
} catch ( SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
@ -487,7 +515,7 @@ public class SIPCommander implements ISIPCommander {
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null);
ClientTransaction transaction = transmitRequest(device, request, errorEvent);
streamSession.put(streamId, transaction);
streamSession.put(streamId, ssrc, transaction);
} catch ( SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
@ -893,7 +921,7 @@ public class SIPCommander implements ISIPCommander {
catalogXml.append("</Query>\r\n");
String tm = Long.toString(System.currentTimeMillis());
Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaDeviceInfo" + tm, "FromDev" + tm, null);
Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaDeviceInfo-" + tm, "FromDev" + tm, null);
transmitRequest(device, request);
@ -923,7 +951,7 @@ public class SIPCommander implements ISIPCommander {
catalogXml.append("</Query>\r\n");
String tm = Long.toString(System.currentTimeMillis());
Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaCatalog" + tm, "FromCat" + tm, null);
Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaCatalog-" + tm, "FromCat" + tm, null);
transmitRequest(device, request, errorEvent);
} catch (SipException | ParseException | InvalidArgumentException e) {
@ -958,7 +986,7 @@ public class SIPCommander implements ISIPCommander {
recordInfoXml.append("</Query>\r\n");
String tm = Long.toString(System.currentTimeMillis());
Request request = headerProvider.createMessageRequest(device, recordInfoXml.toString(), "ViaRecordInfoBranch", "fromRec" + tm, null);
Request request = headerProvider.createMessageRequest(device, recordInfoXml.toString(), "z9hG4bK-ViaRecordInfo-" + tm, "fromRec" + tm, null);
transmitRequest(device, request);
} catch (SipException | ParseException | InvalidArgumentException e) {
@ -1101,7 +1129,7 @@ public class SIPCommander implements ISIPCommander {
mobilePostitionXml.append("</Query>\r\n");
String tm = Long.toString(System.currentTimeMillis());
Request request = headerProvider.createMessageRequest(device, mobilePostitionXml.toString(), "viaTagPos" + tm, "fromTagPos" + tm, null);
Request request = headerProvider.createMessageRequest(device, mobilePostitionXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null);
transmitRequest(device, request, errorEvent);
@ -1134,7 +1162,7 @@ public class SIPCommander implements ISIPCommander {
subscribePostitionXml.append("</Query>\r\n");
String tm = Long.toString(System.currentTimeMillis());
Request request = headerProvider.createSubscribeRequest(device, subscribePostitionXml.toString(), "viaTagPos" + tm, "fromTagPos" + tm, null, expires, "presence" ); //Position;id=" + tm.substring(tm.length() - 4));
Request request = headerProvider.createSubscribeRequest(device, subscribePostitionXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, expires, "presence" ); //Position;id=" + tm.substring(tm.length() - 4));
transmitRequest(device, request);
return true;
@ -1187,7 +1215,7 @@ public class SIPCommander implements ISIPCommander {
cmdXml.append("</Query>\r\n");
String tm = Long.toString(System.currentTimeMillis());
Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "viaTagPos" + tm, "fromTagPos" + tm, null, expires, "presence" );
Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, expires, "presence" );
transmitRequest(device, request);
return true;

70
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java

@ -118,7 +118,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
try {
StringBuffer keepaliveXml = new StringBuffer(200);
keepaliveXml.append("<?xml version=\"1.0\"?>\r\n");//" encoding=\"GB2312\"?>\r\n");
keepaliveXml.append("<?xml version=\"1.0\"?>\r\n");
keepaliveXml.append("<Notify>\r\n");
keepaliveXml.append("<CmdType>Keepalive</CmdType>\r\n");
keepaliveXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
@ -217,4 +217,72 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
}
return true;
}
/**
* 向上级回复DeviceInfo查询信息
* @param parentPlatform 平台信息
* @param sn
* @param fromTag
* @return
*/
@Override
public boolean deviceInfoResponse(ParentPlatform parentPlatform, String sn, String fromTag) {
if (parentPlatform == null) {
return false;
}
try {
StringBuffer deviceInfoXml = new StringBuffer(600);
deviceInfoXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
deviceInfoXml.append("<Response>\r\n");
deviceInfoXml.append("<CmdType>DeviceInfo</CmdType>\r\n");
deviceInfoXml.append("<SN>" +sn + "</SN>\r\n");
deviceInfoXml.append("<DeviceID>" + parentPlatform.getDeviceGBId() + "</DeviceID>\r\n");
deviceInfoXml.append("<DeviceName>GB28181 Video Platform</DeviceName>\r\n");
deviceInfoXml.append("<Manufacturer>Manufacturer</Manufacturer>\r\n");
deviceInfoXml.append("<Model>wvp-28181</Model>\r\n");
deviceInfoXml.append("<Firmware>2.0.202103</Firmware>\r\n");
deviceInfoXml.append("<Result>OK</Result>\r\n");
deviceInfoXml.append("</Response>\r\n");
Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, deviceInfoXml.toString(), fromTag);
transmitRequest(parentPlatform, request);
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* 向上级回复DeviceStatus查询信息
* @param parentPlatform 平台信息
* @param sn
* @param fromTag
* @return
*/
@Override
public boolean deviceStatusResponse(ParentPlatform parentPlatform, String sn, String fromTag) {
if (parentPlatform == null) {
return false;
}
try {
StringBuffer deviceStatusXml = new StringBuffer(600);
deviceStatusXml.append("<?xml version=\"1.0\" encoding=\"GB2312\"?>\r\n");
deviceStatusXml.append("<Response>\r\n");
deviceStatusXml.append("<CmdType>DeviceStatus</CmdType>\r\n");
deviceStatusXml.append("<SN>" +sn + "</SN>\r\n");
deviceStatusXml.append("<DeviceID>" + parentPlatform.getDeviceGBId() + "</DeviceID>\r\n");
deviceStatusXml.append("<Result>OK</Result>\r\n");
deviceStatusXml.append("<Online>ONLINE</Online>\r\n");
deviceStatusXml.append("<Status>OK</Status>\r\n");
deviceStatusXml.append("</Response>\r\n");
Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, deviceStatusXml.toString(), fromTag);
transmitRequest(parentPlatform, request);
} catch (SipException | ParseException | InvalidArgumentException e) {
e.printStackTrace();
return false;
}
return true;
}
}

14
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java

@ -4,7 +4,10 @@ import java.util.HashMap;
import java.util.Map;
import javax.sip.*;
//import javax.sip.message.Request;
import javax.sip.address.SipURI;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
@ -12,14 +15,11 @@ import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcesso
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.springframework.stereotype.Component;
/**
* @Description:ACK请求处理器
* @author: swwheihei
* @date: 2020年5月3日 下午5:31:45
*/
@Component
public class AckRequestProcessor extends SIPRequestAbstractProcessor {
private IRedisCatchStorage redisCatchStorage;
@ -38,10 +38,8 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor {
if (dialog == null) return;
//DialogState state = dialog.getState();
if (/*request.getMethod().equals(Request.INVITE) &&*/ dialog.getState()== DialogState.CONFIRMED) {
String remoteUri = dialog.getRemoteParty().getURI().toString();
String localUri = dialog.getLocalParty().getURI().toString();
String platformGbId = remoteUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@"));
String channelId = localUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@"));
String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
String deviceId = sendRtpItem.getDeviceId();

35
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java

@ -1,13 +1,18 @@
package com.genersoft.iot.vmp.gb28181.transmit.request.impl;
import javax.sip.address.SipURI;
import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import javax.sip.message.Response;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@ -18,12 +23,14 @@ import java.util.Map;
/**
* @Description: BYE请求处理器
* @author: swwheihei
* @date: 2020年5月3日 下午5:32:05
* @author: lawrencehj
* @date: 2021年3月9日
*/
public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
private IRedisCatchStorage redisCatchStorage;
private ISIPCommander cmder;
private IRedisCatchStorage redisCatchStorage;
private ZLMRTPServerFactory zlmrtpServerFactory;
@ -38,10 +45,8 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
Dialog dialog = evt.getDialog();
if (dialog == null) return;
if (dialog.getState().equals(DialogState.TERMINATED)) {
String remoteUri = dialog.getRemoteParty().getURI().toString();
String localUri = dialog.getLocalParty().getURI().toString();
String platformGbId = remoteUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@"));
String channelId = localUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@"));
String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
String streamId = sendRtpItem.getStreamId();
Map<String, Object> param = new HashMap<>();
@ -50,6 +55,11 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
param.put("stream",streamId);
System.out.println("停止向上级推流:" + streamId);
zlmrtpServerFactory.stopSendRtpStream(param);
redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
if (zlmrtpServerFactory.totalReaderCount(streamId) == 0) {
System.out.println(streamId + "无其它观看者,通知设备停止推流");
cmder.streamByeCmd(streamId);
}
}
} catch (SipException e) {
e.printStackTrace();
@ -58,8 +68,6 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
} catch (ParseException e) {
e.printStackTrace();
}
// TODO 优先级99 Bye Request消息实现,此消息一般为级联消息,上级给下级发送视频停止指令
}
/***
@ -89,4 +97,13 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) {
this.zlmrtpServerFactory = zlmrtpServerFactory;
}
public ISIPCommander getSIPCommander() {
return cmder;
}
public void setSIPCommander(ISIPCommander cmder) {
this.cmder = cmder;
}
}

18
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java

@ -75,20 +75,6 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
SipURI sipURI = (SipURI) request.getRequestURI();
String channelId = sipURI.getUser();
String platformId = null;
// SubjectHeader subjectHeader = (SubjectHeader)request.getHeader(SubjectHeader.NAME);
// // 查询通道是否存在 不存在回复404
// if (subjectHeader != null) { // 存在则从subjectHeader 获取平台信息
// String subject = subjectHeader.getSubject();
// if (subject != null) {
// String[] info1 = subject.split(",");
// if (info1 != null && info1 .length == 2) {
// String[] info2 = info1[1].split(":");
// if (info2 != null && info2.length == 2) {
// platformId = info2[0];
// }
// }
// }
// }
FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
AddressImpl address = (AddressImpl) fromHeader.getAddress();
@ -224,7 +210,9 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
e.printStackTrace();
}
}));
playResult.getResult();
if (logger.isDebugEnabled()) {
logger.debug(playResult.getResult().toString());
}
} catch (SipException | InvalidArgumentException | ParseException e) {
e.printStackTrace();

129
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java

@ -4,14 +4,22 @@ import java.io.ByteArrayInputStream;
import java.text.ParseException;
import java.util.*;
import javax.sip.address.SipURI;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import javax.sip.InvalidArgumentException;
import javax.sip.ListeningPoint;
import javax.sip.ObjectInUseException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.SipProvider;
import javax.sip.message.Request;
import javax.sip.message.Response;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.VManageBootstrap;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetup;
@ -34,6 +42,7 @@ import com.genersoft.iot.vmp.utils.SpringBeanFactory;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.platform.bean.ChannelReduce;
import gov.nist.javax.sip.SipStackImpl;
import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri;
@ -114,10 +123,10 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
logger.info("接收到Catalog消息");
processMessageCatalogList(evt);
} else if (MESSAGE_DEVICE_INFO.equals(cmd)) {
logger.info("接收到DeviceInfo消息");
//DeviceInfo消息处理
processMessageDeviceInfo(evt);
} else if (MESSAGE_DEVICE_STATUS.equals(cmd)) {
logger.info("接收到DeviceStatus消息");
// DeviceStatus消息处理
processMessageDeviceStatus(evt);
} else if (MESSAGE_DEVICE_CONTROL.equals(cmd)) {
logger.info("接收到DeviceControl消息");
@ -211,27 +220,48 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
private void processMessageDeviceStatus(RequestEvent evt) {
try {
Element rootElement = getRootElement(evt);
String deviceId = XmlUtil.getText(rootElement, "DeviceID");
// 检查设备是否存在, 不存在则不回复
if (storager.exists(deviceId)) {
// 回复200 OK
responseAck(evt);
JSONObject json = new JSONObject();
XmlUtil.node2Json(rootElement, json);
if (logger.isDebugEnabled()) {
logger.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
msg.setDeviceId(deviceId);
msg.setType(DeferredResultHolder.CALLBACK_CMD_DEVICESTATUS);
msg.setData(json);
deferredResultHolder.invokeResult(msg);
String name = rootElement.getName();
Element deviceIdElement = rootElement.element("DeviceID");
String deviceId = deviceIdElement.getText();
if (offLineDetector.isOnline(deviceId)) {
publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE);
if (name.equalsIgnoreCase("Query")) { // 区分是Response——查询响应,还是Query——查询请求
logger.info("接收到DeviceStatus查询消息");
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
String platformId = ((SipUri) fromHeader.getAddress().getURI()).getUser();
if (platformId == null) {
response404Ack(evt);
return;
} else {
// 回复200 OK
responseAck(evt);
String sn = rootElement.element("SN").getText();
ParentPlatform parentPlatform = storager.queryParentPlatById(platformId);
cmderFroPlatform.deviceStatusResponse(parentPlatform, sn, fromHeader.getTag());
}
} else {
logger.info("接收到DeviceStatus应答消息");
// 检查设备是否存在, 不存在则不回复
if (storager.exists(deviceId)) {
// 回复200 OK
responseAck(evt);
JSONObject json = new JSONObject();
XmlUtil.node2Json(rootElement, json);
if (logger.isDebugEnabled()) {
logger.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
msg.setDeviceId(deviceId);
msg.setType(DeferredResultHolder.CALLBACK_CMD_DEVICESTATUS);
msg.setData(json);
deferredResultHolder.invokeResult(msg);
if (offLineDetector.isOnline(deviceId)) {
publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE);
} else {
}
}
}
} catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
e.printStackTrace();
}
@ -263,6 +293,51 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
deferredResultHolder.invokeResult(msg);
} else {
// 此处是上级发出的DeviceControl指令
String platformId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
String targetGBId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
// 远程启动功能
if (!XmlUtil.isEmpty(XmlUtil.getText(rootElement, "TeleBoot"))) {
if (deviceId.equals(targetGBId)) {
// 远程启动功能:需要在重新启动程序后先对SipStack解绑
logger.info("执行远程启动本平台命令");
ParentPlatform parentPlatform = storager.queryParentPlatById(platformId);
cmderFroPlatform.unregister(parentPlatform, null, null);
Thread restartThread = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
SipProvider up = (SipProvider) SpringBeanFactory.getBean("udpSipProvider");
SipStackImpl stack = (SipStackImpl)up.getSipStack();
stack.stop();
Iterator listener = stack.getListeningPoints();
while (listener.hasNext()) {
stack.deleteListeningPoint((ListeningPoint) listener.next());
}
Iterator providers = stack.getSipProviders();
while (providers.hasNext()) {
stack.deleteSipProvider((SipProvider) providers.next());
}
VManageBootstrap.restart();
} catch (InterruptedException ignored) {
} catch (ObjectInUseException e) {
e.printStackTrace();
}
}
});
restartThread.setDaemon(false);
restartThread.start();
} else {
// 远程启动指定设备
}
}
if (!XmlUtil.isEmpty(XmlUtil.getText(rootElement,"PTZCmd")) && !deviceId.equals(targetGBId)) {
String cmdString = XmlUtil.getText(rootElement,"PTZCmd");
Device device = storager.queryVideoDeviceByPlatformIdAndChannelId(platformId, deviceId);
cmder.fronEndCmd(device, deviceId, cmdString);
}
}
} catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
e.printStackTrace();
@ -374,9 +449,21 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
Element deviceIdElement = rootElement.element("DeviceID");
String deviceId = deviceIdElement.getTextTrim().toString();
if (requestName.equals("Query")) {
// 回复200 OK
responseAck(evt);
logger.info("接收到DeviceInfo查询消息");
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
String platformId = ((SipUri) fromHeader.getAddress().getURI()).getUser();
if (platformId == null) {
response404Ack(evt);
return;
} else {
// 回复200 OK
responseAck(evt);
String sn = rootElement.element("SN").getText();
ParentPlatform parentPlatform = storager.queryParentPlatById(platformId);
cmderFroPlatform.deviceInfoResponse(parentPlatform, sn, fromHeader.getTag());
}
} else {
logger.info("接收到DeviceInfo应答消息");
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
return;

14
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/response/impl/RegisterResponseProcessor.java

@ -60,16 +60,17 @@ public class RegisterResponseProcessor implements ISIPResponseProcessor {
logger.info(String.format("未找到callId: %s 的注册/注销平台id", callId ));
return;
}
logger.info(String.format("收到 %s 的注册/注销%S响应", platformGBId, response.getStatusCode() ));
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(platformGBId);
if (parentPlatformCatch == null) {
logger.warn(String.format("收到 %s 的注册/注销%S请求, 但是平台缓存信息未查询到!!!", platformGBId, response.getStatusCode()));
return;
}
String action = parentPlatformCatch.getParentPlatform().getExpires().equals("0") ? "注销" : "注册";
logger.info(String.format("收到 %s %s的%S响应", platformGBId, action, response.getStatusCode() ));
ParentPlatform parentPlatform = parentPlatformCatch.getParentPlatform();
if (parentPlatform == null) {
logger.warn(String.format("收到 %s 的注册/注销%S请求, 但是平台信息未查询到!!!", platformGBId, response.getStatusCode()));
logger.warn(String.format("收到 %s %s的%S请求, 但是平台信息未查询到!!!", platformGBId, action, response.getStatusCode()));
return;
}
@ -77,11 +78,16 @@ public class RegisterResponseProcessor implements ISIPResponseProcessor {
WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME);
sipCommanderForPlatform.register(parentPlatform, callId, www, null, null);
}else if (response.getStatusCode() == 200){
// 注册成功
logger.info(String.format("%s 注册成功", platformGBId ));
// 注册/注销成功
logger.info(String.format("%s %s成功", platformGBId, action));
redisCatchStorage.delPlatformRegisterInfo(callId);
parentPlatform.setStatus(true);
// 取回Expires设置,避免注销过程中被置为0
ParentPlatform parentPlatformTmp = storager.queryParentPlatById(platformGBId);
String expires = parentPlatformTmp.getExpires();
parentPlatform.setExpires(expires);
storager.updateParentPlatform(parentPlatform);
redisCatchStorage.updatePlatformRegister(parentPlatform);
redisCatchStorage.updatePlatformKeepalive(parentPlatform);

23
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@ -267,20 +267,25 @@ public class ZLMHttpHookListener {
}
String streamId = json.getString("stream");
cmder.streamByeCmd(streamId);
StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId);
if (streamInfo!=null){
redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
JSONObject ret = new JSONObject();
ret.put("code", 0);
ret.put("close", true);
if (streamInfo != null) {
if (redisCatchStorage.isChannelSendingRTP(streamInfo.getChannelId())) {
ret.put("close", false);
} else {
cmder.streamByeCmd(streamId);
redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
}
}else{
cmder.streamByeCmd(streamId);
streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
redisCatchStorage.stopPlayback(streamInfo);
}
JSONObject ret = new JSONObject();
ret.put("code", 0);
ret.put("close", true);
return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
}

10
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java

@ -152,6 +152,16 @@ public class ZLMRTPServerFactory {
return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"));
}
/**
* 查询转推的流是否有其它观看者
* @param streamId
* @return
*/
public int totalReaderCount(String streamId) {
JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo("rtp", "rtmp", streamId);
return mediaInfo.getInteger("totalReaderCount");
}
/**
* 调用zlm RESTful API stopSendRtp
*/

13
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java

@ -89,4 +89,17 @@ public interface IRedisCatchStorage {
*/
SendRtpItem querySendRTPServer(String platformGbId, String channelId);
/**
* 删除RTP推送信息缓存
* @param platformGbId
* @param channelId
*/
void deleteSendRTPServer(String platformGbId, String channelId);
/**
* 查询某个通道是否存在上级点播RTP推送
* @param channelId
*/
boolean isChannelSendingRTP(String channelId);
}

26
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@ -225,4 +225,30 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
return (SendRtpItem)redis.get(key);
}
/**
* 删除RTP推送信息缓存
* @param platformGbId
* @param channelId
*/
@Override
public void deleteSendRTPServer(String platformGbId, String channelId) {
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + platformGbId + "_" + channelId;
redis.del(key);
}
/**
* 查询某个通道是否存在上级点播RTP推送
* @param channelId
*/
@Override
public boolean isChannelSendingRTP(String channelId) {
String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + "*_" + channelId;
List<Object> RtpStreams = redis.scan(key);
if (RtpStreams.size() > 0) {
return true;
} else {
return false;
}
}
}

14
src/main/java/com/genersoft/iot/vmp/vmanager/platform/PlatformController.java

@ -60,7 +60,7 @@ public class PlatformController {
public ResponseEntity<String> savePlatform(@RequestBody ParentPlatform parentPlatform){
if (logger.isDebugEnabled()) {
logger.debug("查询所有上级设备API调用");
logger.debug("保存上级平台信息API调用");
}
if (StringUtils.isEmpty(parentPlatform.getName())
||StringUtils.isEmpty(parentPlatform.getServerGBId())
@ -87,13 +87,13 @@ public class PlatformController {
if (parentPlatform.isEnable()) {
// 只要保存就发送注册
commanderForPlatform.register(parentPlatform);
}else if (parentPlatformOld != null && parentPlatformOld.isEnable() && !parentPlatform.isEnable()){ // 关闭启用时注销
} else if (parentPlatformOld != null && parentPlatformOld.isEnable() && !parentPlatform.isEnable()){ // 关闭启用时注销
commanderForPlatform.unregister(parentPlatform, null, null);
}
return new ResponseEntity<>("success", HttpStatus.OK);
}else {
} else {
return new ResponseEntity<>("fail", HttpStatus.OK);
}
}
@ -103,7 +103,7 @@ public class PlatformController {
public ResponseEntity<String> deletePlatform(@RequestBody ParentPlatform parentPlatform){
if (logger.isDebugEnabled()) {
logger.debug("查询所有上级设备API调用");
logger.debug("删除上级平台API调用");
}
if (StringUtils.isEmpty(parentPlatform.getServerGBId())
){
@ -138,7 +138,7 @@ public class PlatformController {
public ResponseEntity<String> exitPlatform(@PathVariable String deviceGbId){
if (logger.isDebugEnabled()) {
logger.debug("查询所有上级设备API调用");
logger.debug("查询上级平台是否存在API调用:" + deviceGbId);
}
ParentPlatform parentPlatform = storager.queryParentPlatById(deviceGbId);
return new ResponseEntity<>(String.valueOf(parentPlatform != null), HttpStatus.OK);
@ -184,7 +184,7 @@ public class PlatformController {
public ResponseEntity<String> delChannelForGB(@RequestBody UpdateChannelParam param){
if (logger.isDebugEnabled()) {
logger.debug("给上级平台添加国标通道API调用");
logger.debug("给上级平台删除国标通道API调用");
}
int result = storager.delChannelForGB(param.getPlatformId(), param.getChannelReduces());

3
src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java

@ -86,6 +86,9 @@ public class PlayServiceImpl implements IPlayService {
msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
msg.setData(JSON.toJSONString(streamInfo));
resultHolder.invokeResult(msg);
if (hookEvent != null) {
hookEvent.response(JSONObject.parseObject(JSON.toJSONString(streamInfo)));
}
} else {
redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());

Loading…
Cancel
Save