diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java index 0c57bdef..b8423829 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java @@ -39,7 +39,7 @@ public class DynamicTask { public void startCron(String key, Runnable task, int cycleForCatalog) { stop(key); // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔 - ScheduledFuture future = threadPoolTaskScheduler.scheduleWithFixedDelay(task, cycleForCatalog * 1000L); + ScheduledFuture future = threadPoolTaskScheduler.scheduleAtFixedRate(task, cycleForCatalog * 1000L); futureMap.put(key, future); runnableMap.put(key, task); } @@ -78,4 +78,7 @@ public class DynamicTask { return futureMap.keySet(); } + public Runnable get(String key) { + return runnableMap.get(key); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java index fc9eb274..e454d490 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java @@ -99,7 +99,10 @@ public class OnlineEventListener implements ApplicationListener { storager.updateDevice(device); // 上线添加订阅 if (device.getSubscribeCycleForCatalog() > 0) { + // 查询在线设备那些开启了订阅,为设备开启定时的目录订阅 deviceService.addCatalogSubscribe(device); + } + if (device.getSubscribeCycleForMobilePosition() > 0) { deviceService.addMobilePositionSubscribe(device); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java index 4c6a18ac..b6ec4519 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java @@ -1,5 +1,9 @@ package com.genersoft.iot.vmp.gb28181.task; +import javax.sip.DialogState; + public interface ISubscribeTask extends Runnable{ void stop(); + + DialogState getDialogState(); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java index 51356d55..433eb3b1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Async; import javax.sip.Dialog; import javax.sip.DialogState; @@ -45,6 +46,7 @@ public class CatalogSubscribeTask implements ISubscribeTask { }); } + @Async @Override public void stop() { /** @@ -72,4 +74,10 @@ public class CatalogSubscribeTask implements ISubscribeTask { }); } } + + @Override + public DialogState getDialogState() { + if (dialog == null) return null; + return dialog.getState(); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java index fcac3e9d..569a9b7e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java @@ -1,16 +1,16 @@ package com.genersoft.iot.vmp.gb28181.task.impl; -import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; -import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Async; -import java.text.SimpleDateFormat; +import javax.sip.DialogState; import java.util.List; /** @@ -18,6 +18,8 @@ import java.util.List; */ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask { + private Logger logger = LoggerFactory.getLogger(MobilePositionSubscribeHandlerTask.class); + private IRedisCatchStorage redisCatchStorage; private IVideoManagerStorage storager; private ISIPCommanderForPlatform sipCommanderForPlatform; @@ -26,8 +28,6 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask { private String sn; private String key; - private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - public MobilePositionSubscribeHandlerTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorage storager, String platformId, String sn, String key, SubscribeHolder subscribeInfo) { this.redisCatchStorage = redisCatchStorage; this.storager = storager; @@ -38,40 +38,51 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask { this.subscribeHolder = subscribeInfo; } + @Async @Override public void run() { + logger.info("执行MobilePositionSubscribeHandlerTask"); SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platformId); - if (subscribe != null) { ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); - if (parentPlatform == null || parentPlatform.isStatus()) { - // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持 - List gbStreams = storager.queryGbStreamListInPlatform(platformId); - if (gbStreams.size() > 0) { - for (GbStream gbStream : gbStreams) { - String gbId = gbStream.getGbId(); - GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId); - if (gpsMsgInfo != null) { - // 发送GPS消息 - sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe); - }else { - // 没有在redis找到新的消息就使用数据库的消息 - gpsMsgInfo = new GPSMsgInfo(); - gpsMsgInfo.setId(gbId); - gpsMsgInfo.setLat(gbStream.getLongitude()); - gpsMsgInfo.setLng(gbStream.getLongitude()); - // 发送GPS消息 - sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe); - } + if (parentPlatform == null ) { + logger.info("发送订阅时未找到平台信息:{}", platformId); + return; + } + if (!parentPlatform.isStatus()) { + logger.info("发送订阅时发现平台已经离线:{}", platformId); + return; + } + // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持 + List gbStreams = storager.queryGbStreamListInPlatform(platformId); + if (gbStreams.size() == 0) { + logger.info("发送订阅时发现平台已经没有关联的直播流:{}", platformId); + return; + } + for (GbStream gbStream : gbStreams) { + String gbId = gbStream.getGbId(); + GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId); + if (gpsMsgInfo != null) { // 无最新位置不发送 + // 经纬度都为0不发送 + if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) { + continue; } + // 发送GPS消息 + sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe); } } } + logger.info("结束执行MobilePositionSubscribeHandlerTask"); } @Override public void stop() { } + + @Override + public DialogState getDialogState() { + return null; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java index e86c601c..7203ee2d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.Async; import javax.sip.Dialog; import javax.sip.DialogState; @@ -25,6 +26,7 @@ public class MobilePositionSubscribeTask implements ISubscribeTask { this.sipCommander = sipCommander; } + @Async @Override public void run() { sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> { @@ -74,4 +76,9 @@ public class MobilePositionSubscribeTask implements ISubscribeTask { }); } } + @Override + public DialogState getDialogState() { + if (dialog == null) return null; + return dialog.getState(); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 123d0e78..027238b7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1566,17 +1566,28 @@ public class SIPCommander implements ISIPCommander { cmdXml.append("" + device.getDeviceId() + "\r\n"); cmdXml.append("\r\n"); - String tm = Long.toString(System.currentTimeMillis()); - CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() - : udpSipProvider.getNewCallId(); + Request request; + if (dialog != null) { + logger.info("发送目录订阅消息时 dialog的状态为: {}", dialog.getState()); + request = dialog.createRequest(Request.SUBSCRIBE); + ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); + request.setContent(cmdXml.toString(), contentTypeHeader); + ExpiresHeader expireHeader = sipFactory.createHeaderFactory().createExpiresHeader(device.getSubscribeCycleForMobilePosition()); + request.addHeader(expireHeader); + }else { + String tm = Long.toString(System.currentTimeMillis()); - // 有效时间默认为60秒以上 - Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, - "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" , - callIdHeader); - transmitRequest(device, request, errorEvent, okEvent); + CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() + : udpSipProvider.getNewCallId(); + + // 有效时间默认为60秒以上 + request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, + "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" , + callIdHeader); + } + transmitRequest(device, request, errorEvent, okEvent); return true; } catch ( NumberFormatException | ParseException | InvalidArgumentException | SipException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 0dc11e01..7768ed42 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -405,7 +405,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); callIdHeader.setCallId(subscribeInfo.getCallId()); - logger.info("[发送Notify-MobilePosition] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat()); + logger.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat()); sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> { logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg); }, null); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 8c87be99..8e03510e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -146,7 +146,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements } else { mobilePosition.setAltitude(0.0); } - logger.info("[收到Notify-MobilePosition]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), + logger.info("[收到 移动位置订阅]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), mobilePosition.getLongitude(), mobilePosition.getLatitude()); mobilePosition.setReportSource("Mobile Position"); BaiduPoint bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude())); @@ -281,7 +281,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements Element eventElement = itemDevice.element("Event"); DeviceChannel channel = XmlUtil.channelContentHander(itemDevice); channel.setDeviceId(device.getDeviceId()); - logger.info("[收到Notify-Catalog]:{}/{}", device.getDeviceId(), channel.getChannelId()); + logger.info("[收到 目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); switch (eventElement.getText().toUpperCase()) { case CatalogEvent.ON: // 上线 logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java index 5ae80530..979849ed 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java @@ -150,7 +150,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme } String sn = XmlUtil.getText(rootElement, "SN"); String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_MobilePosition_" + platformId; - logger.info("[notify-MobilePosition]: {}", platformId); + logger.info("[回复 移动位置订阅]: {}", platformId); StringBuilder resultXml = new StringBuilder(200); resultXml.append("\r\n") .append("\r\n") @@ -161,12 +161,21 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme .append("\r\n"); if (subscribeInfo.getExpires() > 0) { - if (subscribeHolder.getMobilePositionSubscribe(platformId) != null) { - dynamicTask.stop(key); + + if (subscribeHolder.getMobilePositionSubscribe(platformId) == null ) { + String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 + subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); + dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval)); + }else { + if (subscribeHolder.getMobilePositionSubscribe(platformId).getDialog() != null + && subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState() != null + && !subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState().equals(DialogState.CONFIRMED)) { + dynamicTask.stop(key); + String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 + subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); + dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval)); + } } - String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 - dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval) -1 ); - subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); }else if (subscribeInfo.getExpires() == 0) { dynamicTask.stop(key); subscribeHolder.removeMobilePositionSubscribe(platformId); @@ -203,7 +212,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme } String sn = XmlUtil.getText(rootElement, "SN"); String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_Catalog_" + platformId; - logger.info("[notify-Catalog]: {}", platformId); + logger.info("[回复 目录订阅]: {}/{}", platformId, deviceID); StringBuilder resultXml = new StringBuilder(200); resultXml.append("\r\n") .append("\r\n") diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java index 1b6d31eb..12136f47 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java @@ -4,8 +4,13 @@ import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; +import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; +import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask; +import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask; +import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; @@ -29,9 +34,8 @@ import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; -import java.util.List; -import java.util.Set; -import java.util.UUID; +import javax.sip.DialogState; +import java.util.*; @Api(tags = "国标设备查询", value = "国标设备查询") @SuppressWarnings("rawtypes") @@ -63,6 +67,9 @@ public class DeviceQuery { @Autowired private DynamicTask dynamicTask; + @Autowired + private SubscribeHolder subscribeHolder; + /** * 使用ID查询国标设备 * @param deviceId 国标ID @@ -469,4 +476,29 @@ public class DeviceQuery { } return wvpResult; } + + @GetMapping("/{deviceId}/subscribe_info") + @ApiOperation(value = "获取设备的订阅状态", notes = "获取设备的订阅状态") + public WVPResult> getSubscribeInfo(@PathVariable String deviceId) { + Set allKeys = dynamicTask.getAllKeys(); + Map dialogStateMap = new HashMap<>(); + for (String key : allKeys) { + if (key.startsWith(deviceId)) { + ISubscribeTask subscribeTask = (ISubscribeTask)dynamicTask.get(key); + DialogState dialogState = subscribeTask.getDialogState(); + if (dialogState == null) { + continue; + } + if (subscribeTask instanceof CatalogSubscribeTask) { + dialogStateMap.put("catalog", dialogState.toString()); + }else if (subscribeTask instanceof MobilePositionSubscribeTask) { + dialogStateMap.put("mobilePosition", dialogState.toString()); + } + } + } + WVPResult> wvpResult = new WVPResult<>(); + wvpResult.setCode(0); + wvpResult.setData(dialogStateMap); + return wvpResult; + } }