diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java new file mode 100644 index 00000000..145c29b5 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java @@ -0,0 +1,42 @@ +package com.genersoft.iot.vmp.conf; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.scheduling.support.CronTrigger; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; + +/** + * 动态定时任务 + */ +@Component +public class DynamicTask { + + @Autowired + private ThreadPoolTaskScheduler threadPoolTaskScheduler; + + private Map> futureMap = new ConcurrentHashMap<>(); + + @Bean + public ThreadPoolTaskScheduler threadPoolTaskScheduler() { + return new ThreadPoolTaskScheduler(); + } + + public String startCron(String key, Runnable task, String corn) { + stopCron(key); + ScheduledFuture future = threadPoolTaskScheduler.schedule(task, new CronTrigger(corn)); + futureMap.put(key, future); + return "startCron"; + } + + public void stopCron(String key) { + if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) { + futureMap.get(key).cancel(true); + } + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/ISIPRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/ISIPRequestProcessor.java new file mode 100644 index 00000000..8e79941f --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/ISIPRequestProcessor.java @@ -0,0 +1,14 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.request; + +import javax.sip.RequestEvent; + +/** + * @description: 对SIP事件进行处理,包括request, response, timeout, ioException, transactionTerminated,dialogTerminated + * @author: panlinlin + * @date: 2021年11月5日 15:47 + */ +public interface ISIPRequestProcessor { + + void process(RequestEvent event); + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorAbstract.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorAbstract.java new file mode 100644 index 00000000..6cb9e7e0 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorAbstract.java @@ -0,0 +1,179 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.request; + +import gov.nist.javax.sip.SipProviderImpl; +import gov.nist.javax.sip.SipStackImpl; +import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.stack.SIPServerTransaction; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.Element; +import org.dom4j.io.SAXReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; + +import javax.sip.*; +import javax.sip.address.Address; +import javax.sip.address.AddressFactory; +import javax.sip.address.SipURI; +import javax.sip.header.ContentTypeHeader; +import javax.sip.header.HeaderFactory; +import javax.sip.header.ViaHeader; +import javax.sip.message.MessageFactory; +import javax.sip.message.Request; +import javax.sip.message.Response; +import java.io.ByteArrayInputStream; +import java.text.ParseException; + +/** + * @description:处理接收IPCamera发来的SIP协议请求消息 + * @author: songww + * @date: 2020年5月3日 下午4:42:22 + */ +public abstract class SIPRequestProcessorAbstract implements InitializingBean, ISIPRequestProcessor { + + private final static Logger logger = LoggerFactory.getLogger(SIPRequestProcessorAbstract.class); + + @Autowired + @Qualifier(value="tcpSipProvider") + private SipProviderImpl tcpSipProvider; + + @Autowired + @Qualifier(value="udpSipProvider") + private SipProviderImpl udpSipProvider; + + /** + * 根据 RequestEvent 获取 ServerTransaction + * @param evt + * @return + */ + public ServerTransaction getServerTransaction(RequestEvent evt) { + Request request = evt.getRequest(); + ServerTransaction serverTransaction = evt.getServerTransaction(); + // 判断TCP还是UDP + boolean isTcp = false; + ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME); + String transport = reqViaHeader.getTransport(); + if (transport.equals("TCP")) { + isTcp = true; + } + + if (serverTransaction == null) { + try { + if (isTcp) { + SipStackImpl stack = (SipStackImpl)tcpSipProvider.getSipStack(); + serverTransaction = (SIPServerTransaction) stack.findTransaction((SIPRequest)request, true); + if (serverTransaction == null) { + serverTransaction = tcpSipProvider.getNewServerTransaction(request); + } + } else { + SipStackImpl stack = (SipStackImpl)udpSipProvider.getSipStack(); + serverTransaction = (SIPServerTransaction) stack.findTransaction((SIPRequest)request, true); + if (serverTransaction == null) { + serverTransaction = udpSipProvider.getNewServerTransaction(request); + } + } + } catch (TransactionAlreadyExistsException e) { + logger.error(e.getMessage()); + } catch (TransactionUnavailableException e) { + logger.error(e.getMessage()); + } + } + return serverTransaction; + } + + public AddressFactory getAddressFactory() { + try { + return SipFactory.getInstance().createAddressFactory(); + } catch (PeerUnavailableException e) { + e.printStackTrace(); + } + return null; + } + + public HeaderFactory getHeaderFactory() { + try { + return SipFactory.getInstance().createHeaderFactory(); + } catch (PeerUnavailableException e) { + e.printStackTrace(); + } + return null; + } + + public MessageFactory getMessageFactory() { + try { + return SipFactory.getInstance().createMessageFactory(); + } catch (PeerUnavailableException e) { + e.printStackTrace(); + } + return null; + } + + /*** + * 回复状态码 + * 100 trying + * 200 OK + * 400 + * 404 + * @param evt + * @throws SipException + * @throws InvalidArgumentException + * @throws ParseException + */ + public void responseAck(RequestEvent evt, int statusCode) throws SipException, InvalidArgumentException, ParseException { + Response response = getMessageFactory().createResponse(statusCode, evt.getRequest()); + ServerTransaction serverTransaction = getServerTransaction(evt); + serverTransaction.sendResponse(response); + if (statusCode >= 200) { + if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete(); + } + } + + public void responseAck(RequestEvent evt, int statusCode, String msg) throws SipException, InvalidArgumentException, ParseException { + Response response = getMessageFactory().createResponse(statusCode, evt.getRequest()); + response.setReasonPhrase(msg); + ServerTransaction serverTransaction = getServerTransaction(evt); + serverTransaction.sendResponse(response); + if (statusCode >= 200) { + if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete(); + } + } + + /** + * 回复带sdp的200 + * @param evt + * @param sdp + * @throws SipException + * @throws InvalidArgumentException + * @throws ParseException + */ + public void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException { + Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest()); + SipFactory sipFactory = SipFactory.getInstance(); + ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP"); + response.setContent(sdp, contentTypeHeader); + + SipURI sipURI = (SipURI)evt.getRequest().getRequestURI(); + + Address concatAddress = sipFactory.createAddressFactory().createAddress( + sipFactory.createAddressFactory().createSipURI(sipURI.getUser(), sipURI.getHost()+":"+sipURI.getPort() + )); + response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); + getServerTransaction(evt).sendResponse(response); + } + + public Element getRootElement(RequestEvent evt) throws DocumentException { + return getRootElement(evt, "gb2312"); + } + public Element getRootElement(RequestEvent evt, String charset) throws DocumentException { + if (charset == null) charset = "gb2312"; + Request request = evt.getRequest(); + SAXReader reader = new SAXReader(); + reader.setEncoding(charset); + Document xml = reader.read(new ByteArrayInputStream(request.getRawContent())); + return xml.getRootElement(); + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java new file mode 100644 index 00000000..41217588 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -0,0 +1,123 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; + +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.sip.Dialog; +import javax.sip.DialogState; +import javax.sip.RequestEvent; +import javax.sip.address.SipURI; +import javax.sip.header.FromHeader; +import javax.sip.header.HeaderAddress; +import javax.sip.header.ToHeader; +import java.util.HashMap; +import java.util.Map; + +/** + * @description:ACK请求处理器 + * @author: swwheihei + * @date: 2020年5月3日 下午5:31:45 + */ +@Component +public class AckRequestProcessor extends SIPRequestProcessorAbstract { + + private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class); + private String method = "ACK"; + + @Autowired + private SIPProcessorObserver sipProcessorObserver; + + @Override + public void afterPropertiesSet() throws Exception { + // 添加消息处理的订阅 + sipProcessorObserver.addRequestProcessor(method, this); + } + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + + @Autowired + private IMediaServerService mediaServerService; + + + /** + * 处理 ACK请求 + * + * @param evt + */ + @Override + public void process(RequestEvent evt) { + Dialog dialog = evt.getDialog(); + if (dialog == null) return; + if (dialog.getState()== DialogState.CONFIRMED) { + String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); + String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); + String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; + String deviceId = sendRtpItem.getDeviceId(); + StreamInfo streamInfo = null; + if (deviceId == null) { + streamInfo = new StreamInfo(); + streamInfo.setApp(sendRtpItem.getApp()); + streamInfo.setStreamId(sendRtpItem.getStreamId()); + }else { + streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); + sendRtpItem.setStreamId(streamInfo.getStreamId()); + streamInfo.setApp("rtp"); + } + + redisCatchStorage.updateSendRTPSever(sendRtpItem); + logger.info(platformGbId); + logger.info(channelId); + Map param = new HashMap<>(); + param.put("vhost","__defaultVhost__"); + param.put("app",streamInfo.getApp()); + param.put("stream",streamInfo.getStreamId()); + param.put("ssrc", sendRtpItem.getSsrc()); + param.put("dst_url",sendRtpItem.getIp()); + param.put("dst_port", sendRtpItem.getPort()); + param.put("is_udp", is_Udp); + //param.put ("src_port", sendRtpItem.getLocalPort()); + // 设备推流查询,成功后才能转推 + boolean rtpPushed = false; + long startTime = System.currentTimeMillis(); + while (!rtpPushed) { + try { + if (System.currentTimeMillis() - startTime < 30 * 1000) { + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) { + rtpPushed = true; + logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]", + streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort()); + zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + } else { + logger.info("等待设备推流[{}/{}].......", + streamInfo.getApp() ,streamInfo.getStreamId()); + Thread.sleep(1000); + continue; + } + } else { + rtpPushed = true; + logger.info("设备推流[{}/{}]超时,终止向上级推流", + streamInfo.getApp() ,streamInfo.getStreamId()); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java new file mode 100644 index 00000000..275e2cfb --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -0,0 +1,115 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; + +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.sip.*; +import javax.sip.address.SipURI; +import javax.sip.header.FromHeader; +import javax.sip.header.HeaderAddress; +import javax.sip.header.ToHeader; +import javax.sip.message.Response; +import java.text.ParseException; +import java.util.HashMap; +import java.util.Map; + +/** + * @description: BYE请求处理器 + * @author: lawrencehj + * @date: 2021年3月9日 + */ +@Component +public class ByeRequestProcessor extends SIPRequestProcessorAbstract { + + private Logger logger = LoggerFactory.getLogger(ByeRequestProcessor.class); + + @Autowired + private ISIPCommander cmder; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private IVideoManagerStorager storager; + + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + + @Autowired + private IMediaServerService mediaServerService; + + private String method = "BYE"; + + @Autowired + private SIPProcessorObserver sipProcessorObserver; + + @Override + public void afterPropertiesSet() throws Exception { + // 添加消息处理的订阅 + sipProcessorObserver.addRequestProcessor(method, this); + } + + /** + * 处理BYE请求 + * @param evt + */ + @Override + public void process(RequestEvent evt) { + try { + responseAck(evt, Response.OK); + Dialog dialog = evt.getDialog(); + if (dialog == null) return; + if (dialog.getState().equals(DialogState.TERMINATED)) { + String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); + String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); + logger.info("收到bye, [{}/{}]", platformGbId, channelId); + if (sendRtpItem != null){ + String streamId = sendRtpItem.getStreamId(); + Map param = new HashMap<>(); + param.put("vhost","__defaultVhost__"); + param.put("app",sendRtpItem.getApp()); + param.put("stream",streamId); + param.put("ssrc",sendRtpItem.getSsrc()); + logger.info("停止向上级推流:" + streamId); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); + redisCatchStorage.deleteSendRTPServer(platformGbId, channelId); + if (zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId) == 0) { + logger.info(streamId + "无其它观看者,通知设备停止推流"); + cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId); + } + } + // 可能是设备主动停止 + Device device = storager.queryVideoDeviceByChannelId(platformGbId); + if (device != null) { + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); + if (streamInfo != null) { + redisCatchStorage.stopPlay(streamInfo); + } + storager.stopPlay(device.getDeviceId(), channelId); + mediaServerService.closeRTPServer(device, channelId); + } + } + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java new file mode 100644 index 00000000..b2bd9d9f --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java @@ -0,0 +1,40 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; + +import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.sip.RequestEvent; + +/** + * @description:CANCEL请求处理器 + * @author: swwheihei + * @date: 2020年5月3日 下午5:32:23 + */ +@Component +public class CancelRequestProcessor extends SIPRequestProcessorAbstract { + + private String method = "CANCEL"; + + @Autowired + private SIPProcessorObserver sipProcessorObserver; + + @Override + public void afterPropertiesSet() throws Exception { + // 添加消息处理的订阅 + sipProcessorObserver.addRequestProcessor(method, this); + } + + /** + * 处理CANCEL请求 + * + * @param evt 事件 + */ + @Override + public void process(RequestEvent evt) { + // TODO 优先级99 Cancel Request消息实现,此消息一般为级联消息,上级给下级发送请求取消指令 + + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java new file mode 100644 index 00000000..6984fab8 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -0,0 +1,386 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; + +import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IPlayService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; +import gov.nist.javax.sip.address.AddressImpl; +import gov.nist.javax.sip.address.SipUri; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.sdp.*; +import javax.sip.InvalidArgumentException; +import javax.sip.RequestEvent; +import javax.sip.ServerTransaction; +import javax.sip.SipException; +import javax.sip.address.SipURI; +import javax.sip.header.FromHeader; +import javax.sip.message.Request; +import javax.sip.message.Response; +import java.text.ParseException; +import java.util.Vector; + +/** + * @description:处理INVITE请求 + * @author: panll + * @date: 2021年1月14日 + */ +@SuppressWarnings("rawtypes") +@Component +public class InviteRequestProcessor extends SIPRequestProcessorAbstract { + + private final static Logger logger = LoggerFactory.getLogger(InviteRequestProcessor.class); + + private String method = "INVITE"; + + @Autowired + private SIPCommanderFroPlatform cmderFroPlatform; + + @Autowired + private IVideoManagerStorager storager; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private SIPCommander cmder; + + @Autowired + private IPlayService playService; + + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private SIPProcessorObserver sipProcessorObserver; + + @Override + public void afterPropertiesSet() throws Exception { + // 添加消息处理的订阅 + sipProcessorObserver.addRequestProcessor(method, this); + } + + /** + * 处理invite请求 + * + * @param evt + * 请求消息 + */ + @Override + public void process(RequestEvent evt) { + // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令 + try { + Request request = evt.getRequest(); + SipURI sipURI = (SipURI) request.getRequestURI(); + String channelId = sipURI.getUser(); + String requesterId = null; + + FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME); + AddressImpl address = (AddressImpl) fromHeader.getAddress(); + SipUri uri = (SipUri) address.getURI(); + requesterId = uri.getUser(); + + if (requesterId == null || channelId == null) { + logger.info("无法从FromHeader的Address中获取到平台id,返回400"); + responseAck(evt, Response.BAD_REQUEST); // 参数不全, 发400,请求错误 + return; + } + + // 查询请求方是否上级平台 + ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); + if (platform != null) { + // 查询平台下是否有该通道 + DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); + GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); + MediaServerItem mediaServerItem = null; + // 不是通道可能是直播流 + if (channel != null && gbStream == null ) { + if (channel.getStatus() == 0) { + logger.info("通道离线,返回400"); + responseAck(evt, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline"); + return; + } + responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中 + }else if(channel == null && gbStream != null){ + String mediaServerId = gbStream.getMediaServerId(); + mediaServerItem = mediaServerService.getOne(mediaServerId); + if (mediaServerItem == null) { + logger.info("[ app={}, stream={} ]找不到zlm {},返回410",gbStream.getApp(), gbStream.getStream(), mediaServerId); + responseAck(evt, Response.GONE, "media server not found"); + return; + } + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + if (!streamReady ) { + logger.info("[ app={}, stream={} ]通道离线,返回400",gbStream.getApp(), gbStream.getStream()); + responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline"); + return; + } + responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中 + }else { + logger.info("通道不存在,返回404"); + responseAck(evt, Response.NOT_FOUND); // 通道不存在,发404,资源不存在 + return; + } + // 解析sdp消息, 使用jainsip 自带的sdp解析方式 + String contentString = new String(request.getRawContent()); + + // jainSip不支持y=字段, 移除移除以解析。 + int ssrcIndex = contentString.indexOf("y="); + //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 + String ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + String substring = contentString.substring(0, contentString.indexOf("y=")); + SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); + + // 获取支持的格式 + Vector mediaDescriptions = sdp.getMediaDescriptions(true); + // 查看是否支持PS 负载96 + //String ip = null; + int port = -1; + //boolean recvonly = false; + boolean mediaTransmissionTCP = false; + Boolean tcpActive = null; + for (Object description : mediaDescriptions) { + MediaDescription mediaDescription = (MediaDescription) description; + Media media = mediaDescription.getMedia(); + + Vector mediaFormats = media.getMediaFormats(false); + if (mediaFormats.contains("96")) { + port = media.getMediaPort(); + //String mediaType = media.getMediaType(); + String protocol = media.getProtocol(); + + // 区分TCP发流还是udp, 当前默认udp + if ("TCP/RTP/AVP".equals(protocol)) { + String setup = mediaDescription.getAttribute("setup"); + if (setup != null) { + mediaTransmissionTCP = true; + if ("active".equals(setup)) { + tcpActive = true; + } else if ("passive".equals(setup)) { + tcpActive = false; + } + } + } + break; + } + } + if (port == -1) { + logger.info("不支持的媒体格式,返回415"); + // 回复不支持的格式 + responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 + return; + } + String username = sdp.getOrigin().getUsername(); + String addressStr = sdp.getOrigin().getAddress(); + //String sessionName = sdp.getSessionName().getValue(); + logger.info("[上级点播]用户:{}, 地址:{}:{}, ssrc:{}", username, addressStr, port, ssrc); + Device device = null; + // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 + if (channel != null) { + device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId); + if (device == null) { + logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel); + responseAck(evt, Response.SERVER_INTERNAL_ERROR); + return; + } + mediaServerItem = playService.getNewMediaServerItem(device); + if (mediaServerItem == null) { + logger.warn("未找到可用的zlm"); + responseAck(evt, Response.BUSY_HERE); + return; + } + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + device.getDeviceId(), channelId, + mediaTransmissionTCP); + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + if (sendRtpItem == null) { + logger.warn("服务器端口资源不足"); + responseAck(evt, Response.BUSY_HERE); + return; + } + + // 写入redis, 超时时回复 + redisCatchStorage.updateSendRTPSever(sendRtpItem); + // 通知下级推流, + PlayResult playResult = playService.play(mediaServerItem,device.getDeviceId(), channelId, (mediaServerItemInUSe, responseJSON)->{ + // 收到推流, 回复200OK, 等待ack + // if (sendRtpItem == null) return; + sendRtpItem.setStatus(1); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + // TODO 添加对tcp的支持 + + StringBuffer content = new StringBuffer(200); + content.append("v=0\r\n"); + content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n"); + content.append("s=Play\r\n"); + content.append("c=IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n"); + content.append("t=0 0\r\n"); + content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n"); + content.append("a=sendonly\r\n"); + content.append("a=rtpmap:96 PS/90000\r\n"); + content.append("y="+ ssrc + "\r\n"); + content.append("f=\r\n"); + + try { + responseAck(evt, content.toString()); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + } ,((event) -> { + // 未知错误。直接转发设备点播的错误 + Response response = null; + try { + response = getMessageFactory().createResponse(event.statusCode, evt.getRequest()); + ServerTransaction serverTransaction = getServerTransaction(evt); + serverTransaction.sendResponse(response); + if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete(); + } catch (ParseException | SipException | InvalidArgumentException e) { + e.printStackTrace(); + } + })); + if (logger.isDebugEnabled()) { + logger.debug(playResult.getResult().toString()); + } + + }else if (gbStream != null) { + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + gbStream.getApp(), gbStream.getStream(), channelId, + mediaTransmissionTCP); + + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + if (sendRtpItem == null) { + logger.warn("服务器端口资源不足"); + responseAck(evt, Response.BUSY_HERE); + return; + } + + // 写入redis, 超时时回复 + redisCatchStorage.updateSendRTPSever(sendRtpItem); + + sendRtpItem.setStatus(1); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + // TODO 添加对tcp的支持 + StringBuffer content = new StringBuffer(200); + content.append("v=0\r\n"); + content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); + content.append("s=Play\r\n"); + content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); + content.append("t=0 0\r\n"); + content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n"); + content.append("a=sendonly\r\n"); + content.append("a=rtpmap:96 PS/90000\r\n"); + content.append("y="+ ssrc + "\r\n"); + content.append("f=\r\n"); + + try { + responseAck(evt, content.toString()); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + } + + } else { + // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) + Device device = storager.queryVideoDevice(requesterId); + if (device != null) { + logger.info("收到设备" + requesterId + "的语音广播Invite请求"); + responseAck(evt, Response.TRYING); + + String contentString = new String(request.getRawContent()); + // jainSip不支持y=字段, 移除移除以解析。 + String substring = contentString; + String ssrc = "0000000404"; + int ssrcIndex = contentString.indexOf("y="); + if (ssrcIndex > 0) { + substring = contentString.substring(0, ssrcIndex); + ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + } + ssrcIndex = substring.indexOf("f="); + if (ssrcIndex > 0) { + substring = contentString.substring(0, ssrcIndex); + } + SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); + + // 获取支持的格式 + Vector mediaDescriptions = sdp.getMediaDescriptions(true); + // 查看是否支持PS 负载96 + int port = -1; + //boolean recvonly = false; + boolean mediaTransmissionTCP = false; + Boolean tcpActive = null; + for (int i = 0; i < mediaDescriptions.size(); i++) { + MediaDescription mediaDescription = (MediaDescription)mediaDescriptions.get(i); + Media media = mediaDescription.getMedia(); + + Vector mediaFormats = media.getMediaFormats(false); + if (mediaFormats.contains("8")) { + port = media.getMediaPort(); + String protocol = media.getProtocol(); + // 区分TCP发流还是udp, 当前默认udp + if ("TCP/RTP/AVP".equals(protocol)) { + String setup = mediaDescription.getAttribute("setup"); + if (setup != null) { + mediaTransmissionTCP = true; + if ("active".equals(setup)) { + tcpActive = true; + } else if ("passive".equals(setup)) { + tcpActive = false; + } + } + } + break; + } + } + if (port == -1) { + logger.info("不支持的媒体格式,返回415"); + // 回复不支持的格式 + responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 + return; + } + String username = sdp.getOrigin().getUsername(); + String addressStr = sdp.getOrigin().getAddress(); + logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc); + + } else { + logger.warn("来自无效设备/平台的请求"); + responseAck(evt, Response.BAD_REQUEST); + } + } + + } catch (SipException | InvalidArgumentException | ParseException e) { + e.printStackTrace(); + logger.warn("sdp解析错误"); + e.printStackTrace(); + } catch (SdpParseException e) { + e.printStackTrace(); + } catch (SdpException e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/MessageRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/MessageRequestProcessor.java new file mode 100644 index 00000000..f4d2b864 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/MessageRequestProcessor.java @@ -0,0 +1,1103 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; + +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.VManageBootstrap; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.callback.CheckForAllRecordsThread; +import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract; +import com.genersoft.iot.vmp.gb28181.utils.DateUtil; +import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import com.genersoft.iot.vmp.service.IDeviceAlarmService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.utils.GpsUtil; +import com.genersoft.iot.vmp.utils.SpringBeanFactory; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce; +import gov.nist.javax.sip.SipStackImpl; +import gov.nist.javax.sip.address.SipUri; +import org.dom4j.DocumentException; +import org.dom4j.Element; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +import javax.sip.*; +import javax.sip.address.SipURI; +import javax.sip.header.FromHeader; +import javax.sip.header.HeaderAddress; +import javax.sip.header.ToHeader; +import javax.sip.message.Response; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; + +import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; + +/** + * @description:MESSAGE请求处理器 + * @author: swwheihei + * @date: 2020年5月3日 下午5:32:41 + */ +@SuppressWarnings(value={"unchecked", "rawtypes"}) +@Component +public class MessageRequestProcessor extends SIPRequestProcessorAbstract { + + public static volatile List threadNameList = new ArrayList(); + private final static Logger logger = LoggerFactory.getLogger(MessageRequestProcessor.class); + + private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_"; + private static final String MESSAGE_KEEP_ALIVE = "Keepalive"; + private static final String MESSAGE_CONFIG_DOWNLOAD = "ConfigDownload"; + private static final String MESSAGE_CATALOG = "Catalog"; + private static final String MESSAGE_DEVICE_INFO = "DeviceInfo"; + private static final String MESSAGE_ALARM = "Alarm"; + private static final String MESSAGE_RECORD_INFO = "RecordInfo"; + private static final String MESSAGE_MEDIA_STATUS = "MediaStatus"; + private static final String MESSAGE_BROADCAST = "Broadcast"; + private static final String MESSAGE_DEVICE_STATUS = "DeviceStatus"; + private static final String MESSAGE_DEVICE_CONTROL = "DeviceControl"; + private static final String MESSAGE_DEVICE_CONFIG = "DeviceConfig"; + private static final String MESSAGE_MOBILE_POSITION = "MobilePosition"; + private static final String MESSAGE_PRESET_QUERY = "PresetQuery"; + private String method = "MESSAGE"; + + @Autowired + private UserSetup userSetup; + + @Autowired + private SIPCommander cmder; + + @Autowired + private SipConfig config; + + @Autowired + private SIPCommanderFroPlatform cmderFroPlatform; + + @Autowired + private IVideoManagerStorager storager; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private EventPublisher publisher; + + @Autowired + private RedisUtil redis; + + @Autowired + private DeferredResultHolder deferredResultHolder; + + @Autowired + private DeviceOffLineDetector offLineDetector; + + @Autowired + private IDeviceAlarmService deviceAlarmService; + + @Autowired + private SIPProcessorObserver sipProcessorObserver; + + @Override + public void afterPropertiesSet() throws Exception { + // 添加消息处理的订阅 + sipProcessorObserver.addRequestProcessor(method, this); + } + + /** + * 处理MESSAGE请求 + * + * @param evt + */ + @Override + public void process(RequestEvent evt) { + + try { + Element rootElement = getRootElement(evt); + String cmd = getText(rootElement, "CmdType"); + + if (MESSAGE_KEEP_ALIVE.equals(cmd)) { + logger.debug("接收到KeepAlive消息"); + processMessageKeepAlive(evt); + } else if (MESSAGE_CONFIG_DOWNLOAD.equals(cmd)) { + logger.debug("接收到ConfigDownload消息"); + processMessageConfigDownload(evt); + } else if (MESSAGE_CATALOG.equals(cmd)) { + logger.debug("接收到Catalog消息"); + processMessageCatalogList(evt); + } else if (MESSAGE_DEVICE_INFO.equals(cmd)) { + // DeviceInfo消息处理 + processMessageDeviceInfo(evt); + } else if (MESSAGE_DEVICE_STATUS.equals(cmd)) { + // DeviceStatus消息处理 + processMessageDeviceStatus(evt); + } else if (MESSAGE_DEVICE_CONTROL.equals(cmd)) { + logger.debug("接收到DeviceControl消息"); + processMessageDeviceControl(evt); + } else if (MESSAGE_DEVICE_CONFIG.equals(cmd)) { + logger.info("接收到DeviceConfig消息"); + processMessageDeviceConfig(evt); + } else if (MESSAGE_ALARM.equals(cmd)) { + logger.debug("接收到Alarm消息"); + processMessageAlarm(evt); + } else if (MESSAGE_RECORD_INFO.equals(cmd)) { + logger.debug("接收到RecordInfo消息"); + processMessageRecordInfo(evt); + }else if (MESSAGE_MEDIA_STATUS.equals(cmd)) { + logger.debug("接收到MediaStatus消息"); + processMessageMediaStatus(evt); + } else if (MESSAGE_MOBILE_POSITION.equals(cmd)) { + logger.debug("接收到MobilePosition消息"); + processMessageMobilePosition(evt); + } else if (MESSAGE_PRESET_QUERY.equals(cmd)) { + logger.debug("接收到PresetQuery消息"); + processMessagePresetQuery(evt); + } else if (MESSAGE_BROADCAST.equals(cmd)) { + // Broadcast消息处理 + processMessageBroadcast(evt); + } else { + logger.debug("接收到消息:" + cmd); + responseAck(evt, Response.OK); + } + } catch (DocumentException | SipException |InvalidArgumentException | ParseException e) { + e.printStackTrace(); + } + } + + /** + * 处理MobilePosition移动位置消息 + * + * @param evt + */ + private void processMessageMobilePosition(RequestEvent evt) { + try { + String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); + Device device = storager.queryVideoDevice(deviceId); + if (device == null) { + logger.warn("处理MobilePosition移动位置消息时未找到设备信息"); + responseAck(evt, Response.NOT_FOUND); + return; + } + Element rootElement = getRootElement(evt, device.getCharset()); + + MobilePosition mobilePosition = new MobilePosition(); + if (!StringUtils.isEmpty(device.getName())) { + mobilePosition.setDeviceName(device.getName()); + } + mobilePosition.setDeviceId(deviceId); + mobilePosition.setChannelId(getText(rootElement, "DeviceID")); + mobilePosition.setTime(getText(rootElement, "Time")); + mobilePosition.setLongitude(Double.parseDouble(getText(rootElement, "Longitude"))); + mobilePosition.setLatitude(Double.parseDouble(getText(rootElement, "Latitude"))); + if (NumericUtil.isDouble(getText(rootElement, "Speed"))) { + mobilePosition.setSpeed(Double.parseDouble(getText(rootElement, "Speed"))); + } else { + mobilePosition.setSpeed(0.0); + } + if (NumericUtil.isDouble(getText(rootElement, "Direction"))) { + mobilePosition.setDirection(Double.parseDouble(getText(rootElement, "Direction"))); + } else { + mobilePosition.setDirection(0.0); + } + if (NumericUtil.isDouble(getText(rootElement, "Altitude"))) { + mobilePosition.setAltitude(Double.parseDouble(getText(rootElement, "Altitude"))); + } else { + mobilePosition.setAltitude(0.0); + } + mobilePosition.setReportSource("Mobile Position"); + BaiduPoint bp = new BaiduPoint(); + bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude())); + logger.info("百度坐标:" + bp.getBdLng() + ", " + bp.getBdLat()); + mobilePosition.setGeodeticSystem("BD-09"); + mobilePosition.setCnLng(bp.getBdLng()); + mobilePosition.setCnLat(bp.getBdLat()); + if (!userSetup.getSavePositionHistory()) { + storager.clearMobilePositionsByDeviceId(deviceId); + } + storager.insertMobilePosition(mobilePosition); + //回复 200 OK + responseAck(evt, Response.OK); + } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + e.printStackTrace(); + } + } + + /** + * 处理DeviceStatus设备状态Message + * + * @param evt + */ + private void processMessageDeviceStatus(RequestEvent evt) { + try { + String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); + Device device = storager.queryVideoDevice(deviceId); + if (device == null) { + logger.warn("处理DeviceStatus设备状态Message时未找到设备信息"); + responseAck(evt, Response.NOT_FOUND); + return; + } + Element rootElement = getRootElement(evt); + String name = rootElement.getName(); + Element deviceIdElement = rootElement.element("DeviceID"); + String channelId = deviceIdElement.getText(); + if (name.equalsIgnoreCase("Query")) { // 区分是Response——查询响应,还是Query——查询请求 + logger.info("接收到DeviceStatus查询消息"); + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String platformId = ((SipUri) fromHeader.getAddress().getURI()).getUser(); + if (platformId == null) { + responseAck(evt, Response.NOT_FOUND); + return; + } else { + // 回复200 OK + responseAck(evt, Response.OK); + String sn = rootElement.element("SN").getText(); + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); + cmderFroPlatform.deviceStatusResponse(parentPlatform, sn, fromHeader.getTag()); + } + } else { + logger.info("接收到DeviceStatus应答消息"); + // 检查设备是否存在, 不存在则不回复 + if (storager.exists(deviceId)) { + // 回复200 OK + responseAck(evt, Response.OK); + JSONObject json = new JSONObject(); + XmlUtil.node2Json(rootElement, json); + if (logger.isDebugEnabled()) { + logger.debug(json.toJSONString()); + } + RequestMessage msg = new RequestMessage(); + msg.setKey(DeferredResultHolder.CALLBACK_CMD_DEVICESTATUS + deviceId + channelId); + msg.setData(json); + deferredResultHolder.invokeAllResult(msg); + + if (offLineDetector.isOnline(deviceId)) { + publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE); + } else { + } + } + } + + } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { + e.printStackTrace(); + } + } + + /** + * 处理DeviceControl设备状态Message + * + * @param evt + */ + private void processMessageDeviceControl(RequestEvent evt) { + try { + String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); + Device device = storager.queryVideoDevice(deviceId); + if (device == null) { + logger.warn("处理DeviceControl设备状态Message未找到设备信息"); + responseAck(evt, Response.NOT_FOUND); + return; + } + Element rootElement = getRootElement(evt); + String channelId = getText(rootElement, "DeviceID"); + //String result = getText(rootElement, "Result"); + // 回复200 OK + responseAck(evt, Response.OK); + if (rootElement.getName().equals("Response")) {//} !StringUtils.isEmpty(result)) { + // 此处是对本平台发出DeviceControl指令的应答 + JSONObject json = new JSONObject(); + XmlUtil.node2Json(rootElement, json); + if (logger.isDebugEnabled()) { + logger.debug(json.toJSONString()); + } + RequestMessage msg = new RequestMessage(); + String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL + deviceId + channelId; + msg.setKey(key); + msg.setData(json); + deferredResultHolder.invokeAllResult(msg); + } else { + // 此处是上级发出的DeviceControl指令 + String platformId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); + String targetGBId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); + // 远程启动功能 + if (!StringUtils.isEmpty(getText(rootElement, "TeleBoot"))) { + if (deviceId.equals(targetGBId)) { + // 远程启动本平台:需要在重新启动程序后先对SipStack解绑 + logger.info("执行远程启动本平台命令"); + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); + cmderFroPlatform.unregister(parentPlatform, null, null); + + Thread restartThread = new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(3000); + SipProvider up = (SipProvider) SpringBeanFactory.getBean("udpSipProvider"); + SipStackImpl stack = (SipStackImpl)up.getSipStack(); + stack.stop(); + Iterator listener = stack.getListeningPoints(); + while (listener.hasNext()) { + stack.deleteListeningPoint((ListeningPoint) listener.next()); + } + Iterator providers = stack.getSipProviders(); + while (providers.hasNext()) { + stack.deleteSipProvider((SipProvider) providers.next()); + } + VManageBootstrap.restart(); + } catch (InterruptedException ignored) { + } catch (ObjectInUseException e) { + e.printStackTrace(); + } + } + }); + + restartThread.setDaemon(false); + restartThread.start(); + } else { + // 远程启动指定设备 + } + } + // 云台/前端控制命令 + if (!StringUtils.isEmpty(getText(rootElement,"PTZCmd")) && !deviceId.equals(targetGBId)) { + String cmdString = getText(rootElement,"PTZCmd"); + Device deviceForPlatform = storager.queryVideoDeviceByPlatformIdAndChannelId(platformId, deviceId); + cmder.fronEndCmd(deviceForPlatform, deviceId, cmdString); + } + } + } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { + e.printStackTrace(); + } + } + + /** + * 处理DeviceConfig设备状态Message + * + * @param evt + */ + private void processMessageDeviceConfig(RequestEvent evt) { + try { + String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); + // 查询设备是否存在 + Device device = storager.queryVideoDevice(deviceId); + if (device == null) { + logger.warn("处理DeviceConfig设备状态Message消息时未找到设备信息"); + responseAck(evt, Response.NOT_FOUND); + return; + } + Element rootElement = getRootElement(evt); + String channelId = getText(rootElement, "DeviceID"); + // 回复200 OK + responseAck(evt, Response.OK); + if (rootElement.getName().equals("Response")) { + // 此处是对本平台发出DeviceControl指令的应答 + JSONObject json = new JSONObject(); + XmlUtil.node2Json(rootElement, json); + if (logger.isDebugEnabled()) { + logger.debug(json.toJSONString()); + } + String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONFIG + deviceId + channelId; + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + msg.setData(json); + deferredResultHolder.invokeAllResult(msg); + } else { + // 此处是上级发出的DeviceConfig指令 + } + } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { + e.printStackTrace(); + } + } + + /** + * 处理ConfigDownload设备状态Message + * + * @param evt + */ + private void processMessageConfigDownload(RequestEvent evt) { + try { + String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); + // 查询设备是否存在 + Device device = storager.queryVideoDevice(deviceId); + if (device == null) { + logger.warn("处理ConfigDownload设备状态Message时未找到设备信息"); + responseAck(evt, Response.NOT_FOUND); + return; + } + Element rootElement = getRootElement(evt); + String channelId = getText(rootElement, "DeviceID"); + String key = DeferredResultHolder.CALLBACK_CMD_CONFIGDOWNLOAD + deviceId + channelId; + // 回复200 OK + responseAck(evt, Response.OK); + if (rootElement.getName().equals("Response")) { + // 此处是对本平台发出DeviceControl指令的应答 + JSONObject json = new JSONObject(); + XmlUtil.node2Json(rootElement, json); + if (logger.isDebugEnabled()) { + logger.debug(json.toJSONString()); + } + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + msg.setData(json); + deferredResultHolder.invokeAllResult(msg); + } else { + // 此处是上级发出的DeviceConfig指令 + } + } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { + e.printStackTrace(); + } + } + + /** + * 处理PresetQuery预置位列表Message + * + * @param evt + */ + private void processMessagePresetQuery(RequestEvent evt) { + try { + String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); + // 查询设备是否存在 + Device device = storager.queryVideoDevice(deviceId); + if (device == null) { + logger.warn("处理PresetQuery预置位列表Message时未找到设备信息"); + responseAck(evt, Response.NOT_FOUND); + return; + } + Element rootElement = getRootElement(evt); + String channelId = getText(rootElement, "DeviceID"); + String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + deviceId + channelId; + // 回复200 OK + responseAck(evt, Response.OK); + if (rootElement.getName().equals("Response")) {// !StringUtils.isEmpty(result)) { + // 此处是对本平台发出DeviceControl指令的应答 + JSONObject json = new JSONObject(); + XmlUtil.node2Json(rootElement, json); + if (logger.isDebugEnabled()) { + logger.debug(json.toJSONString()); + } + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + msg.setData(json); + deferredResultHolder.invokeAllResult(msg); + } else { + // 此处是上级发出的DeviceControl指令 + } + } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { + e.printStackTrace(); + } + } + + /** + * 处理DeviceInfo设备信息Message + * + * @param evt + */ + private void processMessageDeviceInfo(RequestEvent evt) { + try { + String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); + // 查询设备是否存在 + Device device = storager.queryVideoDevice(deviceId); + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(deviceId); + + Element rootElement = getRootElement(evt); + String requestName = rootElement.getName(); + Element deviceIdElement = rootElement.element("DeviceID"); + String channelId = deviceIdElement.getTextTrim(); + String key = DeferredResultHolder.CALLBACK_CMD_DEVICEINFO + deviceId + channelId; + if (device != null ) { + rootElement = getRootElement(evt, device.getCharset()); + } + if (requestName.equals("Query")) { + logger.info("接收到DeviceInfo查询消息"); + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + if (parentPlatform == null) { + responseAck(evt, Response.NOT_FOUND); + return; + } else { + // 回复200 OK + responseAck(evt, Response.OK); + String sn = rootElement.element("SN").getText(); + cmderFroPlatform.deviceInfoResponse(parentPlatform, sn, fromHeader.getTag()); + } + } else { + logger.debug("接收到DeviceInfo应答消息"); + if (device == null) { + logger.warn("处理DeviceInfo设备信息Message时未找到设备信息"); + responseAck(evt, Response.NOT_FOUND); + return; + } + + device.setName(getText(rootElement, "DeviceName")); + + device.setManufacturer(getText(rootElement, "Manufacturer")); + device.setModel(getText(rootElement, "Model")); + device.setFirmware(getText(rootElement, "Firmware")); + if (StringUtils.isEmpty(device.getStreamMode())) { + device.setStreamMode("UDP"); + } + storager.updateDevice(device); + + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + msg.setData(device); + deferredResultHolder.invokeAllResult(msg); + // 回复200 OK + responseAck(evt, Response.OK); + if (offLineDetector.isOnline(deviceId)) { + publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE); + } + } + } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + e.printStackTrace(); + } + } + + /*** + * 收到catalog设备目录列表请求 处理 + * + * @param evt + */ + private void processMessageCatalogList(RequestEvent evt) { + try { + + String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); + // 查询设备是否存在 + Device device = storager.queryVideoDevice(deviceId); + ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(deviceId); + + + Element rootElement = getRootElement(evt); + String name = rootElement.getName(); + Element deviceIdElement = rootElement.element("DeviceID"); + String channelId = deviceIdElement.getText(); + Element deviceListElement = rootElement.element("DeviceList"); + String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId; + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + if (name.equalsIgnoreCase("Query")) { // 区分是Response——查询响应,还是Query——查询请求 + // TODO 后续将代码拆分 + if (parentPlatform == null) { + responseAck(evt, Response.NOT_FOUND); + return; + } else { + // 回复200 OK + responseAck(evt, Response.OK); + + Element snElement = rootElement.element("SN"); + String sn = snElement.getText(); + // 准备回复通道信息 + List channelReduces = storager.queryChannelListInParentPlatform(parentPlatform.getServerGBId()); + // 查询关联的直播通道 + List gbStreams = storager.queryGbStreamListInPlatform(parentPlatform.getServerGBId()); + int size = channelReduces.size() + gbStreams.size(); + // 回复级联的通道 + if (channelReduces.size() > 0) { + for (ChannelReduce channelReduce : channelReduces) { + DeviceChannel deviceChannel = storager.queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId()); + cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size); + } + } + // 回复直播的通道 + if (gbStreams.size() > 0) { + for (GbStream gbStream : gbStreams) { + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setChannelId(gbStream.getGbId()); + deviceChannel.setName(gbStream.getName()); + deviceChannel.setLongitude(gbStream.getLongitude()); + deviceChannel.setLatitude(gbStream.getLatitude()); + deviceChannel.setDeviceId(parentPlatform.getDeviceGBId()); + deviceChannel.setManufacture("wvp-pro"); + deviceChannel.setStatus(gbStream.isStatus()?1:0); +// deviceChannel.setParentId(parentPlatform.getDeviceGBId()); + deviceChannel.setRegisterWay(1); + deviceChannel.setCivilCode(config.getDomain()); + deviceChannel.setModel("live"); + deviceChannel.setOwner("wvp-pro"); +// deviceChannel.setAddress("test"); + deviceChannel.setParental(0); + deviceChannel.setSecrecy("0"); + deviceChannel.setSecrecy("0"); + + cmderFroPlatform.catalogQuery(deviceChannel, parentPlatform, sn, fromHeader.getTag(), size); + } + } + if (size == 0) { + // 回复无通道 + cmderFroPlatform.catalogQuery(null, parentPlatform, sn, fromHeader.getTag(), size); + } + } + + + } else { + if (device == null) { + logger.warn("收到catalog设备目录列表请求时未找到设备信息"); + responseAck(evt, Response.NOT_FOUND); + return; + } + deviceListElement = getRootElement(evt, device.getCharset()).element("DeviceList"); + Iterator deviceListIterator = deviceListElement.elementIterator(); + if (deviceListIterator != null) { + + // 遍历DeviceList + while (deviceListIterator.hasNext()) { + Element itemDevice = deviceListIterator.next(); + Element channelDeviceElement = itemDevice.element("DeviceID"); + if (channelDeviceElement == null) { + continue; + } + String channelDeviceId = channelDeviceElement.getText(); + Element channdelNameElement = itemDevice.element("Name"); + String channelName = channdelNameElement != null ? channdelNameElement.getTextTrim().toString() : ""; + Element statusElement = itemDevice.element("Status"); + String status = statusElement != null ? statusElement.getText().toString() : "ON"; + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setName(channelName); + deviceChannel.setChannelId(channelDeviceId); + // ONLINE OFFLINE HIKVISION DS-7716N-E4 NVR的兼容性处理 + if (status.equals("ON") || status.equals("On") || status.equals("ONLINE")) { + deviceChannel.setStatus(1); + } + if (status.equals("OFF") || status.equals("Off") || status.equals("OFFLINE")) { + deviceChannel.setStatus(0); + } + + deviceChannel.setManufacture(getText(itemDevice, "Manufacturer")); + deviceChannel.setModel(getText(itemDevice, "Model")); + deviceChannel.setOwner(getText(itemDevice, "Owner")); + deviceChannel.setCivilCode(getText(itemDevice, "CivilCode")); + deviceChannel.setBlock(getText(itemDevice, "Block")); + deviceChannel.setAddress(getText(itemDevice, "Address")); + if (getText(itemDevice, "Parental") == null || getText(itemDevice, "Parental") == "") { + deviceChannel.setParental(0); + } else { + deviceChannel.setParental(Integer.parseInt(getText(itemDevice, "Parental"))); + } + deviceChannel.setParentId(getText(itemDevice, "ParentID")); + if (getText(itemDevice, "SafetyWay") == null || getText(itemDevice, "SafetyWay") == "") { + deviceChannel.setSafetyWay(0); + } else { + deviceChannel.setSafetyWay(Integer.parseInt(getText(itemDevice, "SafetyWay"))); + } + if (getText(itemDevice, "RegisterWay") == null || getText(itemDevice, "RegisterWay") == "") { + deviceChannel.setRegisterWay(1); + } else { + deviceChannel.setRegisterWay(Integer.parseInt(getText(itemDevice, "RegisterWay"))); + } + deviceChannel.setCertNum(getText(itemDevice, "CertNum")); + if (getText(itemDevice, "Certifiable") == null || getText(itemDevice, "Certifiable") == "") { + deviceChannel.setCertifiable(0); + } else { + deviceChannel.setCertifiable(Integer.parseInt(getText(itemDevice, "Certifiable"))); + } + if (getText(itemDevice, "ErrCode") == null || getText(itemDevice, "ErrCode") == "") { + deviceChannel.setErrCode(0); + } else { + deviceChannel.setErrCode(Integer.parseInt(getText(itemDevice, "ErrCode"))); + } + deviceChannel.setEndTime(getText(itemDevice, "EndTime")); + deviceChannel.setSecrecy(getText(itemDevice, "Secrecy")); + deviceChannel.setIpAddress(getText(itemDevice, "IPAddress")); + if (getText(itemDevice, "Port") == null || getText(itemDevice, "Port") == "") { + deviceChannel.setPort(0); + } else { + deviceChannel.setPort(Integer.parseInt(getText(itemDevice, "Port"))); + } + deviceChannel.setPassword(getText(itemDevice, "Password")); + if (NumericUtil.isDouble(getText(itemDevice, "Longitude"))) { + deviceChannel.setLongitude(Double.parseDouble(getText(itemDevice, "Longitude"))); + } else { + deviceChannel.setLongitude(0.00); + } + if (NumericUtil.isDouble(getText(itemDevice, "Latitude"))) { + deviceChannel.setLatitude(Double.parseDouble(getText(itemDevice, "Latitude"))); + } else { + deviceChannel.setLatitude(0.00); + } + if (getText(itemDevice, "PTZType") == null || getText(itemDevice, "PTZType") == "") { + deviceChannel.setPTZType(0); + } else { + deviceChannel.setPTZType(Integer.parseInt(getText(itemDevice, "PTZType"))); + } + deviceChannel.setHasAudio(true); // 默认含有音频,播放时再检查是否有音频及是否AAC + storager.updateChannel(device.getDeviceId(), deviceChannel); + } + + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + msg.setData(device); + deferredResultHolder.invokeAllResult(msg); + // 回复200 OK + responseAck(evt, Response.OK); + if (offLineDetector.isOnline(deviceId)) { + publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE); + } + } + } + } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + e.printStackTrace(); + } + } + + /*** + * 收到alarm设备报警信息 处理 + * + * @param evt + */ + private void processMessageAlarm(RequestEvent evt) { + try { + String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); + Device device = storager.queryVideoDevice(deviceId); + if (device == null) { + logger.warn("处理alarm设备报警信息未找到设备信息"); + responseAck(evt, Response.NOT_FOUND); + return; + } + Element rootElement = getRootElement(evt, device.getCharset()); + Element deviceIdElement = rootElement.element("DeviceID"); + String channelId = deviceIdElement.getText().toString(); + String key = DeferredResultHolder.CALLBACK_CMD_ALARM + deviceId + channelId; + // 回复200 OK + responseAck(evt, Response.OK); + + if (device.getCharset() != null) { + rootElement = getRootElement(evt, device.getCharset()); + } + + if (rootElement.getName().equals("Notify")) { // 处理报警通知 + DeviceAlarm deviceAlarm = new DeviceAlarm(); + deviceAlarm.setDeviceId(deviceId); + deviceAlarm.setChannelId(channelId); + deviceAlarm.setAlarmPriority(getText(rootElement, "AlarmPriority")); + deviceAlarm.setAlarmMethod(getText(rootElement, "AlarmMethod")); + deviceAlarm.setAlarmTime(getText(rootElement, "AlarmTime")); + if (getText(rootElement, "AlarmDescription") == null) { + deviceAlarm.setAlarmDescription(""); + } else { + deviceAlarm.setAlarmDescription(getText(rootElement, "AlarmDescription")); + } + if (NumericUtil.isDouble(getText(rootElement, "Longitude"))) { + deviceAlarm.setLongitude(Double.parseDouble(getText(rootElement, "Longitude"))); + } else { + deviceAlarm.setLongitude(0.00); + } + if (NumericUtil.isDouble(getText(rootElement, "Latitude"))) { + deviceAlarm.setLatitude(Double.parseDouble(getText(rootElement, "Latitude"))); + } else { + deviceAlarm.setLatitude(0.00); + } + + if (!StringUtils.isEmpty(deviceAlarm.getAlarmMethod())) { + if ( deviceAlarm.getAlarmMethod().equals("4")) { + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); + mobilePosition.setTime(deviceAlarm.getAlarmTime()); + mobilePosition.setLongitude(deviceAlarm.getLongitude()); + mobilePosition.setLatitude(deviceAlarm.getLatitude()); + mobilePosition.setReportSource("GPS Alarm"); + BaiduPoint bp = new BaiduPoint(); + bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude())); + logger.info("百度坐标:" + bp.getBdLng() + ", " + bp.getBdLat()); + mobilePosition.setGeodeticSystem("BD-09"); + mobilePosition.setCnLng(bp.getBdLng()); + mobilePosition.setCnLat(bp.getBdLat()); + if (!userSetup.getSavePositionHistory()) { + storager.clearMobilePositionsByDeviceId(deviceId); + } + storager.insertMobilePosition(mobilePosition); + } + } + logger.debug("存储报警信息、报警分类"); + // 存储报警信息、报警分类 + deviceAlarmService.add(deviceAlarm); + + if (offLineDetector.isOnline(deviceId)) { + publisher.deviceAlarmEventPublish(deviceAlarm); + } + } else if (rootElement.getName().equals("Response")) { // 处理报警查询响应 + JSONObject json = new JSONObject(); + XmlUtil.node2Json(rootElement, json); + if (logger.isDebugEnabled()) { + logger.debug(json.toJSONString()); + } + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + msg.setData(json); + deferredResultHolder.invokeAllResult(msg); + } + } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + e.printStackTrace(); + } + } + + /*** + * 收到keepalive请求 处理 + * + * @param evt + */ + private void processMessageKeepAlive(RequestEvent evt) { + try { + + String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); + // 查询设备是否存在 + Device device = storager.queryVideoDevice(deviceId); + + Element rootElement = getRootElement(evt); + String channelId = getText(rootElement, "DeviceID"); + + // 检查设备是否存在并在线, 不在线则设置为在线 + if (device != null ) { + // 回复200 OK + responseAck(evt, Response.OK); + publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE); + }else{ + logger.warn("收到[ "+deviceId+" ]心跳信息, 但是设备不存在, 回复404"); + Response response = getMessageFactory().createResponse(Response.NOT_FOUND, evt.getRequest()); + ServerTransaction serverTransaction = getServerTransaction(evt); + serverTransaction.sendResponse(response); + if (serverTransaction.getDialog() != null) { + serverTransaction.getDialog().delete(); + } + } + +// if (device != null && device.getOnline() == 1) { +// +// if (offLineDetector.isOnline(deviceId)) { +// publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE); +// } else { +// } +// }else { +//// logger.warn("收到[ "+deviceId+" ]心跳信息, 但是设备" + (device == null? "不存在":"离线") + ", 回复401"); +//// Response response = getMessageFactory().createResponse(Response.UNAUTHORIZED, evt.getRequest()); +//// getServerTransaction(evt).sendResponse(response); +// publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE); +// +// } + } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { + e.printStackTrace(); + } + } + + /*** + * 处理RecordInfo设备录像列表Message请求 TODO 过期时间暂时写死180秒,后续与DeferredResult超时时间保持一致 + * + * @param evt + */ + private void processMessageRecordInfo(RequestEvent evt) { + try { + + String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); + // 查询设备是否存在 + Device device = storager.queryVideoDevice(deviceId); + if (device == null) { + logger.warn("处理DeviceInfo设备信息Message时未找到设备信息"); + responseAck(evt, Response.NOT_FOUND); + return; + } + + // 回复200 OK + responseAck(evt, Response.OK); + String uuid = UUID.randomUUID().toString().replace("-", ""); + RecordInfo recordInfo = new RecordInfo(); + Element rootElement = getRootElement(evt); + Element deviceIdElement = rootElement.element("DeviceID"); + String channelId = deviceIdElement.getText().toString(); + String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + channelId; + if (device != null ) { + rootElement = getRootElement(evt, device.getCharset()); + } + recordInfo.setDeviceId(deviceId); + recordInfo.setChannelId(channelId); + recordInfo.setName(getText(rootElement, "Name")); + if (getText(rootElement, "SumNum")== null || getText(rootElement, "SumNum") =="") { + recordInfo.setSumNum(0); + } else { + recordInfo.setSumNum(Integer.parseInt(getText(rootElement, "SumNum"))); + } + String sn = getText(rootElement, "SN"); + Element recordListElement = rootElement.element("RecordList"); + if (recordListElement == null || recordInfo.getSumNum() == 0) { + logger.info("无录像数据"); + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + msg.setData(recordInfo); + deferredResultHolder.invokeAllResult(msg); + } else { + Iterator recordListIterator = recordListElement.elementIterator(); + List recordList = new ArrayList(); + if (recordListIterator != null) { + RecordItem record = new RecordItem(); + logger.info("处理录像列表数据..."); + // 遍历DeviceList + while (recordListIterator.hasNext()) { + Element itemRecord = recordListIterator.next(); + Element recordElement = itemRecord.element("DeviceID"); + if (recordElement == null) { + logger.info("记录为空,下一个..."); + continue; + } + record = new RecordItem(); + record.setDeviceId(getText(itemRecord, "DeviceID")); + record.setName(getText(itemRecord, "Name")); + record.setFilePath(getText(itemRecord, "FilePath")); + record.setAddress(getText(itemRecord, "Address")); + record.setStartTime( + DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(getText(itemRecord, "StartTime"))); + record.setEndTime( + DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(getText(itemRecord, "EndTime"))); + record.setSecrecy(itemRecord.element("Secrecy") == null ? 0 + : Integer.parseInt(getText(itemRecord, "Secrecy"))); + record.setType(getText(itemRecord, "Type")); + record.setRecorderId(getText(itemRecord, "RecorderID")); + recordList.add(record); + } + recordInfo.setRecordList(recordList); + } + + // 改用单独线程统计已获取录像文件数量,避免多包并行分别统计不完整的问题 + String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn; + redis.set(cacheKey + "_" + uuid, recordList, 90); + if (!threadNameList.contains(cacheKey)) { + threadNameList.add(cacheKey); + CheckForAllRecordsThread chk = new CheckForAllRecordsThread(cacheKey, recordInfo); + chk.setName(cacheKey); + chk.setDeferredResultHolder(deferredResultHolder); + chk.setRedis(redis); + chk.setLogger(logger); + chk.start(); + if (logger.isDebugEnabled()) { + logger.debug("Start Thread " + cacheKey + "."); + } + } else { + if (logger.isDebugEnabled()) { + logger.debug("Thread " + cacheKey + " already started."); + } + } + + // 存在录像且如果当前录像明细个数小于总条数,说明拆包返回,需要组装,暂不返回 + // if (recordInfo.getSumNum() > 0 && recordList.size() > 0 && recordList.size() < recordInfo.getSumNum()) { + // // 为防止连续请求该设备的录像数据,返回数据错乱,特增加sn进行区分 + // String cacheKey = CACHE_RECORDINFO_KEY + deviceId + sn; + + // redis.set(cacheKey + "_" + uuid, recordList, 90); + // List cacheKeys = redis.scan(cacheKey + "_*"); + // List totalRecordList = new ArrayList(); + // for (int i = 0; i < cacheKeys.size(); i++) { + // totalRecordList.addAll((List) redis.get(cacheKeys.get(i).toString())); + // } + // if (totalRecordList.size() < recordInfo.getSumNum()) { + // logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + recordInfo.getSumNum() + "项"); + // return; + // } + // logger.info("录像数据已全部获取,共" + recordInfo.getSumNum() + "项"); + // recordInfo.setRecordList(totalRecordList); + // for (int i = 0; i < cacheKeys.size(); i++) { + // redis.del(cacheKeys.get(i).toString()); + // } + // } + // // 自然顺序排序, 元素进行升序排列 + // recordInfo.getRecordList().sort(Comparator.naturalOrder()); + } + // 走到这里,有以下可能:1、没有录像信息,第一次收到recordinfo的消息即返回响应数据,无redis操作 + // 2、有录像数据,且第一次即收到完整数据,返回响应数据,无redis操作 + // 3、有录像数据,在超时时间内收到多次包组装后数量足够,返回数据 + + // RequestMessage msg = new RequestMessage(); + // msg.setDeviceId(deviceId); + // msg.setType(DeferredResultHolder.CALLBACK_CMD_RECORDINFO); + // msg.setData(recordInfo); + // deferredResultHolder.invokeResult(msg); + // logger.info("处理完成,返回结果"); + } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + e.printStackTrace(); + } + } + + /** + * 收到MediaStatus消息处理 + * + * @param evt + */ + private void processMessageMediaStatus(RequestEvent evt){ + try { + + String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); + // 查询设备是否存在 + Device device = storager.queryVideoDevice(deviceId); + if (device == null) { + logger.warn("处理DeviceInfo设备信息Message时未找到设备信息"); + responseAck(evt, Response.NOT_FOUND); + return; + } + + // 回复200 OK + responseAck(evt, Response.OK); + Element rootElement = getRootElement(evt); + String channelId = getText(rootElement, "DeviceID"); + String NotifyType =getText(rootElement, "NotifyType"); + if (NotifyType.equals("121")){ + logger.info("媒体播放完毕,通知关流"); + StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, "*"); + if (streamInfo != null) { + redisCatchStorage.stopPlayback(streamInfo); + cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId()); + } + } + } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { + e.printStackTrace(); + } + } + + /** + * 处理AudioBroadcast语音广播Message + * + * @param evt + */ + private void processMessageBroadcast(RequestEvent evt) { + try { + + String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); + // 查询设备是否存在 + Device device = storager.queryVideoDevice(deviceId); + if (device == null) { + logger.warn("处理DeviceInfo设备信息Message时未找到设备信息"); + responseAck(evt, Response.NOT_FOUND); + return; + } + + Element rootElement = getRootElement(evt); + String channelId = getText(rootElement, "DeviceID"); + String key = DeferredResultHolder.CALLBACK_CMD_BROADCAST + deviceId + channelId; + // 回复200 OK + responseAck(evt, Response.OK); + if (rootElement.getName().equals("Response")) { + // 此处是对本平台发出Broadcast指令的应答 + JSONObject json = new JSONObject(); + XmlUtil.node2Json(rootElement, json); + if (logger.isDebugEnabled()) { + logger.debug(json.toJSONString()); + } + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + msg.setData(json); + deferredResultHolder.invokeAllResult(msg); + } else { + // 此处是上级发出的Broadcast指令 + } + } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java new file mode 100644 index 00000000..88bf45aa --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -0,0 +1,384 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; + +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract; +import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.utils.GpsUtil; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import org.dom4j.DocumentException; +import org.dom4j.Element; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +import javax.sip.InvalidArgumentException; +import javax.sip.RequestEvent; +import javax.sip.SipException; +import javax.sip.header.FromHeader; +import javax.sip.message.Response; +import java.text.ParseException; +import java.util.Iterator; + +/** + * @description: Notify请求处理器 + * @author: lawrencehj + * @date: 2021年1月27日 + */ +@Component +public class NotifyRequestProcessor extends SIPRequestProcessorAbstract { + + + private final static Logger logger = LoggerFactory.getLogger(NotifyRequestProcessor.class); + + @Autowired + private UserSetup userSetup; + + @Autowired + private IVideoManagerStorager storager; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private EventPublisher publisher; + + @Autowired + private DeviceOffLineDetector offLineDetector; + + private static final String NOTIFY_CATALOG = "Catalog"; + private static final String NOTIFY_ALARM = "Alarm"; + private static final String NOTIFY_MOBILE_POSITION = "MobilePosition"; + private String method = "NOTIFY"; + + @Autowired + private SIPProcessorObserver sipProcessorObserver; + + @Override + public void afterPropertiesSet() throws Exception { + // 添加消息处理的订阅 + sipProcessorObserver.addRequestProcessor(method, this); + } + + @Override + public void process(RequestEvent evt) { + try { + Element rootElement = getRootElement(evt); + String cmd = XmlUtil.getText(rootElement, "CmdType"); + + if (NOTIFY_CATALOG.equals(cmd)) { + logger.info("接收到Catalog通知"); + processNotifyCatalogList(evt); + } else if (NOTIFY_ALARM.equals(cmd)) { + logger.info("接收到Alarm通知"); + processNotifyAlarm(evt); + } else if (NOTIFY_MOBILE_POSITION.equals(cmd)) { + logger.info("接收到MobilePosition通知"); + processNotifyMobilePosition(evt); + } else { + logger.info("接收到消息:" + cmd); + responseAck(evt, Response.OK); + } + } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + e.printStackTrace(); + } + } + + /** + * 处理MobilePosition移动位置Notify + * + * @param evt + */ + private void processNotifyMobilePosition(RequestEvent evt) { + try { + // 回复 200 OK + Element rootElement = getRootElement(evt); + MobilePosition mobilePosition = new MobilePosition(); + Element deviceIdElement = rootElement.element("DeviceID"); + String deviceId = deviceIdElement.getTextTrim().toString(); + Device device = storager.queryVideoDevice(deviceId); + if (device != null) { + if (!StringUtils.isEmpty(device.getName())) { + mobilePosition.setDeviceName(device.getName()); + } + } + mobilePosition.setDeviceId(XmlUtil.getText(rootElement, "DeviceID")); + mobilePosition.setTime(XmlUtil.getText(rootElement, "Time")); + mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); + mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) { + mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed"))); + } else { + mobilePosition.setSpeed(0.0); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) { + mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction"))); + } else { + mobilePosition.setDirection(0.0); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) { + mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude"))); + } else { + mobilePosition.setAltitude(0.0); + } + mobilePosition.setReportSource("Mobile Position"); + BaiduPoint bp = new BaiduPoint(); + bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude())); + logger.info("百度坐标:" + bp.getBdLng() + ", " + bp.getBdLat()); + mobilePosition.setGeodeticSystem("BD-09"); + mobilePosition.setCnLng(bp.getBdLng()); + mobilePosition.setCnLat(bp.getBdLat()); + if (!userSetup.getSavePositionHistory()) { + storager.clearMobilePositionsByDeviceId(deviceId); + } + storager.insertMobilePosition(mobilePosition); + responseAck(evt, Response.OK); + } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + e.printStackTrace(); + } + } + + /*** + * 处理alarm设备报警Notify + * + * @param evt + */ + private void processNotifyAlarm(RequestEvent evt) { + try { + Element rootElement = getRootElement(evt); + Element deviceIdElement = rootElement.element("DeviceID"); + String deviceId = deviceIdElement.getText().toString(); + + Device device = storager.queryVideoDevice(deviceId); + if (device == null) { + return; + } + rootElement = getRootElement(evt, device.getCharset()); + DeviceAlarm deviceAlarm = new DeviceAlarm(); + deviceAlarm.setDeviceId(deviceId); + deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority")); + deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); + deviceAlarm.setAlarmTime(XmlUtil.getText(rootElement, "AlarmTime")); + if (XmlUtil.getText(rootElement, "AlarmDescription") == null) { + deviceAlarm.setAlarmDescription(""); + } else { + deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription")); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) { + deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude"))); + } else { + deviceAlarm.setLongitude(0.00); + } + if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) { + deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude"))); + } else { + deviceAlarm.setLatitude(0.00); + } + + if (deviceAlarm.getAlarmMethod().equals("4")) { + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); + mobilePosition.setTime(deviceAlarm.getAlarmTime()); + mobilePosition.setLongitude(deviceAlarm.getLongitude()); + mobilePosition.setLatitude(deviceAlarm.getLatitude()); + mobilePosition.setReportSource("GPS Alarm"); + BaiduPoint bp = new BaiduPoint(); + bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude())); + logger.info("百度坐标:" + bp.getBdLng() + ", " + bp.getBdLat()); + mobilePosition.setGeodeticSystem("BD-09"); + mobilePosition.setCnLng(bp.getBdLng()); + mobilePosition.setCnLat(bp.getBdLat()); + if (!userSetup.getSavePositionHistory()) { + storager.clearMobilePositionsByDeviceId(deviceId); + } + storager.insertMobilePosition(mobilePosition); + } + // TODO: 需要实现存储报警信息、报警分类 + + // 回复200 OK + responseAck(evt, Response.OK); + if (offLineDetector.isOnline(deviceId)) { + publisher.deviceAlarmEventPublish(deviceAlarm); + } + } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + e.printStackTrace(); + } + } + + /*** + * 处理catalog设备目录列表Notify + * + * @param evt + */ + private void processNotifyCatalogList(RequestEvent evt) { + try { + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + + Element rootElement = getRootElement(evt); + Element deviceIdElement = rootElement.element("DeviceID"); + String channelId = deviceIdElement.getText(); + Device device = storager.queryVideoDevice(deviceId); + if (device == null) { + return; + } + if (device != null ) { + rootElement = getRootElement(evt, device.getCharset()); + } + Element deviceListElement = rootElement.element("DeviceList"); + if (deviceListElement == null) { + return; + } + Iterator deviceListIterator = deviceListElement.elementIterator(); + if (deviceListIterator != null) { + + // 遍历DeviceList + while (deviceListIterator.hasNext()) { + Element itemDevice = deviceListIterator.next(); + Element channelDeviceElement = itemDevice.element("DeviceID"); + if (channelDeviceElement == null) { + continue; + } + String channelDeviceId = channelDeviceElement.getTextTrim(); + Element channdelNameElement = itemDevice.element("Name"); + String channelName = channdelNameElement != null ? channdelNameElement.getTextTrim().toString() : ""; + Element statusElement = itemDevice.element("Status"); + String status = statusElement != null ? statusElement.getTextTrim().toString() : "ON"; + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setName(channelName); + deviceChannel.setChannelId(channelDeviceId); + // ONLINE OFFLINE HIKVISION DS-7716N-E4 NVR的兼容性处理 + if (status.equals("ON") || status.equals("On") || status.equals("ONLINE")) { + deviceChannel.setStatus(1); + } + if (status.equals("OFF") || status.equals("Off") || status.equals("OFFLINE")) { + deviceChannel.setStatus(0); + } + + deviceChannel.setManufacture(XmlUtil.getText(itemDevice, "Manufacturer")); + deviceChannel.setModel(XmlUtil.getText(itemDevice, "Model")); + deviceChannel.setOwner(XmlUtil.getText(itemDevice, "Owner")); + deviceChannel.setCivilCode(XmlUtil.getText(itemDevice, "CivilCode")); + deviceChannel.setBlock(XmlUtil.getText(itemDevice, "Block")); + deviceChannel.setAddress(XmlUtil.getText(itemDevice, "Address")); + if (XmlUtil.getText(itemDevice, "Parental") == null + || XmlUtil.getText(itemDevice, "Parental") == "") { + deviceChannel.setParental(0); + } else { + deviceChannel.setParental(Integer.parseInt(XmlUtil.getText(itemDevice, "Parental"))); + } + deviceChannel.setParentId(XmlUtil.getText(itemDevice, "ParentID")); + if (XmlUtil.getText(itemDevice, "SafetyWay") == null + || XmlUtil.getText(itemDevice, "SafetyWay") == "") { + deviceChannel.setSafetyWay(0); + } else { + deviceChannel.setSafetyWay(Integer.parseInt(XmlUtil.getText(itemDevice, "SafetyWay"))); + } + if (XmlUtil.getText(itemDevice, "RegisterWay") == null + || XmlUtil.getText(itemDevice, "RegisterWay") == "") { + deviceChannel.setRegisterWay(1); + } else { + deviceChannel.setRegisterWay(Integer.parseInt(XmlUtil.getText(itemDevice, "RegisterWay"))); + } + deviceChannel.setCertNum(XmlUtil.getText(itemDevice, "CertNum")); + if (XmlUtil.getText(itemDevice, "Certifiable") == null + || XmlUtil.getText(itemDevice, "Certifiable") == "") { + deviceChannel.setCertifiable(0); + } else { + deviceChannel.setCertifiable(Integer.parseInt(XmlUtil.getText(itemDevice, "Certifiable"))); + } + if (XmlUtil.getText(itemDevice, "ErrCode") == null + || XmlUtil.getText(itemDevice, "ErrCode") == "") { + deviceChannel.setErrCode(0); + } else { + deviceChannel.setErrCode(Integer.parseInt(XmlUtil.getText(itemDevice, "ErrCode"))); + } + deviceChannel.setEndTime(XmlUtil.getText(itemDevice, "EndTime")); + deviceChannel.setSecrecy(XmlUtil.getText(itemDevice, "Secrecy")); + deviceChannel.setIpAddress(XmlUtil.getText(itemDevice, "IPAddress")); + if (XmlUtil.getText(itemDevice, "Port") == null || XmlUtil.getText(itemDevice, "Port") == "") { + deviceChannel.setPort(0); + } else { + deviceChannel.setPort(Integer.parseInt(XmlUtil.getText(itemDevice, "Port"))); + } + deviceChannel.setPassword(XmlUtil.getText(itemDevice, "Password")); + if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Longitude"))) { + deviceChannel.setLongitude(Double.parseDouble(XmlUtil.getText(itemDevice, "Longitude"))); + } else { + deviceChannel.setLongitude(0.00); + } + if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Latitude"))) { + deviceChannel.setLatitude(Double.parseDouble(XmlUtil.getText(itemDevice, "Latitude"))); + } else { + deviceChannel.setLatitude(0.00); + } + if (XmlUtil.getText(itemDevice, "PTZType") == null + || XmlUtil.getText(itemDevice, "PTZType") == "") { + deviceChannel.setPTZType(0); + } else { + deviceChannel.setPTZType(Integer.parseInt(XmlUtil.getText(itemDevice, "PTZType"))); + } + deviceChannel.setHasAudio(true); // 默认含有音频,播放时再检查是否有音频及是否AAC + storager.updateChannel(device.getDeviceId(), deviceChannel); + } + + // RequestMessage msg = new RequestMessage(); + // msg.setDeviceId(deviceId); + // msg.setType(DeferredResultHolder.CALLBACK_CMD_CATALOG); + // msg.setData(device); + // deferredResultHolder.invokeResult(msg); + // 回复200 OK + responseAck(evt, Response.OK); + if (offLineDetector.isOnline(deviceId)) { + publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE); + } + } + } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + e.printStackTrace(); + } + } + + + + + public void setCmder(SIPCommander cmder) { + } + + public void setStorager(IVideoManagerStorager storager) { + this.storager = storager; + } + + public void setPublisher(EventPublisher publisher) { + this.publisher = publisher; + } + + public void setRedis(RedisUtil redis) { + } + + public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) { + } + + public void setOffLineDetector(DeviceOffLineDetector offLineDetector) { + this.offLineDetector = offLineDetector; + } + + public IRedisCatchStorage getRedisCatchStorage() { + return redisCatchStorage; + } + + public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { + this.redisCatchStorage = redisCatchStorage; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java new file mode 100644 index 00000000..e16043d7 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java @@ -0,0 +1,197 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; + +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.gb28181.auth.DigestServerAuthenticationHelper; +import com.genersoft.iot.vmp.gb28181.auth.RegisterLogicHandler; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.WvpSipDate; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import gov.nist.javax.sip.RequestEventExt; +import gov.nist.javax.sip.address.AddressImpl; +import gov.nist.javax.sip.address.SipUri; +import gov.nist.javax.sip.header.Expires; +import gov.nist.javax.sip.header.SIPDateHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +import javax.sip.InvalidArgumentException; +import javax.sip.RequestEvent; +import javax.sip.ServerTransaction; +import javax.sip.SipException; +import javax.sip.header.*; +import javax.sip.message.Request; +import javax.sip.message.Response; +import java.security.NoSuchAlgorithmException; +import java.text.ParseException; +import java.util.Calendar; +import java.util.Locale; + +/** + * @description:收到注册请求 处理 + * @author: swwheihei + * @date: 2020年5月3日 下午4:47:25 + */ +@Component +public class RegisterRequestProcessor extends SIPRequestProcessorAbstract { + + private Logger logger = LoggerFactory.getLogger(RegisterRequestProcessor.class); + + public String method = "REGISTER"; + + @Autowired + private SipConfig sipConfig; + + @Autowired + private RegisterLogicHandler handler; + + @Autowired + private IVideoManagerStorager storager; + + @Autowired + private EventPublisher publisher; + + @Autowired + private SIPProcessorObserver sipProcessorObserver; + + @Override + public void afterPropertiesSet() throws Exception { + // 添加消息处理的订阅 + sipProcessorObserver.addRequestProcessor(method, this); + } + + /** + * 收到注册请求 处理 + * @param evt + */ + @Override + public void process(RequestEvent evt) { + try { + RequestEventExt evtExt = (RequestEventExt)evt; + String requestAddress = evtExt.getRemoteIpAddress() + ":" + evtExt.getRemotePort(); + logger.info("[{}] 收到注册请求,开始处理", requestAddress); + Request request = evt.getRequest(); + + Response response = null; + boolean passwordCorrect = false; + // 注册标志 0:未携带授权头或者密码错误 1:注册成功 2:注销成功 + int registerFlag = 0; + FromHeader fromHeader = (FromHeader) request.getHeader(FromHeader.NAME); + AddressImpl address = (AddressImpl) fromHeader.getAddress(); + SipUri uri = (SipUri) address.getURI(); + String deviceId = uri.getUser(); + Device device = storager.queryVideoDevice(deviceId); + AuthorizationHeader authorhead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME); + // 校验密码是否正确 + if (authorhead != null) { + passwordCorrect = new DigestServerAuthenticationHelper().doAuthenticatePlainTextPassword(request, + sipConfig.getPassword()); + } + if (StringUtils.isEmpty(sipConfig.getPassword())){ + passwordCorrect = true; + } + + // 未携带授权头或者密码错误 回复401 + if (authorhead == null ) { + + logger.info("[{}] 未携带授权头 回复401", requestAddress); + response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request); + new DigestServerAuthenticationHelper().generateChallenge(getHeaderFactory(), response, sipConfig.getDomain()); + }else { + if (!passwordCorrect){ + // 注册失败 + response = getMessageFactory().createResponse(Response.FORBIDDEN, request); + response.setReasonPhrase("wrong password"); + logger.info("[{}] 密码/SIP服务器ID错误, 回复403", requestAddress); + }else { + // 携带授权头并且密码正确 + response = getMessageFactory().createResponse(Response.OK, request); + // 添加date头 + SIPDateHeader dateHeader = new SIPDateHeader(); + // 使用自己修改的 + WvpSipDate wvpSipDate = new WvpSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis()); + dateHeader.setDate(wvpSipDate); + response.addHeader(dateHeader); + + ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME); + if (expiresHeader == null) { + response = getMessageFactory().createResponse(Response.BAD_REQUEST, request); + ServerTransaction serverTransaction = getServerTransaction(evt); + serverTransaction.sendResponse(response); + if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete(); + return; + } + // 添加Contact头 + response.addHeader(request.getHeader(ContactHeader.NAME)); + // 添加Expires头 + response.addHeader(request.getExpires()); + + // 获取到通信地址等信息 + ViaHeader viaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME); + String received = viaHeader.getReceived(); + int rPort = viaHeader.getRPort(); + // 解析本地地址替代 + if (StringUtils.isEmpty(received) || rPort == -1) { + received = viaHeader.getHost(); + rPort = viaHeader.getPort(); + } + // + + if (device == null) { + device = new Device(); + device.setStreamMode("UDP"); + device.setCharset("gb2312"); + device.setDeviceId(deviceId); + device.setFirsRegister(true); + } + device.setIp(received); + device.setPort(rPort); + device.setHostAddress(received.concat(":").concat(String.valueOf(rPort))); + // 注销成功 + if (expiresHeader.getExpires() == 0) { + registerFlag = 2; + } + // 注册成功 + else { + device.setExpires(expiresHeader.getExpires()); + registerFlag = 1; + // 判断TCP还是UDP + boolean isTcp = false; + ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME); + String transport = reqViaHeader.getTransport(); + if (transport.equals("TCP")) { + isTcp = true; + } + device.setTransport(isTcp ? "TCP" : "UDP"); + } + } + } + + ServerTransaction serverTransaction = getServerTransaction(evt); + serverTransaction.sendResponse(response); + if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete(); + // 注册成功 + // 保存到redis + // 下发catelog查询目录 + if (registerFlag == 1 ) { + logger.info("[{}] 注册成功! deviceId:" + device.getDeviceId(), requestAddress); + publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER); + // 重新注册更新设备和通道,以免设备替换或更新后信息无法更新 + handler.onRegister(device); + } else if (registerFlag == 2) { + logger.info("[{}] 注销成功! deviceId:" + device.getDeviceId(), requestAddress); + publisher.outlineEventPublish(device.getDeviceId(), VideoManagerConstants.EVENT_OUTLINE_UNREGISTER); + } + } catch (SipException | InvalidArgumentException | NoSuchAlgorithmException | ParseException e) { + e.printStackTrace(); + } + + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java new file mode 100644 index 00000000..d5f582c8 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java @@ -0,0 +1,75 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; + +import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.sip.InvalidArgumentException; +import javax.sip.RequestEvent; +import javax.sip.ServerTransaction; +import javax.sip.SipException; +import javax.sip.header.ExpiresHeader; +import javax.sip.message.Request; +import javax.sip.message.Response; +import java.text.ParseException; + +/** + * @description:SUBSCRIBE请求处理器 + * @author: swwheihei + * @date: 2020年5月3日 下午5:31:20 + */ +@Component +public class SubscribeRequestProcessor extends SIPRequestProcessorAbstract { + + private Logger logger = LoggerFactory.getLogger(SubscribeRequestProcessor.class); + private String method = "SUBSCRIBE"; + + @Autowired + private SIPProcessorObserver sipProcessorObserver; + + @Override + public void afterPropertiesSet() throws Exception { + // 添加消息处理的订阅 + sipProcessorObserver.addRequestProcessor(method, this); + } + + /** + * 处理SUBSCRIBE请求 + * + * @param evt + */ + @Override + public void process(RequestEvent evt) { + Request request = evt.getRequest(); + + try { + Response response = null; + response = getMessageFactory().createResponse(200, request); + if (response != null) { + ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30); + response.setExpires(expireHeader); + } + logger.info("response : " + response.toString()); + ServerTransaction transaction = getServerTransaction(evt); + if (transaction != null) { + transaction.sendResponse(response); + transaction.getDialog().delete(); + transaction.terminate(); + } else { + logger.info("processRequest serverTransactionId is null."); + } + + } catch (ParseException e) { + e.printStackTrace(); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } + + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/ISIPResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/ISIPResponseProcessor.java new file mode 100644 index 00000000..50fb2021 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/ISIPResponseProcessor.java @@ -0,0 +1,15 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.response; + +import javax.sip.ResponseEvent; + +/** + * @description:处理接收IPCamera发来的SIP协议响应消息 + * @author: swwheihei + * @date: 2020年5月3日 下午4:42:22 + */ +public interface ISIPResponseProcessor { + + void process(ResponseEvent evt); + + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java new file mode 100644 index 00000000..d44c1a9d --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java @@ -0,0 +1,49 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl; + +import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.gb28181.SipLayer; +import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.sip.ResponseEvent; + +/** + * @description: BYE请求响应器 + * @author: swwheihei + * @date: 2020年5月3日 下午5:32:05 + */ +@Component +public class ByeResponseProcessor extends SIPResponseProcessorAbstract { + + private String method = "BYE"; + + @Autowired + private SipLayer sipLayer; + + @Autowired + private SipConfig config; + + + @Autowired + private SIPProcessorObserver sipProcessorObserver; + + @Override + public void afterPropertiesSet() throws Exception { + // 添加消息处理的订阅 + sipProcessorObserver.addResponseProcessor(method, this); + } + /** + * 处理BYE响应 + * + * @param evt + */ + @Override + public void process(ResponseEvent evt) { + // TODO Auto-generated method stub + System.out.println("收到bye"); + } + + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java new file mode 100644 index 00000000..80d7e2b2 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java @@ -0,0 +1,47 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl; + +import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.gb28181.SipLayer; +import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.sip.ResponseEvent; + +/** + * @description: CANCEL响应处理器 + * @author: panlinlin + * @date: 2021年11月5日 16:35 + */ +@Component +public class CancelResponseProcessor extends SIPResponseProcessorAbstract { + + private String method = "CANCEL"; + + @Autowired + private SipLayer sipLayer; + + @Autowired + private SipConfig config; + + @Autowired + private SIPProcessorObserver sipProcessorObserver; + + @Override + public void afterPropertiesSet() throws Exception { + // 添加消息处理的订阅 + sipProcessorObserver.addResponseProcessor(method, this); + } + /** + * 处理CANCEL响应 + * + * @param evt + */ + @Override + public void process(ResponseEvent evt) { + // TODO Auto-generated method stub + + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java new file mode 100644 index 00000000..5446a902 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java @@ -0,0 +1,97 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl; + +import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.gb28181.SipLayer; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; +import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract; +import gov.nist.javax.sip.ResponseEventExt; +import gov.nist.javax.sip.stack.SIPDialog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.sip.InvalidArgumentException; +import javax.sip.ResponseEvent; +import javax.sip.SipException; +import javax.sip.address.SipURI; +import javax.sip.header.CSeqHeader; +import javax.sip.message.Request; +import javax.sip.message.Response; +import java.text.ParseException; + + +/** + * @description: 处理INVITE响应 + * @author: panlinlin + * @date: 2021年11月5日 16:40 + */ +@Component +public class InviteResponseProcessor extends SIPResponseProcessorAbstract { + + private final static Logger logger = LoggerFactory.getLogger(InviteResponseProcessor.class); + private String method = "INVITE"; + + @Autowired + private SipLayer sipLayer; + + @Autowired + private SipConfig config; + + + @Autowired + private SIPProcessorObserver sipProcessorObserver; + + @Override + public void afterPropertiesSet() throws Exception { + // 添加消息处理的订阅 + sipProcessorObserver.addResponseProcessor(method, this); + } + + @Autowired + private VideoStreamSessionManager streamSession; + + /** + * 处理invite响应 + * + * @param evt 响应消息 + * @throws ParseException + */ + @Override + public void process(ResponseEvent evt ){ + try { + Response response = evt.getResponse(); + int statusCode = response.getStatusCode(); + // trying不会回复 + if (statusCode == Response.TRYING) { + } + // 成功响应 + // 下发ack + if (statusCode == Response.OK) { + ResponseEventExt event = (ResponseEventExt)evt; + SIPDialog dialog = (SIPDialog)evt.getDialog(); + CSeqHeader cseq = (CSeqHeader) response.getHeader(CSeqHeader.NAME); + Request reqAck = dialog.createAck(cseq.getSeqNumber()); + SipURI requestURI = (SipURI) reqAck.getRequestURI(); + try { + requestURI.setHost(event.getRemoteIpAddress()); + } catch (ParseException e) { + e.printStackTrace(); + } + requestURI.setPort(event.getRemotePort()); + reqAck.setRequestURI(requestURI); + logger.info("向 " + event.getRemoteIpAddress() + ":" + event.getRemotePort() + "回复ack"); + SipURI sipURI = (SipURI)dialog.getRemoteParty().getURI(); + String deviceId = requestURI.getUser(); + String channelId = sipURI.getUser(); + + dialog.sendAck(reqAck); + + } + } catch (InvalidArgumentException | SipException e) { + e.printStackTrace(); + } + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java new file mode 100644 index 00000000..a5dced37 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java @@ -0,0 +1,104 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl; + +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; +import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.sip.ResponseEvent; +import javax.sip.header.CallIdHeader; +import javax.sip.header.WWWAuthenticateHeader; +import javax.sip.message.Response; + +/** + * @description:Register响应处理器 + * @author: swwheihei + * @date: 2020年5月3日 下午5:32:23 + */ +@Component +public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { + + private Logger logger = LoggerFactory.getLogger(RegisterResponseProcessor.class); + private String method = "REGISTER"; + + @Autowired + private ISIPCommanderForPlatform sipCommanderForPlatform; + + @Autowired + private IVideoManagerStorager storager; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private SIPProcessorObserver sipProcessorObserver; + + @Override + public void afterPropertiesSet() throws Exception { + // 添加消息处理的订阅 + sipProcessorObserver.addResponseProcessor(method, this); + } + + /** + * 处理Register响应 + * + * @param evt 事件 + */ + @Override + public void process(ResponseEvent evt) { + Response response = evt.getResponse(); + CallIdHeader callIdHeader = (CallIdHeader) response.getHeader(CallIdHeader.NAME); + String callId = callIdHeader.getCallId(); + + String platformGBId = redisCatchStorage.queryPlatformRegisterInfo(callId); + if (platformGBId == null) { + logger.info(String.format("未找到callId: %s 的注册/注销平台id", callId )); + return; + } + + ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(platformGBId); + if (parentPlatformCatch == null) { + logger.warn(String.format("收到 %s 的注册/注销%S请求, 但是平台缓存信息未查询到!!!", platformGBId, response.getStatusCode())); + return; + } + String action = parentPlatformCatch.getParentPlatform().getExpires().equals("0") ? "注销" : "注册"; + logger.info(String.format("收到 %s %s的%S响应", platformGBId, action, response.getStatusCode() )); + ParentPlatform parentPlatform = parentPlatformCatch.getParentPlatform(); + if (parentPlatform == null) { + logger.warn(String.format("收到 %s %s的%S请求, 但是平台信息未查询到!!!", platformGBId, action, response.getStatusCode())); + return; + } + + if (response.getStatusCode() == 401) { + WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME); + sipCommanderForPlatform.register(parentPlatform, callId, www, null, null); + }else if (response.getStatusCode() == 200){ + // 注册/注销成功 + logger.info(String.format("%s %s成功", platformGBId, action)); + redisCatchStorage.delPlatformRegisterInfo(callId); + parentPlatform.setStatus("注册".equals(action)); + // 取回Expires设置,避免注销过程中被置为0 + ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId); + String expires = parentPlatformTmp.getExpires(); + parentPlatform.setExpires(expires); + parentPlatform.setId(parentPlatformTmp.getId()); + storager.updateParentPlatformStatus(platformGBId, "注册".equals(action)); + + redisCatchStorage.updatePlatformRegister(parentPlatform); + + redisCatchStorage.updatePlatformKeepalive(parentPlatform); + + parentPlatformCatch.setParentPlatform(parentPlatform); + + redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); + } + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java new file mode 100644 index 00000000..0cb84136 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java @@ -0,0 +1,24 @@ +package com.genersoft.iot.vmp.service; + +import com.genersoft.iot.vmp.gb28181.bean.Device; + +/** + * 设备相关业务处理 + */ +public interface IDeviceService { + + /** + * 添加目录订阅 + * @param device 设备信息 + * @return + */ + boolean addCatalogSubscribe(Device device); + + /** + * 移除目录订阅 + * @param device 设备信息 + * @return + */ + boolean removeCatalogSubscribe(Device device); + +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/CatalogSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/service/bean/CatalogSubscribeTask.java new file mode 100644 index 00000000..eb179d7e --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/CatalogSubscribeTask.java @@ -0,0 +1,48 @@ +package com.genersoft.iot.vmp.service.bean; + +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; +import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import org.dom4j.DocumentException; +import org.dom4j.Element; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.sip.ResponseEvent; + +public class CatalogSubscribeTask implements Runnable{ + private final Logger logger = LoggerFactory.getLogger(CatalogSubscribeTask.class); + private Device device; + private ISIPCommander sipCommander; + + public CatalogSubscribeTask(Device device, ISIPCommander sipCommander) { + this.device = device; + this.sipCommander = sipCommander; + } + + @Override + public void run() { + sipCommander.catalogSubscribe(device, eventResult -> { + ResponseEvent event = (ResponseEvent) eventResult.event; + Element rootElement = null; + try { + rootElement = XmlUtil.getRootElement(event.getResponse().getRawContent(), "gb2312"); + } catch (DocumentException e) { + e.printStackTrace(); + } + Element resultElement = rootElement.element("Result"); + String result = resultElement.getText(); + if (result.toUpperCase().equals("OK")){ + // 成功 + logger.info("目录订阅成功: {}", device.getDeviceId()); + }else { + // 失败 + logger.info("目录订阅失败: {}-{}", device.getDeviceId(), result); + } + + },eventResult -> { + // 失败 + logger.warn("目录订阅失败: {}-信令发送失败", device.getDeviceId()); + }); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java new file mode 100644 index 00000000..595f38c8 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -0,0 +1,61 @@ +package com.genersoft.iot.vmp.service.impl; + +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; +import com.genersoft.iot.vmp.service.IDeviceService; +import com.genersoft.iot.vmp.service.bean.CatalogSubscribeTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class DeviceServiceImpl implements IDeviceService { + + private final static Logger logger = LoggerFactory.getLogger(DeviceServiceImpl.class); + + @Autowired + private DynamicTask dynamicTask; +; + + @Autowired + private ISIPCommander sipCommander; + + @Override + public boolean addCatalogSubscribe(Device device) { + if (device == null || device.getSubscribeCycleForCatalog() < 0) { + return false; + } + // 添加目录订阅 + CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander); + catalogSubscribeTask.run(); + // 提前开始刷新订阅 + String cron = getCron(device.getSubscribeCycleForCatalog() - 60); + dynamicTask.startCron(device.getDeviceId(), catalogSubscribeTask, cron); + return true; + } + + @Override + public boolean removeCatalogSubscribe(Device device) { + if (device == null || device.getSubscribeCycleForCatalog() < 0) { + return false; + } + dynamicTask.stopCron(device.getDeviceId()); + return true; + } + + public String getCron(int time) { + if (time <= 59) { + return "0/" + time +" * * * * ?"; + }else if (time <= 60* 59) { + int minute = time/(60); + return "0 0/" + minute +" * * * ?"; + }else if (time <= 60* 60* 59) { + int hour = time/(60*60); + return "0 0 0/" + hour +" * * ?"; + }else { + return "0 0/10 * * * ?"; + } + } +}