diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index e45055be..7f659abb 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -58,6 +58,10 @@ public class VideoManagerConstants { public static final String SIP_CSEQ_PREFIX = "VMP_SIP_CSEQ_"; + public static final String SIP_SN_PREFIX = "VMP_SIP_SN_"; + + public static final String SIP_SUBSCRIBE_PREFIX = "SIP_SUBSCRIBE_"; + //************************** redis 消息********************************* public static final String WVP_MSG_STREAM_CHANGE_PREFIX = "WVP_MSG_STREAM_CHANGE_"; public static final String WVP_MSG_GPS_PREFIX = "WVP_MSG_GPS_"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java new file mode 100644 index 00000000..ed5cad6c --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CmdType.java @@ -0,0 +1,8 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +public class CmdType { + + public static final String CATALOG = "Catalog"; + public static final String ALARM = "Alarm"; + public static final String MOBILE_POSITION = "MobilePosition"; +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java new file mode 100644 index 00000000..66d67bfa --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java @@ -0,0 +1,78 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import javax.sip.RequestEvent; +import javax.sip.header.*; +import javax.sip.message.Request; + +public class SubscribeInfo { + + public SubscribeInfo() { + } + + public SubscribeInfo(RequestEvent evt, String id) { + this.id = id; + Request request = evt.getRequest(); + CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); + this.callId = callIdHeader.getCallId(); + FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME); + this.fromTag = fromHeader.getTag(); + ExpiresHeader expiresHeader = (ExpiresHeader)request.getHeader(ExpiresHeader.NAME); + this.expires = expiresHeader.getExpires(); + this.event = (EventHeader)request.getHeader(EventHeader.NAME); + } + + private String id; + private int expires; + private String callId; + private EventHeader event; + private String fromTag; + private String toTag; + + public String getId() { + return id; + } + + public int getExpires() { + return expires; + } + + public String getCallId() { + return callId; + } + + public EventHeader getEvent() { + return event; + } + + public String getFromTag() { + return fromTag; + } + + public void setToTag(String toTag) { + this.toTag = toTag; + } + + public String getToTag() { + return toTag; + } + + public void setId(String id) { + this.id = id; + } + + public void setExpires(int expires) { + this.expires = expires; + } + + public void setCallId(String callId) { + this.callId = callId; + } + + public void setEvent(EventHeader event) { + this.event = event; + } + + public void setFromTag(String fromTag) { + this.fromTag = fromTag; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java new file mode 100644 index 00000000..bac8e3dd --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java @@ -0,0 +1,52 @@ +package com.genersoft.iot.vmp.gb28181.event.subscribe; + +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener; +import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import org.checkerframework.checker.units.qual.A; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.stereotype.Component; + +/** + * 平台订阅到期事件 + */ +@Component +public class SubscribeListenerForPlatform extends RedisKeyExpirationEventMessageListener { + + private Logger logger = LoggerFactory.getLogger(SubscribeListenerForPlatform.class); + + @Autowired + private UserSetup userSetup; + + @Autowired + private DynamicTask dynamicTask; + + public SubscribeListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) { + super(listenerContainer, userSetup); + } + + + /** + * 监听失效的key + * @param message + * @param pattern + */ + @Override + public void onMessage(Message message, byte[] pattern) { + // 获取失效的key + String expiredKey = message.toString(); + logger.debug(expiredKey); + // 订阅到期 + String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_"; + if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) { + // 取消定时任务 + dynamicTask.stopCron(expiredKey); + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java new file mode 100644 index 00000000..ce990a0d --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java @@ -0,0 +1,59 @@ +package com.genersoft.iot.vmp.gb28181.task; + +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; + +import java.text.SimpleDateFormat; +import java.util.List; + +public class GPSSubscribeTask implements Runnable{ + + private IRedisCatchStorage redisCatchStorage; + private IVideoManagerStorager storager; + private ISIPCommanderForPlatform sipCommanderForPlatform; + private String platformId; + private String sn; + private String key; + + private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + public GPSSubscribeTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorager storager, String platformId, String sn, String key) { + this.redisCatchStorage = redisCatchStorage; + this.storager = storager; + this.platformId = platformId; + this.sn = sn; + this.key = key; + this.sipCommanderForPlatform = sipCommanderForPlatform; + } + + @Override + public void run() { + + SubscribeInfo subscribe = redisCatchStorage.getSubscribe(key); + if (subscribe != null) { + System.out.println("发送GPS消息"); + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); + if (parentPlatform == null || parentPlatform.isStatus()) { + // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持 + List gbStreams = storager.queryGbStreamListInPlatform(platformId); + if (gbStreams.size() > 0) { + for (GbStream gbStream : gbStreams) { + String gbId = gbStream.getGbId(); + GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId); + if (gpsMsgInfo != null && gbStream.isStatus()) { + // 发送GPS消息 + sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe); + } + } + } + } + } + + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index fe302938..e8b41247 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -2,7 +2,9 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import javax.sip.header.WWWAuthenticateHeader; @@ -61,4 +63,12 @@ public interface ISIPCommanderForPlatform { */ boolean deviceStatusResponse(ParentPlatform parentPlatform, String sn, String fromTag); + /** + * 向上级回复移动位置订阅消息 + * @param parentPlatform 平台信息 + * @param gpsMsgInfo GPS信息 + * @param subscribeInfo 订阅相关的信息 + * @return + */ + boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java index cc41af7c..27125e1e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import gov.nist.javax.sip.message.MessageFactoryImpl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -32,6 +33,9 @@ public class SIPRequestHeaderPlarformProvider { @Autowired private SipFactory sipFactory; + @Autowired + private IRedisCatchStorage redisCatchStorage; + public Request createKeetpaliveMessageRequest(ParentPlatform parentPlatform, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException { Request request = null; @@ -57,7 +61,7 @@ public class SIPRequestHeaderPlarformProvider { // Forwards MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); // ceq - CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.MESSAGE); + CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE); request = sipFactory.createMessageFactory().createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader, toHeader, viaHeaders, maxForwards); @@ -122,7 +126,7 @@ public class SIPRequestHeaderPlarformProvider { String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader) throws ParseException, PeerUnavailableException, InvalidArgumentException { - Request registerRequest = createRegisterRequest(parentPlatform, 2L, fromTag, viaTag, callIdHeader); + Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), fromTag, viaTag, callIdHeader); String realm = www.getRealm(); String nonce = www.getNonce(); @@ -208,7 +212,7 @@ public class SIPRequestHeaderPlarformProvider { // Forwards MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); // ceq - CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.MESSAGE); + CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE); MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory(); // 设置编码, 防止中文乱码 messageFactory.setDefaultContentEncodingCharset("gb2312"); @@ -223,4 +227,43 @@ public class SIPRequestHeaderPlarformProvider { request.setContent(content, contentTypeHeader); return request; } + + public Request createNotifyRequest(ParentPlatform parentPlatform, String content, String fromTag, String toTag, CallIdHeader callIdHeader) throws PeerUnavailableException, ParseException, InvalidArgumentException { + Request request = null; + // sipuri + SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP()+ ":" + parentPlatform.getServerPort()); + // via + ArrayList viaHeaders = new ArrayList(); + ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()), + parentPlatform.getTransport(), null); + viaHeader.setRPort(); + viaHeaders.add(viaHeader); + // from + SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(), + parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort()); + Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI); + FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, fromTag); + // to + SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain()); + Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI); + ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, toTag); + + // Forwards + MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); + // ceq + CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.NOTIFY), Request.NOTIFY); + MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory(); + // 设置编码, 防止中文乱码 + messageFactory.setDefaultContentEncodingCharset("gb2312"); + request = messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader, + toHeader, viaHeaders, maxForwards); + List agentParam = new ArrayList<>(); + agentParam.add("wvp-pro"); + UserAgentHeader userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam); + request.addHeader(userAgentHeader); + + ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml"); + request.setContent(content, contentTypeHeader); + return request; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index cf8b0a56..1707bde8 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -3,9 +3,11 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; +import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,7 +94,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { callIdHeader = udpSipProvider.getNewCallId(); } - request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, 1L, "FromRegister" + tm, null, callIdHeader); + request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), "FromRegister" + tm, null, callIdHeader); // 将 callid 写入缓存, 等注册成功可以更新状态 redisCatchStorage.updatePlatformRegisterInfo(callIdHeader.getCallId(), parentPlatform.getServerGBId()); @@ -325,4 +327,41 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } return true; } + + @Override + public boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo) { + if (parentPlatform == null) { + return false; + } + + try { + StringBuffer deviceStatusXml = new StringBuffer(600); + deviceStatusXml.append("\r\n"); + deviceStatusXml.append("\r\n"); + deviceStatusXml.append("MobilePosition\r\n"); + deviceStatusXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); + deviceStatusXml.append("" + gpsMsgInfo.getId() + "\r\n"); + deviceStatusXml.append("\r\n"); + deviceStatusXml.append("" + gpsMsgInfo.getLng() + "\r\n"); + deviceStatusXml.append("" + gpsMsgInfo.getLat() + "\r\n"); + deviceStatusXml.append("" + gpsMsgInfo.getSpeed() + "\r\n"); + deviceStatusXml.append("" + gpsMsgInfo.getDirection() + "\r\n"); + deviceStatusXml.append("" + gpsMsgInfo.getAltitude() + "\r\n"); + deviceStatusXml.append("\r\n"); + + CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() + : udpSipProvider.getNewCallId(); + callIdHeader.setCallId(subscribeInfo.getCallId()); + + String tm = Long.toString(System.currentTimeMillis()); + + Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, deviceStatusXml.toString(), subscribeInfo.getToTag(), subscribeInfo.getFromTag(), callIdHeader); + transmitRequest(parentPlatform, request); + + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + return false; + } + return true; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java index e7f8f727..d4de725d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java @@ -18,6 +18,7 @@ import javax.sip.address.Address; import javax.sip.address.AddressFactory; import javax.sip.address.SipURI; import javax.sip.header.ContentTypeHeader; +import javax.sip.header.ExpiresHeader; import javax.sip.header.HeaderFactory; import javax.sip.header.ViaHeader; import javax.sip.message.MessageFactory; @@ -153,7 +154,7 @@ public abstract class SIPRequestProcessorParent { * @throws InvalidArgumentException * @throws ParseException */ - public void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException { + public void responseSdpAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException { Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest()); SipFactory sipFactory = SipFactory.getInstance(); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP"); @@ -168,6 +169,31 @@ public abstract class SIPRequestProcessorParent { getServerTransaction(evt).sendResponse(response); } + /** + * 回复带xml的200 + * @param evt + * @param xml + * @throws SipException + * @throws InvalidArgumentException + * @throws ParseException + */ + public Response responseXmlAck(RequestEvent evt, String xml) throws SipException, InvalidArgumentException, ParseException { + Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest()); + SipFactory sipFactory = SipFactory.getInstance(); + ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml"); + response.setContent(xml, contentTypeHeader); + + SipURI sipURI = (SipURI)evt.getRequest().getRequestURI(); + + Address concatAddress = sipFactory.createAddressFactory().createAddress( + sipFactory.createAddressFactory().createSipURI(sipURI.getUser(), sipURI.getHost()+":"+sipURI.getPort() + )); + response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); + response.addHeader(evt.getRequest().getHeader(ExpiresHeader.NAME)); + getServerTransaction(evt).sendResponse(response); + return response; + } + public Element getRootElement(RequestEvent evt) throws DocumentException { return getRootElement(evt, "gb2312"); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 203d58b9..2a9abad5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -253,7 +253,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements content.append("f=\r\n"); try { - responseAck(evt, content.toString()); + responseSdpAck(evt, content.toString()); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { @@ -310,7 +310,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements content.append("f=\r\n"); try { - responseAck(evt, content.toString()); + responseSdpAck(evt, content.toString()); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 2e58d9d3..67bb56c5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -62,9 +62,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @Autowired private DeviceOffLineDetector offLineDetector; - private static final String NOTIFY_CATALOG = "Catalog"; - private static final String NOTIFY_ALARM = "Alarm"; - private static final String NOTIFY_MOBILE_POSITION = "MobilePosition"; + private String method = "NOTIFY"; @Autowired @@ -82,13 +80,13 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements Element rootElement = getRootElement(evt); String cmd = XmlUtil.getText(rootElement, "CmdType"); - if (NOTIFY_CATALOG.equals(cmd)) { + if (CmdType.CATALOG.equals(cmd)) { logger.info("接收到Catalog通知"); processNotifyCatalogList(evt); - } else if (NOTIFY_ALARM.equals(cmd)) { + } else if (CmdType.ALARM.equals(cmd)) { logger.info("接收到Alarm通知"); processNotifyAlarm(evt); - } else if (NOTIFY_MOBILE_POSITION.equals(cmd)) { + } else if (CmdType.MOBILE_POSITION.equals(cmd)) { logger.info("接收到MobilePosition通知"); processNotifyMobilePosition(evt); } else { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java index be4b2ce9..9c5be8e4 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java @@ -1,8 +1,21 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.gb28181.bean.CmdType; +import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; +import com.genersoft.iot.vmp.gb28181.task.GPSSubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import org.dom4j.DocumentException; +import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -13,7 +26,10 @@ import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.ServerTransaction; import javax.sip.SipException; +import javax.sip.header.CallIdHeader; import javax.sip.header.ExpiresHeader; +import javax.sip.header.Header; +import javax.sip.header.ToHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.text.ParseException; @@ -30,6 +46,21 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme @Autowired private SIPProcessorObserver sipProcessorObserver; + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private ISIPCommanderForPlatform sipCommanderForPlatform; + + @Autowired + private IVideoManagerStorager storager; + + @Autowired + private DynamicTask dynamicTask; + + @Autowired + private UserSetup userSetup; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -46,30 +77,107 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme Request request = evt.getRequest(); try { - Response response = null; - response = getMessageFactory().createResponse(200, request); - if (response != null) { - ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30); - response.setExpires(expireHeader); - } - logger.info("response : " + response.toString()); - ServerTransaction transaction = getServerTransaction(evt); - if (transaction != null) { - transaction.sendResponse(response); - transaction.getDialog().delete(); - transaction.terminate(); + Element rootElement = getRootElement(evt); + String cmd = XmlUtil.getText(rootElement, "CmdType"); + if (CmdType.MOBILE_POSITION.equals(cmd)) { + logger.info("接收到MobilePosition订阅"); + processNotifyMobilePosition(evt, rootElement); +// } else if (CmdType.ALARM.equals(cmd)) { +// logger.info("接收到Alarm订阅"); +// processNotifyAlarm(evt, rootElement); +// } else if (CmdType.CATALOG.equals(cmd)) { +// logger.info("接收到Catalog订阅"); +// processNotifyCatalogList(evt, rootElement); } else { - logger.info("processRequest serverTransactionId is null."); + logger.info("接收到消息:" + cmd); +// responseAck(evt, Response.OK); + + Response response = null; + response = getMessageFactory().createResponse(200, request); + if (response != null) { + ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30); + response.setExpires(expireHeader); + } + logger.info("response : " + response.toString()); + ServerTransaction transaction = getServerTransaction(evt); + if (transaction != null) { + transaction.sendResponse(response); + transaction.getDialog().delete(); + transaction.terminate(); + } else { + logger.info("processRequest serverTransactionId is null."); + } } + + } catch (ParseException e) { e.printStackTrace(); } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); + } catch (DocumentException e) { + e.printStackTrace(); + } + + } + + /** + * 处理移动位置订阅消息 + */ + private void processNotifyMobilePosition(RequestEvent evt, Element rootElement) { + String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); + String deviceID = XmlUtil.getText(rootElement, "DeviceID"); + SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId); + String sn = XmlUtil.getText(rootElement, "SN"); + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_MobilePosition_" + platformId; + + StringBuilder resultXml = new StringBuilder(200); + resultXml.append("\r\n") + .append("\r\n") + .append("MobilePosition\r\n") + .append("" + sn + "\r\n") + .append("" + deviceID + "\r\n") + .append("OK\r\n") + .append("\r\n"); + + if (subscribeInfo.getExpires() > 0) { + if (redisCatchStorage.getSubscribe(key) != null) { + dynamicTask.stopCron(key); + } + String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 + dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key), Integer.parseInt(interval)); + + redisCatchStorage.updateSubscribe(key, subscribeInfo); + }else if (subscribeInfo.getExpires() == 0) { + dynamicTask.stopCron(key); + redisCatchStorage.delSubscribe(key); + } + + + + try { + Response response = responseXmlAck(evt, resultXml.toString()); + ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME); + subscribeInfo.setToTag(toHeader.getTag()); + redisCatchStorage.updateSubscribe(key, subscribeInfo); + + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); } - + } + + private void processNotifyAlarm(RequestEvent evt, Element rootElement) { + + } + + private void processNotifyCatalogList(RequestEvent evt, Element rootElement) { + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java b/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java index 3c5b2bf9..855cc5e4 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java @@ -23,7 +23,7 @@ public class GPSMsgInfo { private double speed; /** - * 产生通知时间, + * 产生通知时间, 时间格式: 2020-01-14T14:32:12 */ private String time; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java index e3bfcde7..4e390b73 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java @@ -17,6 +17,7 @@ public class RedisGPSMsgListener implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class); + System.out.println(JSON.toJSON(gpsMsgInfo)); redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 709d5487..ea85cf19 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -2,10 +2,7 @@ package com.genersoft.iot.vmp.storager; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.ThirdPartyGB; @@ -196,4 +193,16 @@ public interface IRedisCatchStorage { void resetAllCSEQ(); void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo); + + GPSMsgInfo getGpsMsgInfo(String gbId); + + Long getSN(String method); + + void resetAllSN(); + + void updateSubscribe(String key, SubscribeInfo subscribeInfo); + + SubscribeInfo getSubscribe(String key); + + void delSubscribe(String key); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java index fa6b51c1..3a26b4e9 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java @@ -54,6 +54,11 @@ public interface GbStreamMapper { "WHERE pgs.platformId = '${platformId}'") List queryGbStreamListInPlatform(String platformId); + + @Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs LEFT JOIN platform_gb_stream pgs " + + "ON gs.app = pgs.app and gs.stream = pgs.stream WHERE pgs.app is NULL and pgs.stream is NULL") + List queryStreamNotInPlatform(); + @Update("UPDATE gb_stream " + "SET status=${status} " + "WHERE app=#{app} AND stream=#{stream}") @@ -87,4 +92,6 @@ public interface GbStreamMapper { " " + "") void batchAdd(List subList); + + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 6ea768a7..adda2318 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -49,6 +49,18 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { return result; } + @Override + public Long getSN(String method) { + String key = VideoManagerConstants.SIP_SN_PREFIX + userSetup.getServerId() + "_" + method; + + long result = redis.incr(key, 1L); + if (result > Integer.MAX_VALUE) { + redis.set(key, 1); + result = 1; + } + return result; + } + @Override public void resetAllCSEQ() { String scanKey = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetup.getServerId() + "_*"; @@ -59,6 +71,16 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } } + @Override + public void resetAllSN() { + String scanKey = VideoManagerConstants.SIP_SN_PREFIX + userSetup.getServerId() + "_*"; + List keys = redis.scan(scanKey); + for (int i = 0; i < keys.size(); i++) { + String key = (String) keys.get(i); + redis.set(key, 1); + } + } + /** * 开始播放时将流存入redis * @@ -433,4 +455,25 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_" + gpsMsgInfo.getId(); redis.set(key, gpsMsgInfo); } + + @Override + public GPSMsgInfo getGpsMsgInfo(String gbId) { + String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_" + gbId; + return (GPSMsgInfo)redis.get(key); + } + + @Override + public void updateSubscribe(String key, SubscribeInfo subscribeInfo) { + redis.set(key, subscribeInfo, subscribeInfo.getExpires()); + } + + @Override + public SubscribeInfo getSubscribe(String key) { + return (SubscribeInfo)redis.get(key); + } + + @Override + public void delSubscribe(String key) { + redis.del(key); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java index 7f0efcd3..d381574d 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java @@ -486,18 +486,21 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { // 更新缓存 parentPlatformCatch.setParentPlatform(parentPlatform); redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); - // 共享所有视频流,需要将现有视频流添加到此平台 - List gbStreams = gbStreamMapper.selectAll(); - if (gbStreams.size() > 0) { - for (GbStream gbStream : gbStreams) { - gbStream.setCatalogId(parentPlatform.getCatalogId()); - } - if (parentPlatform.isShareAllLiveStream()) { - gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId()); - }else { - gbStreamService.delPlatformInfo(gbStreams); + if (parentPlatform.isEnable()) { + // 共享所有视频流,需要将现有视频流添加到此平台 + List gbStreams = gbStreamMapper.queryStreamNotInPlatform(); + if (gbStreams.size() > 0) { + for (GbStream gbStream : gbStreams) { + gbStream.setCatalogId(parentPlatform.getCatalogId()); + } + if (parentPlatform.isShareAllLiveStream()) { + gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId()); + }else { + gbStreamService.delPlatformInfo(gbStreams); + } } } + return result > 0; }