Browse Source

Merge branch 'wvp-28181-2.0' into wvp-28181-2.0

pull/388/head
648540858 3 years ago
committed by GitHub
parent
commit
45e9fc880c
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java
  2. 5
      src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
  3. 26
      src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java
  4. 2
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
  5. 37
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
  6. 26
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java
  7. 16
      src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java
  8. 2
      src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java
  9. 9
      src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEvent.java
  10. 26
      src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java
  11. 14
      src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
  12. 8
      src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java
  13. 1
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
  14. 106
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java
  15. 3
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
  16. 2
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
  17. 145
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
  18. 7
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
  19. 8
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java
  20. 55
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
  21. 6
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java
  22. 75
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
  23. 4
      src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
  24. 10
      src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
  25. 60
      src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java

4
src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java

@ -60,8 +60,8 @@ public class MediaConfig{
@Value("${media.secret}") @Value("${media.secret}")
private String secret; private String secret;
@Value("${media.stream-none-reader-delay-ms:18000}") @Value("${media.stream-none-reader-delay-ms:10000}")
private int streamNoneReaderDelayMS = 18000; private int streamNoneReaderDelayMS = 10000;
@Value("${media.rtp.enable}") @Value("${media.rtp.enable}")
private boolean rtpEnable; private boolean rtpEnable;

5
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java

@ -47,7 +47,7 @@ public class SipLayer{
Properties properties = new Properties(); Properties properties = new Properties();
properties.setProperty("javax.sip.STACK_NAME", "GB28181_SIP"); properties.setProperty("javax.sip.STACK_NAME", "GB28181_SIP");
properties.setProperty("javax.sip.IP_ADDRESS", sipConfig.getMonitorIp()); properties.setProperty("javax.sip.IP_ADDRESS", sipConfig.getMonitorIp());
properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "false"); properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "true");
/** /**
* sip_server_log.log sip_debug_log.log public static final int TRACE_NONE = * sip_server_log.log sip_debug_log.log public static final int TRACE_NONE =
* 0; public static final int TRACE_MESSAGES = 16; public static final int * 0; public static final int TRACE_MESSAGES = 16; public static final int
@ -57,6 +57,7 @@ public class SipLayer{
properties.setProperty("gov.nist.javax.sip.SERVER_LOG", "sip_server_log"); properties.setProperty("gov.nist.javax.sip.SERVER_LOG", "sip_server_log");
properties.setProperty("gov.nist.javax.sip.DEBUG_LOG", "sip_debug_log"); properties.setProperty("gov.nist.javax.sip.DEBUG_LOG", "sip_debug_log");
sipStack = (SipStackImpl) sipFactory.createSipStack(properties); sipStack = (SipStackImpl) sipFactory.createSipStack(properties);
return sipStack; return sipStack;
} }
@ -70,6 +71,7 @@ public class SipLayer{
tcpSipProvider = (SipProviderImpl)sipStack.createSipProvider(tcpListeningPoint); tcpSipProvider = (SipProviderImpl)sipStack.createSipProvider(tcpListeningPoint);
tcpSipProvider.setDialogErrorsAutomaticallyHandled(); tcpSipProvider.setDialogErrorsAutomaticallyHandled();
tcpSipProvider.addSipListener(sipProcessorObserver); tcpSipProvider.addSipListener(sipProcessorObserver);
// tcpSipProvider.setAutomaticDialogSupportEnabled(false);
logger.info("Sip Server TCP 启动成功 port {" + sipConfig.getMonitorIp() + ":" + sipConfig.getPort() + "}"); logger.info("Sip Server TCP 启动成功 port {" + sipConfig.getMonitorIp() + ":" + sipConfig.getPort() + "}");
} catch (TransportNotSupportedException e) { } catch (TransportNotSupportedException e) {
e.printStackTrace(); e.printStackTrace();
@ -93,6 +95,7 @@ public class SipLayer{
udpListeningPoint = sipStack.createListeningPoint(sipConfig.getMonitorIp(), sipConfig.getPort(), "UDP"); udpListeningPoint = sipStack.createListeningPoint(sipConfig.getMonitorIp(), sipConfig.getPort(), "UDP");
udpSipProvider = (SipProviderImpl)sipStack.createSipProvider(udpListeningPoint); udpSipProvider = (SipProviderImpl)sipStack.createSipProvider(udpListeningPoint);
udpSipProvider.addSipListener(sipProcessorObserver); udpSipProvider.addSipListener(sipProcessorObserver);
// udpSipProvider.setAutomaticDialogSupportEnabled(false);
} catch (TransportNotSupportedException e) { } catch (TransportNotSupportedException e) {
e.printStackTrace(); e.printStackTrace();
} catch (InvalidArgumentException e) { } catch (InvalidArgumentException e) {

26
src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java

@ -27,18 +27,18 @@ public class RegisterLogicHandler {
public void onRegister(Device device) { public void onRegister(Device device) {
// 只有第一次注册时调用查询设备信息,如需更新调用更新API接口 // 只有第一次注册时调用查询设备信息,如需更新调用更新API接口
// TODO 此处错误无法获取到通道 // // TODO 此处错误无法获取到通道
Device device1 = storager.queryVideoDevice(device.getDeviceId()); // Device device1 = storager.queryVideoDevice(device.getDeviceId());
if (device.isFirsRegister()) { // if (device.isFirsRegister()) {
logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId()); // logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId());
try { // try {
Thread.sleep(100); // Thread.sleep(100);
cmder.deviceInfoQuery(device); // cmder.deviceInfoQuery(device);
Thread.sleep(100); // Thread.sleep(100);
cmder.catalogQuery(device, null); // cmder.catalogQuery(device, null);
} catch (InterruptedException e) { // } catch (InterruptedException e) {
e.printStackTrace(); // e.printStackTrace();
} // }
} // }
} }
} }

2
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java

@ -220,4 +220,6 @@ public class SendRtpItem {
public void setDialog(byte[] dialog) { public void setDialog(byte[] dialog) {
this.dialog = dialog; this.dialog = dialog;
} }
} }

37
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java

@ -0,0 +1,37 @@
package com.genersoft.iot.vmp.gb28181.bean;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class SubscribeHolder {
private static ConcurrentHashMap<String, SubscribeInfo> catalogMap = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, SubscribeInfo> mobilePositionMap = new ConcurrentHashMap<>();
public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) {
catalogMap.put(platformId, subscribeInfo);
}
public SubscribeInfo getCatalogSubscribe(String platformId) {
return catalogMap.get(platformId);
}
public void removeCatalogSubscribe(String platformId) {
catalogMap.remove(platformId);
}
public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo) {
mobilePositionMap.put(platformId, subscribeInfo);
}
public SubscribeInfo getMobilePositionSubscribe(String platformId) {
return mobilePositionMap.get(platformId);
}
public void removeMobilePositionSubscribe(String platformId) {
mobilePositionMap.remove(platformId);
}
}

26
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java

@ -1,13 +1,15 @@
package com.genersoft.iot.vmp.gb28181.bean; package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.utils.SerializeUtils;
import javax.sip.Dialog;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.header.*; import javax.sip.header.*;
import javax.sip.message.Request; import javax.sip.message.Request;
public class SubscribeInfo { public class SubscribeInfo {
public SubscribeInfo() {
}
public SubscribeInfo(RequestEvent evt, String id) { public SubscribeInfo(RequestEvent evt, String id) {
this.id = id; this.id = id;
@ -23,6 +25,8 @@ public class SubscribeInfo {
this.eventType = eventHeader.getEventType(); this.eventType = eventHeader.getEventType();
ViaHeader viaHeader = (ViaHeader)request.getHeader(ViaHeader.NAME); ViaHeader viaHeader = (ViaHeader)request.getHeader(ViaHeader.NAME);
this.branch = viaHeader.getBranch(); this.branch = viaHeader.getBranch();
this.transaction = evt.getServerTransaction();
this.dialog = evt.getDialog();
} }
private String id; private String id;
@ -33,6 +37,8 @@ public class SubscribeInfo {
private String fromTag; private String fromTag;
private String toTag; private String toTag;
private String branch; private String branch;
private ServerTransaction transaction;
private Dialog dialog;
public String getId() { public String getId() {
return id; return id;
@ -97,4 +103,20 @@ public class SubscribeInfo {
public void setBranch(String branch) { public void setBranch(String branch) {
this.branch = branch; this.branch = branch;
} }
public ServerTransaction getTransaction() {
return transaction;
}
public void setTransaction(ServerTransaction transaction) {
this.transaction = transaction;
}
public Dialog getDialog() {
return dialog;
}
public void setDialog(Dialog dialog) {
this.dialog = dialog;
}
} }

16
src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java

@ -33,13 +33,21 @@ public class EventPublisher {
@Autowired @Autowired
private ApplicationEventPublisher applicationEventPublisher; private ApplicationEventPublisher applicationEventPublisher;
public void onlineEventPublish(Device device, String from) { public void onlineEventPublish(Device device, String from, int expires) {
OnlineEvent onEvent = new OnlineEvent(this); OnlineEvent onEvent = new OnlineEvent(this);
onEvent.setDevice(device); onEvent.setDevice(device);
onEvent.setFrom(from); onEvent.setFrom(from);
onEvent.setExpires(expires);
applicationEventPublisher.publishEvent(onEvent); applicationEventPublisher.publishEvent(onEvent);
} }
public void onlineEventPublish(Device device, String from) {
OnlineEvent onEvent = new OnlineEvent(this);
onEvent.setDevice(device);
onEvent.setFrom(from);
applicationEventPublisher.publishEvent(onEvent);
}
public void outlineEventPublish(String deviceId, String from){ public void outlineEventPublish(String deviceId, String from){
OfflineEvent outEvent = new OfflineEvent(this); OfflineEvent outEvent = new OfflineEvent(this);
outEvent.setDeviceId(deviceId); outEvent.setDeviceId(deviceId);
@ -107,6 +115,12 @@ public class EventPublisher {
} }
/**
*
* @param platformId
* @param deviceChannels
* @param type
*/
public void catalogEventPublish(String platformId, List<DeviceChannel> deviceChannels, String type) { public void catalogEventPublish(String platformId, List<DeviceChannel> deviceChannels, String type) {
CatalogEvent outEvent = new CatalogEvent(this); CatalogEvent outEvent = new CatalogEvent(this);
List<DeviceChannel> channels = new ArrayList<>(); List<DeviceChannel> channels = new ArrayList<>();

2
src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java

@ -91,7 +91,7 @@ public class OfflineEventListener implements ApplicationListener<OfflineEvent> {
// 离线释放所有ssrc // 离线释放所有ssrc
List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(event.getDeviceId(), null, null, null); List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(event.getDeviceId(), null, null, null);
if (ssrcTransactions.size() > 0) { if (ssrcTransactions != null && ssrcTransactions.size() > 0) {
for (SsrcTransaction ssrcTransaction : ssrcTransactions) { for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(event.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); mediaServerService.closeRTPServer(event.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream());

9
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEvent.java

@ -23,6 +23,8 @@ public class OnlineEvent extends ApplicationEvent {
private String from; private String from;
private int expires;
public Device getDevice() { public Device getDevice() {
return device; return device;
} }
@ -39,4 +41,11 @@ public class OnlineEvent extends ApplicationEvent {
this.from = from; this.from = from;
} }
public int getExpires() {
return expires;
}
public void setExpires(int expires) {
this.expires = expires;
}
} }

26
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java

@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.storager.dao.dto.User; import com.genersoft.iot.vmp.storager.dao.dto.User;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -51,6 +52,9 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
@Autowired @Autowired
private EventPublisher eventPublisher; private EventPublisher eventPublisher;
@Autowired
private SIPCommander cmder;
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override @Override
@ -62,13 +66,21 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
Device device = event.getDevice(); Device device = event.getDevice();
if (device == null) return; if (device == null) return;
String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + userSetup.getServerId() + "_" + event.getDevice().getDeviceId(); String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + userSetup.getServerId() + "_" + event.getDevice().getDeviceId();
Device deviceInStore = storager.queryVideoDevice(device.getDeviceId());
device.setOnline(1);
// 处理上线监听
storager.updateDevice(device);
switch (event.getFrom()) { switch (event.getFrom()) {
// 注册时触发的在线事件,先在redis中增加超时超时监听 // 注册时触发的在线事件,先在redis中增加超时超时监听
case VideoManagerConstants.EVENT_ONLINE_REGISTER: case VideoManagerConstants.EVENT_ONLINE_REGISTER:
// 超时时间 // 超时时间
redis.set(key, event.getDevice().getDeviceId(), sipConfig.getKeepaliveTimeOut()); redis.set(key, event.getDevice().getDeviceId(), sipConfig.getKeepaliveTimeOut());
device.setRegisterTime(format.format(System.currentTimeMillis())); device.setRegisterTime(format.format(System.currentTimeMillis()));
if (deviceInStore == null) { //第一次上线
logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId());
cmder.deviceInfoQuery(device);
cmder.catalogQuery(device, null);
}
break; break;
// 设备主动发送心跳触发的在线事件 // 设备主动发送心跳触发的在线事件
case VideoManagerConstants.EVENT_ONLINE_KEEPLIVE: case VideoManagerConstants.EVENT_ONLINE_KEEPLIVE:
@ -87,19 +99,11 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
break; break;
} }
device.setOnline(1); List<DeviceChannel> deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId());
Device deviceInStore = storager.queryVideoDevice(device.getDeviceId()); eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON);
if (deviceInStore != null && deviceInStore.getOnline() == 0) {
List<DeviceChannel> deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId());
eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON);
}
// 处理上线监听
storager.updateDevice(device);
// 上线添加订阅 // 上线添加订阅
if (device.getSubscribeCycleForCatalog() > 0) { if (device.getSubscribeCycleForCatalog() > 0) {
deviceService.addCatalogSubscribe(device); deviceService.addCatalogSubscribe(device);
} }
} }
} }

14
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java

@ -52,6 +52,9 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
@Autowired @Autowired
private IGbStreamService gbStreamService; private IGbStreamService gbStreamService;
@Autowired
private SubscribeHolder subscribeHolder;
@Override @Override
public void onApplicationEvent(CatalogEvent event) { public void onApplicationEvent(CatalogEvent event) {
SubscribeInfo subscribe = null; SubscribeInfo subscribe = null;
@ -62,7 +65,8 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformId()); parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformId());
if (parentPlatform != null && !parentPlatform.isStatus())return; if (parentPlatform != null && !parentPlatform.isStatus())return;
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + event.getPlatformId(); String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + event.getPlatformId();
subscribe = redisCatchStorage.getSubscribe(key); // subscribe = redisCatchStorage.getSubscribe(key);
subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
if (subscribe == null) { if (subscribe == null) {
logger.debug("发送订阅消息时发现订阅信息已经不存在"); logger.debug("发送订阅消息时发现订阅信息已经不存在");
@ -114,7 +118,8 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
if (parentPlatforms != null && parentPlatforms.size() > 0) { if (parentPlatforms != null && parentPlatforms.size() > 0) {
for (ParentPlatform platform : parentPlatforms) { for (ParentPlatform platform : parentPlatforms) {
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId(); String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId();
SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key); // SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key);
SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId());
if (subscribeInfo == null) continue; if (subscribeInfo == null) continue;
logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId); logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);
List<DeviceChannel> deviceChannelList = new ArrayList<>(); List<DeviceChannel> deviceChannelList = new ArrayList<>();
@ -153,8 +158,9 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
List<ParentPlatform> parentPlatforms = parentPlatformMap.get(gbId); List<ParentPlatform> parentPlatforms = parentPlatformMap.get(gbId);
if (parentPlatforms != null && parentPlatforms.size() > 0) { if (parentPlatforms != null && parentPlatforms.size() > 0) {
for (ParentPlatform platform : parentPlatforms) { for (ParentPlatform platform : parentPlatforms) {
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId(); // String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId();
SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key); // SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key);
SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
if (subscribeInfo == null) continue; if (subscribeInfo == null) continue;
logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId); logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId);
List<DeviceChannel> deviceChannelList = new ArrayList<>(); List<DeviceChannel> deviceChannelList = new ArrayList<>();

8
src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.task;
import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
@ -16,25 +17,28 @@ public class GPSSubscribeTask implements Runnable{
private IRedisCatchStorage redisCatchStorage; private IRedisCatchStorage redisCatchStorage;
private IVideoManagerStorager storager; private IVideoManagerStorager storager;
private ISIPCommanderForPlatform sipCommanderForPlatform; private ISIPCommanderForPlatform sipCommanderForPlatform;
private SubscribeHolder subscribeHolder;
private String platformId; private String platformId;
private String sn; private String sn;
private String key; private String key;
private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public GPSSubscribeTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorager storager, String platformId, String sn, String key) { public GPSSubscribeTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorager storager, String platformId, String sn, String key, SubscribeHolder subscribeInfo) {
this.redisCatchStorage = redisCatchStorage; this.redisCatchStorage = redisCatchStorage;
this.storager = storager; this.storager = storager;
this.platformId = platformId; this.platformId = platformId;
this.sn = sn; this.sn = sn;
this.key = key; this.key = key;
this.sipCommanderForPlatform = sipCommanderForPlatform; this.sipCommanderForPlatform = sipCommanderForPlatform;
this.subscribeHolder = subscribeInfo;
} }
@Override @Override
public void run() { public void run() {
SubscribeInfo subscribe = redisCatchStorage.getSubscribe(key); SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platformId);
if (subscribe != null) { if (subscribe != null) {
ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
if (parentPlatform == null || parentPlatform.isStatus()) { if (parentPlatform == null || parentPlatform.isStatus()) {

1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java

@ -94,6 +94,7 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
Response response = responseEvent.getResponse(); Response response = responseEvent.getResponse();
logger.debug("\n收到响应:\n{}", responseEvent.getResponse()); logger.debug("\n收到响应:\n{}", responseEvent.getResponse());
int status = response.getStatusCode(); int status = response.getStatusCode();
if (((status >= 200) && (status < 300)) || status == 401) { // Success! if (((status >= 200) && (status < 300)) || status == 401) { // Success!
CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
String method = cseqHeader.getMethod(); String method = cseqHeader.getMethod();

106
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java

@ -236,57 +236,57 @@ public class SIPRequestHeaderPlarformProvider {
return request; return request;
} }
public Request createNotifyRequest(ParentPlatform parentPlatform, String content, CallIdHeader callIdHeader, String viaTag, SubscribeInfo subscribeInfo) throws PeerUnavailableException, ParseException, InvalidArgumentException { // public Request createNotifyRequest(ParentPlatform parentPlatform, String content, CallIdHeader callIdHeader, String viaTag, String fromTag, SubscribeInfo subscribeInfo) throws PeerUnavailableException, ParseException, InvalidArgumentException {
Request request = null; // Request request = null;
// sipuri // // sipuri
SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP()+ ":" + parentPlatform.getServerPort()); // SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP()+ ":" + parentPlatform.getServerPort());
// via // // via
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>(); // ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()), // ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()),
parentPlatform.getTransport(), subscribeInfo.getBranch()); // parentPlatform.getTransport(), viaTag);
viaHeader.setRPort(); // viaHeader.setRPort();
viaHeaders.add(viaHeader); // viaHeaders.add(viaHeader);
// from // // from
SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(), // SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(),
parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort()); // parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort());
Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI); // Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI);
FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, subscribeInfo.getToTag()); // FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, fromTag);
// to // // to
SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain()); // SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain());
Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI); // Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI);
ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, subscribeInfo.getFromTag()); // ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, subscribeInfo.getFromTag());
//
// Forwards // // Forwards
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); // MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
// ceq // // ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.NOTIFY), Request.NOTIFY); // CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.NOTIFY), Request.NOTIFY);
MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory(); // MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
// 设置编码, 防止中文乱码 // // 设置编码, 防止中文乱码
messageFactory.setDefaultContentEncodingCharset("gb2312"); // messageFactory.setDefaultContentEncodingCharset("gb2312");
request = messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader, // request = messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader,
toHeader, viaHeaders, maxForwards); // toHeader, viaHeaders, maxForwards);
List<String> agentParam = new ArrayList<>(); // List<String> agentParam = new ArrayList<>();
agentParam.add("wvp-pro"); // agentParam.add("wvp-pro");
UserAgentHeader userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam); // UserAgentHeader userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
request.addHeader(userAgentHeader); // request.addHeader(userAgentHeader);
//
EventHeader event = sipFactory.createHeaderFactory().createEventHeader(subscribeInfo.getEventType()); // EventHeader event = sipFactory.createHeaderFactory().createEventHeader(subscribeInfo.getEventType());
if (subscribeInfo.getEventId() != null) { // if (subscribeInfo.getEventId() != null) {
event.setEventId(subscribeInfo.getEventId()); // event.setEventId(subscribeInfo.getEventId());
} // }
//
request.addHeader(event); // request.addHeader(event);
//
SubscriptionStateHeader active = sipFactory.createHeaderFactory().createSubscriptionStateHeader("active"); // SubscriptionStateHeader active = sipFactory.createHeaderFactory().createSubscriptionStateHeader("active");
request.setHeader(active); // request.setHeader(active);
//
String sipAddress = sipConfig.getIp() + ":" + sipConfig.getPort(); // String sipAddress = sipConfig.getIp() + ":" + sipConfig.getPort();
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory() // Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory()
.createSipURI(parentPlatform.getDeviceGBId(), sipAddress)); // .createSipURI(parentPlatform.getDeviceGBId(), sipAddress));
request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); // request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
//
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); // ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
request.setContent(content, contentTypeHeader); // request.setContent(content, contentTypeHeader);
return request; // return request;
} // }
} }

3
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java

@ -215,6 +215,9 @@ public class SIPRequestHeaderProvider {
// Event // Event
EventHeader eventHeader = sipFactory.createHeaderFactory().createEventHeader(event); EventHeader eventHeader = sipFactory.createHeaderFactory().createEventHeader(event);
int random = (int)Math.random() * 1000000000;
eventHeader.setEventId(random + "");
request.addHeader(eventHeader); request.addHeader(eventHeader);
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");

2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

@ -1518,7 +1518,7 @@ public class SIPCommander implements ISIPCommander {
// 有效时间默认为60秒以上 // 有效时间默认为60秒以上
Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm,
"fromTagPos" + tm, null, device.getSubscribeCycleForCatalog() + 60, "Catalog" , "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" ,
callIdHeader); callIdHeader);
transmitRequest(device, request, errorEvent, okEvent); transmitRequest(device, request, errorEvent, okEvent);

145
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java

@ -27,9 +27,7 @@ import org.springframework.util.StringUtils;
import javax.sip.*; import javax.sip.*;
import javax.sip.address.SipURI; import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader; import javax.sip.header.*;
import javax.sip.header.ViaHeader;
import javax.sip.header.WWWAuthenticateHeader;
import javax.sip.message.Request; import javax.sip.message.Request;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.text.ParseException; import java.text.ParseException;
@ -68,6 +66,9 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
@Qualifier(value="udpSipProvider") @Qualifier(value="udpSipProvider")
private SipProviderImpl udpSipProvider; private SipProviderImpl udpSipProvider;
@Autowired
private SipFactory sipFactory;
@Override @Override
public boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) { public boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) {
return register(parentPlatform, null, null, errorEvent, okEvent, false); return register(parentPlatform, null, null, errorEvent, okEvent, false);
@ -88,7 +89,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
public boolean register(ParentPlatform parentPlatform, @Nullable String callId, @Nullable WWWAuthenticateHeader www, public boolean register(ParentPlatform parentPlatform, @Nullable String callId, @Nullable WWWAuthenticateHeader www,
SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain) { SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain) {
try { try {
Request request = null; Request request;
String tm = Long.toString(System.currentTimeMillis()); String tm = Long.toString(System.currentTimeMillis());
if (!registerAgain ) { if (!registerAgain ) {
// //callid // //callid
@ -364,16 +365,18 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
: udpSipProvider.getNewCallId(); : udpSipProvider.getNewCallId();
callIdHeader.setCallId(subscribeInfo.getCallId()); callIdHeader.setCallId(subscribeInfo.getCallId());
String tm = Long.toString(System.currentTimeMillis()); //
sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> {
Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
deviceStatusXml.toString(),callIdHeader, }, null);
"z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), subscribeInfo);
transmitRequest(parentPlatform, request);
} catch (SipException | ParseException | InvalidArgumentException e) { } catch (SipException | ParseException e) {
e.printStackTrace(); e.printStackTrace();
return false; return false;
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} }
return true; return true;
} }
@ -386,37 +389,89 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
if (index == null) { if (index == null) {
index = 0; index = 0;
} }
if (index >= deviceChannels.size()) {
return true;
}
try { try {
if (index > deviceChannels.size() - 1) {
return true;
}
Request request = getCatalogNotifyRequestForCatalogAddOrUpdate(parentPlatform, deviceChannels.get(index), deviceChannels.size(), type, subscribeInfo);
index += 1;
Integer finalIndex = index; Integer finalIndex = index;
transmitRequest(parentPlatform, request, null, (eventResult -> { String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, deviceChannels.get(index ), deviceChannels.size(), type, subscribeInfo);
sendNotifyForCatalogAddOrUpdate(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex); sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
}, (eventResult -> {
sendNotifyForCatalogAddOrUpdate(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex + 1);
})); }));
} catch (SipException | ParseException | InvalidArgumentException e) { } catch (SipException | ParseException e) {
e.printStackTrace(); e.printStackTrace();
return false; return false;
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} }
return true; return true;
} }
private Request getCatalogNotifyRequestForCatalogAddOrUpdate(ParentPlatform parentPlatform, DeviceChannel channel, int size, String type, private void sendNotify(ParentPlatform parentPlatform, String catalogXmlContent,
SubscribeInfo subscribeInfo) throws ParseException, InvalidArgumentException, SubscribeInfo subscribeInfo, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent )
PeerUnavailableException { throws NoSuchFieldException, IllegalAccessException, SipException, ParseException {
String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channel, size, type, subscribeInfo); Dialog dialog = subscribeInfo.getDialog();
Request notifyRequest = dialog.createRequest(Request.NOTIFY);
CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId(); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
callIdHeader.setCallId(subscribeInfo.getCallId());
Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, catalogXmlContent, notifyRequest.setContent(catalogXmlContent, contentTypeHeader);
callIdHeader, "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), subscribeInfo);
return request; SubscriptionStateHeader subscriptionState = sipFactory.createHeaderFactory()
.createSubscriptionStateHeader(SubscriptionStateHeader.ACTIVE);
notifyRequest.addHeader(subscriptionState);
EventHeader event = sipFactory.createHeaderFactory().createEventHeader(subscribeInfo.getEventType());
if (subscribeInfo.getEventId() != null) {
event.setEventId(subscribeInfo.getEventId());
}
notifyRequest.addHeader(event);
SipURI sipURI = (SipURI) notifyRequest.getRequestURI();
SIPRequest request = (SIPRequest) subscribeInfo.getTransaction().getRequest();
sipURI.setHost(request.getRemoteAddress().getHostName());
sipURI.setPort(request.getRemotePort());
ClientTransaction transaction = null;
if ("TCP".equals(parentPlatform.getTransport())) {
transaction = tcpSipProvider.getNewClientTransaction(notifyRequest);
} else if ("UDP".equals(parentPlatform.getTransport())) {
transaction = udpSipProvider.getNewClientTransaction(notifyRequest);
}
// 添加错误订阅
if (errorEvent != null) {
sipSubscribe.addErrorSubscribe(subscribeInfo.getCallId(), errorEvent);
}
// 添加订阅
if (okEvent != null) {
sipSubscribe.addOkSubscribe(subscribeInfo.getCallId(), okEvent);
}
if (transaction == null) {
logger.error("平台{}的Transport错误:{}",parentPlatform.getServerGBId(), parentPlatform.getTransport());
return;
}
dialog.sendRequest(transaction);
} }
// private Request getCatalogNotifyRequestForCatalogAddOrUpdate(ParentPlatform parentPlatform, DeviceChannel channel, int size, String type,
// SubscribeInfo subscribeInfo) throws ParseException, InvalidArgumentException,
// PeerUnavailableException, NoSuchFieldException, IllegalAccessException {
// String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channel, size, type, subscribeInfo);
//
// CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
// : udpSipProvider.getNewCallId();
// callIdHeader.setCallId(subscribeInfo.getCallId());
// String tm = Long.toString(System.currentTimeMillis());
//
// Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, catalogXmlContent,
// callIdHeader, "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""),"FromRegister" + tm, subscribeInfo);
// return request;
// }
private String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, DeviceChannel channel, int sumNum, String type, SubscribeInfo subscribeInfo) { private String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, DeviceChannel channel, int sumNum, String type, SubscribeInfo subscribeInfo) {
StringBuffer catalogXml = new StringBuffer(600); StringBuffer catalogXml = new StringBuffer(600);
if (parentPlatform.getServerGBId().equals(channel.getParentId())) { if (parentPlatform.getServerGBId().equals(channel.getParentId())) {
@ -465,34 +520,31 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
if (index == null) { if (index == null) {
index = 0; index = 0;
} }
if (index >= deviceChannels.size()) {
if (index > deviceChannels.size() - 1) {
return true; return true;
} }
try { try {
String catalogXml = getCatalogXmlContentForCatalogOther(deviceChannels.get(index), type, parentPlatform);
CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, catalogXml,
callIdHeader,
"z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), subscribeInfo);
index += 1;
Integer finalIndex = index; Integer finalIndex = index;
transmitRequest(parentPlatform, request, null, eventResult -> { String catalogXmlContent = getCatalogXmlContentForCatalogOther(parentPlatform, deviceChannels.get(index), type);
sendNotifyForCatalogOther(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex); sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
}); logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
}, (eventResult -> {
sendNotifyForCatalogOther(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex + 1);
}));
} catch (SipException e) { } catch (SipException e) {
e.printStackTrace(); e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) { } catch (ParseException e) {
e.printStackTrace(); e.printStackTrace();
} catch (NoSuchFieldException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} }
return true; return true;
} }
private String getCatalogXmlContentForCatalogOther(DeviceChannel channel, String type, ParentPlatform parentPlatform) { private String getCatalogXmlContentForCatalogOther(ParentPlatform parentPlatform, DeviceChannel channel, String type) {
if (parentPlatform.getServerGBId().equals(channel.getParentId())) { if (parentPlatform.getServerGBId().equals(channel.getParentId())) {
channel.setParentId(parentPlatform.getDeviceGBId()); channel.setParentId(parentPlatform.getDeviceGBId());
} }
@ -594,6 +646,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
byte[] transactionByteArray = sendRtpItem.getTransaction(); byte[] transactionByteArray = sendRtpItem.getTransaction();
ClientTransaction clientTransaction = (ClientTransaction) SerializeUtils.deSerialize(transactionByteArray); ClientTransaction clientTransaction = (ClientTransaction) SerializeUtils.deSerialize(transactionByteArray);
Request byeRequest = dialog.createRequest(Request.BYE); Request byeRequest = dialog.createRequest(Request.BYE);
SipURI byeURI = (SipURI) byeRequest.getRequestURI(); SipURI byeURI = (SipURI) byeRequest.getRequestURI();
SIPRequest request = (SIPRequest) clientTransaction.getRequest(); SIPRequest request = (SIPRequest) clientTransaction.getRequest();
byeURI.setHost(request.getRemoteAddress().getHostName()); byeURI.setHost(request.getRemoteAddress().getHostName());

7
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java

@ -233,6 +233,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
*/ */
private void processNotifyCatalogList(RequestEvent evt) { private void processNotifyCatalogList(RequestEvent evt) {
try { try {
System.out.println(343434);
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
@ -309,12 +310,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
} }
// RequestMessage msg = new RequestMessage();
// msg.setDeviceId(deviceId);
// msg.setType(DeferredResultHolder.CALLBACK_CMD_CATALOG);
// msg.setData(device);
// deferredResultHolder.invokeResult(msg);
if (offLineDetector.isOnline(deviceId)) { if (offLineDetector.isOnline(deviceId)) {
publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE); publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
} }

8
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java

@ -81,7 +81,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
String requestAddress = evtExt.getRemoteIpAddress() + ":" + evtExt.getRemotePort(); String requestAddress = evtExt.getRemoteIpAddress() + ":" + evtExt.getRemotePort();
logger.info("[{}] 收到注册请求,开始处理", requestAddress); logger.info("[{}] 收到注册请求,开始处理", requestAddress);
Request request = evt.getRequest(); Request request = evt.getRequest();
ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME);
Response response = null; Response response = null;
boolean passwordCorrect = false; boolean passwordCorrect = false;
// 注册标志 0:未携带授权头或者密码错误 1:注册成功 2:注销成功 // 注册标志 0:未携带授权头或者密码错误 1:注册成功 2:注销成功
@ -128,7 +128,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
dateHeader.setDate(wvpSipDate); dateHeader.setDate(wvpSipDate);
response.addHeader(dateHeader); response.addHeader(dateHeader);
ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME);
if (expiresHeader == null) { if (expiresHeader == null) {
response = getMessageFactory().createResponse(Response.BAD_REQUEST, request); response = getMessageFactory().createResponse(Response.BAD_REQUEST, request);
ServerTransaction serverTransaction = getServerTransaction(evt); ServerTransaction serverTransaction = getServerTransaction(evt);
@ -193,9 +193,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
// 保存到redis // 保存到redis
if (registerFlag == 1 ) { if (registerFlag == 1 ) {
logger.info("[{}] 注册成功! deviceId:" + device.getDeviceId(), requestAddress); logger.info("[{}] 注册成功! deviceId:" + device.getDeviceId(), requestAddress);
publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER); publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER, expiresHeader.getExpires());
// 重新注册更新设备和通道,以免设备替换或更新后信息无法更新
handler.onRegister(device);
} else if (registerFlag == 2) { } else if (registerFlag == 2) {
logger.info("[{}] 注销成功! deviceId:" + device.getDeviceId(), requestAddress); logger.info("[{}] 注销成功! deviceId:" + device.getDeviceId(), requestAddress);
publisher.outlineEventPublish(device.getDeviceId(), VideoManagerConstants.EVENT_OUTLINE_UNREGISTER); publisher.outlineEventPublish(device.getDeviceId(), VideoManagerConstants.EVENT_OUTLINE_UNREGISTER);

55
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java

@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.CmdType; import com.genersoft.iot.vmp.gb28181.bean.CmdType;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.task.GPSSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.GPSSubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
@ -15,18 +16,19 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.utils.SerializeUtils;
import gov.nist.javax.sip.SipProviderImpl;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
import org.dom4j.Element; import org.dom4j.Element;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException; import javax.sip.*;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.header.ExpiresHeader; import javax.sip.header.ExpiresHeader;
import javax.sip.header.ToHeader; import javax.sip.header.ToHeader;
import javax.sip.message.Request; import javax.sip.message.Request;
@ -54,12 +56,26 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
@Autowired @Autowired
private IVideoManagerStorager storager; private IVideoManagerStorager storager;
@Lazy
@Autowired
@Qualifier(value="tcpSipProvider")
private SipProviderImpl tcpSipProvider;
@Lazy
@Autowired
@Qualifier(value="udpSipProvider")
private SipProviderImpl udpSipProvider;
@Autowired @Autowired
private DynamicTask dynamicTask; private DynamicTask dynamicTask;
@Autowired @Autowired
private UserSetup userSetup; private UserSetup userSetup;
@Autowired
private SubscribeHolder subscribeHolder;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅 // 添加消息处理的订阅
@ -136,16 +152,17 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
.append("</Response>\r\n"); .append("</Response>\r\n");
if (subscribeInfo.getExpires() > 0) { if (subscribeInfo.getExpires() > 0) {
if (redisCatchStorage.getSubscribe(key) != null) { if (subscribeHolder.getMobilePositionSubscribe(platformId) != null) {
dynamicTask.stop(key); dynamicTask.stop(key);
} }
String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key), Integer.parseInt(interval)); dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval));
subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
redisCatchStorage.updateSubscribe(key, subscribeInfo); // redisCatchStorage.updateSubscribe(key, subscribeInfo);
}else if (subscribeInfo.getExpires() == 0) { }else if (subscribeInfo.getExpires() == 0) {
dynamicTask.stop(key); dynamicTask.stop(key);
redisCatchStorage.delSubscribe(key); // redisCatchStorage.delSubscribe(key);
subscribeHolder.removeMobilePositionSubscribe(platformId);
} }
try { try {
@ -168,10 +185,19 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
} }
private void processNotifyCatalogList(RequestEvent evt, Element rootElement) { private void processNotifyCatalogList(RequestEvent evt, Element rootElement) throws SipException {
String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
String deviceID = XmlUtil.getText(rootElement, "DeviceID"); String deviceID = XmlUtil.getText(rootElement, "DeviceID");
ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId); SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId);
if (evt.getServerTransaction() == null) {
ServerTransaction serverTransaction = platform.getTransport().equals("TCP") ? tcpSipProvider.getNewServerTransaction(evt.getRequest())
: udpSipProvider.getNewServerTransaction(evt.getRequest());
subscribeInfo.setTransaction(serverTransaction);
Dialog dialog = serverTransaction.getDialog();
dialog.terminateOnBye(false);
subscribeInfo.setDialog(dialog);
}
String sn = XmlUtil.getText(rootElement, "SN"); String sn = XmlUtil.getText(rootElement, "SN");
String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platformId; String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platformId;
logger.info("接收到{}的Catalog订阅", platformId); logger.info("接收到{}的Catalog订阅", platformId);
@ -185,9 +211,11 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
.append("</Response>\r\n"); .append("</Response>\r\n");
if (subscribeInfo.getExpires() > 0) { if (subscribeInfo.getExpires() > 0) {
redisCatchStorage.updateSubscribe(key, subscribeInfo); // redisCatchStorage.updateSubscribe(key, subscribeInfo);
subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo);
}else if (subscribeInfo.getExpires() == 0) { }else if (subscribeInfo.getExpires() == 0) {
redisCatchStorage.delSubscribe(key); // redisCatchStorage.delSubscribe(key);
subscribeHolder.removeCatalogSubscribe(platformId);
} }
try { try {
@ -195,7 +223,8 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
Response response = responseXmlAck(evt, resultXml.toString(), parentPlatform); Response response = responseXmlAck(evt, resultXml.toString(), parentPlatform);
ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME); ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME);
subscribeInfo.setToTag(toHeader.getTag()); subscribeInfo.setToTag(toHeader.getTag());
redisCatchStorage.updateSubscribe(key, subscribeInfo); // redisCatchStorage.updateSubscribe(key, subscribeInfo);
subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo);
} catch (SipException e) { } catch (SipException e) {
e.printStackTrace(); e.printStackTrace();

6
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java

@ -21,6 +21,7 @@ import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.SipException; import javax.sip.SipException;
import javax.sip.header.CSeqHeader;
import javax.sip.header.CallIdHeader; import javax.sip.header.CallIdHeader;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.text.ParseException; import java.text.ParseException;
@ -64,6 +65,11 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement
String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest());
CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
// 查询设备是否存在 // 查询设备是否存在
CSeqHeader cseqHeader = (CSeqHeader) evt.getRequest().getHeader(CSeqHeader.NAME);
String method = cseqHeader.getMethod();
if (method.equals("MESSAGE")) {
System.out.println();
}
Device device = redisCatchStorage.getDevice(deviceId); Device device = redisCatchStorage.getDevice(deviceId);
// 查询上级平台是否存在 // 查询上级平台是否存在
ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(deviceId); ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(deviceId);

75
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java

@ -85,41 +85,54 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
return; return;
} }
int sumNum = Integer.parseInt(sumNumElement.getText()); int sumNum = Integer.parseInt(sumNumElement.getText());
Iterator<Element> deviceListIterator = deviceListElement.elementIterator(); if (sumNum == 0) {
if (deviceListIterator != null) { // 数据已经完整接收
List<DeviceChannel> channelList = new ArrayList<>(); storager.cleanChannelsForDevice(device.getDeviceId());
// 遍历DeviceList RequestMessage msg = new RequestMessage();
while (deviceListIterator.hasNext()) { msg.setKey(key);
Element itemDevice = deviceListIterator.next(); WVPResult<Object> result = new WVPResult<>();
Element channelDeviceElement = itemDevice.element("DeviceID"); result.setCode(0);
if (channelDeviceElement == null) { result.setData(device);
continue; msg.setData(result);
result.setMsg("更新成功,共0条");
deferredResultHolder.invokeAllResult(msg);
catalogDataCatch.del(key);
}else {
Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
if (deviceListIterator != null) {
List<DeviceChannel> channelList = new ArrayList<>();
// 遍历DeviceList
while (deviceListIterator.hasNext()) {
Element itemDevice = deviceListIterator.next();
Element channelDeviceElement = itemDevice.element("DeviceID");
if (channelDeviceElement == null) {
continue;
}
DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice);
deviceChannel.setDeviceId(device.getDeviceId());
logger.debug("收到来自设备【{}】的通道: {}【{}】", device.getDeviceId(), deviceChannel.getName(), deviceChannel.getChannelId());
channelList.add(deviceChannel);
} }
DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice);
deviceChannel.setDeviceId(device.getDeviceId());
logger.debug("收到来自设备【{}】的通道: {}【{}】", device.getDeviceId(), deviceChannel.getName(), deviceChannel.getChannelId());
channelList.add(deviceChannel);
}
catalogDataCatch.put(key, sumNum, device, channelList); catalogDataCatch.put(key, sumNum, device, channelList);
if (catalogDataCatch.get(key).size() == sumNum) { if (catalogDataCatch.get(key).size() == sumNum) {
// 数据已经完整接收 // 数据已经完整接收
boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key)); boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key));
RequestMessage msg = new RequestMessage(); RequestMessage msg = new RequestMessage();
msg.setKey(key); msg.setKey(key);
WVPResult<Object> result = new WVPResult<>(); WVPResult<Object> result = new WVPResult<>();
result.setCode(0); result.setCode(0);
result.setData(device); result.setData(device);
if (resetChannelsResult) { if (resetChannelsResult || sumNum ==0) {
result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条"); result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条");
}else { }else {
result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条"); result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条");
}
msg.setData(result);
deferredResultHolder.invokeAllResult(msg);
catalogDataCatch.del(key);
} }
msg.setData(result);
deferredResultHolder.invokeAllResult(msg);
catalogDataCatch.del(key);
} }
// 回复200 OK // 回复200 OK
responseAck(evt, Response.OK); responseAck(evt, Response.OK);
if (offLineDetector.isOnline(device.getDeviceId())) { if (offLineDetector.isOnline(device.getDeviceId())) {

4
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java

@ -31,8 +31,8 @@ public class DeviceServiceImpl implements IDeviceService {
return false; return false;
} }
if (dynamicTask.contains(device.getDeviceId())) { if (dynamicTask.contains(device.getDeviceId())) {
logger.info("[添加目录订阅] 设备{}的目录订阅以存在", device.getDeviceId()); // 存在则停止现有的,开启新的
return false; dynamicTask.stop(device.getDeviceId());
} }
logger.info("[添加目录订阅] 设备{}", device.getDeviceId()); logger.info("[添加目录订阅] 设备{}", device.getDeviceId());
// 添加目录订阅 // 添加目录订阅

10
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java

@ -513,6 +513,14 @@ public class MediaServerServiceImpl implements IMediaServerService {
param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrex)); param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrex));
param.put("hook.timeoutSec","20"); param.put("hook.timeoutSec","20");
param.put("general.streamNoneReaderDelayMS",mediaServerItem.getStreamNoneReaderDelayMS()==-1?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() ); param.put("general.streamNoneReaderDelayMS",mediaServerItem.getStreamNoneReaderDelayMS()==-1?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() );
// 推流断开后可以在超时时间内重新连接上继续推流,这样播放器会接着播放。
// 置0关闭此特性(推流断开会导致立即断开播放器)
// 此参数不应大于播放器超时时间
// 优化此消息以更快的收到流注销事件
param.put("general.continue_push_ms", "3000" );
// 最多等待未初始化的Track时间,单位毫秒,超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流,
// 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项
param.put("general.wait_track_ready_ms", "3000" );
JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param); JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param);
@ -620,6 +628,8 @@ public class MediaServerServiceImpl implements IMediaServerService {
public void updateMediaServerKeepalive(String mediaServerId, JSONObject data) { public void updateMediaServerKeepalive(String mediaServerId, JSONObject data) {
MediaServerItem mediaServerItem = getOne(mediaServerId); MediaServerItem mediaServerItem = getOne(mediaServerId);
if (mediaServerItem == null) { if (mediaServerItem == null) {
// zlm连接重试
logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息"); logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息");
return; return;
} }

60
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java

@ -1,14 +1,12 @@
package com.genersoft.iot.vmp.service.impl; package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference; import com.alibaba.fastjson.TypeReference;
import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
@ -23,6 +21,8 @@ import com.genersoft.iot.vmp.storager.dao.*;
import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo; import com.github.pagehelper.PageInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
@ -33,6 +33,8 @@ import java.util.stream.Collectors;
@Service @Service
public class StreamPushServiceImpl implements IStreamPushService { public class StreamPushServiceImpl implements IStreamPushService {
private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class);
@Autowired @Autowired
private GbStreamMapper gbStreamMapper; private GbStreamMapper gbStreamMapper;
@ -158,12 +160,17 @@ public class StreamPushServiceImpl implements IStreamPushService {
public boolean removeFromGB(GbStream stream) { public boolean removeFromGB(GbStream stream) {
// 判断是否需要发送事件 // 判断是否需要发送事件
gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL); gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL);
int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream()); platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId()); MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream()); JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream());
if (mediaList == null) { if (mediaList != null) {
streamPushMapper.del(stream.getApp(), stream.getStream()); if (mediaList.getInteger("code") == 0) {
JSONArray data = mediaList.getJSONArray("data");
if (data == null) {
streamPushMapper.del(stream.getApp(), stream.getStream());
}
}
} }
return del > 0; return del > 0;
} }
@ -180,9 +187,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
int delStream = streamPushMapper.del(app, streamId);
gbStreamMapper.del(app, streamId);
platformGbStreamMapper.delByAppAndStream(app, streamId); platformGbStreamMapper.delByAppAndStream(app, streamId);
gbStreamMapper.del(app, streamId);
int delStream = streamPushMapper.del(app, streamId);
if (delStream > 0) { if (delStream > 0) {
MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId); zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId);
@ -376,6 +383,29 @@ public class StreamPushServiceImpl implements IStreamPushService {
.collect(Collectors.toList()); .collect(Collectors.toList());
if (streamPushItemsForPlatform.size() > 0) { if (streamPushItemsForPlatform.size() > 0) {
// 获取所有平台,平台和目录信息一般不会特别大量。
List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList();
Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>();
if (parentPlatformList.size() == 0) {
return;
}
for (ParentPlatform platform : parentPlatformList) {
Map<String, PlatformCatalog> catalogMap = new HashMap<>();
// 创建根节点
PlatformCatalog platformCatalog = new PlatformCatalog();
platformCatalog.setId(platform.getServerGBId());
catalogMap.put(platform.getServerGBId(), platformCatalog);
// 查询所有节点信息
List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId());
if (platformCatalogs.size() > 0) {
for (PlatformCatalog catalog : platformCatalogs) {
catalogMap.put(catalog.getId(), catalog);
}
}
platformInfoMap.put(platform.getServerGBId(), catalogMap);
}
List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>(); List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>();
Map<String, List<GbStream>> platformForEvent = new HashMap<>(); Map<String, List<GbStream>> platformForEvent = new HashMap<>();
// 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入 // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入
@ -388,6 +418,12 @@ public class StreamPushServiceImpl implements IStreamPushService {
streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId()); streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
if (platFormInfoArray.length > 0) { if (platFormInfoArray.length > 0) {
// 数组 platFormInfoArray 0 为平台ID。 1为目录ID // 数组 platFormInfoArray 0 为平台ID。 1为目录ID
// 不存在这个平台,则忽略导入此关联关系
if (platformInfoMap.get(platFormInfoArray[0]) == null
|| platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
continue;
}
streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]); streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
List<GbStream> gbStreamList = platformForEvent.get(streamPushItem.getPlatformId()); List<GbStream> gbStreamList = platformForEvent.get(streamPushItem.getPlatformId());
@ -406,8 +442,6 @@ public class StreamPushServiceImpl implements IStreamPushService {
streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]); streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
} }
streamPushItemListFroPlatform.add(streamPushItemForPlatform); streamPushItemListFroPlatform.add(streamPushItemForPlatform);
} }
} }
@ -432,9 +466,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
} }
gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL); gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL);
int delStream = streamPushMapper.delAllForGbStream(gbStreams);
gbStreamMapper.batchDelForGbStream(gbStreams);
platformGbStreamMapper.delByGbStreams(gbStreams); platformGbStreamMapper.delByGbStreams(gbStreams);
gbStreamMapper.batchDelForGbStream(gbStreams);
int delStream = streamPushMapper.delAllForGbStream(gbStreams);
if (delStream > 0) { if (delStream > 0) {
for (GbStream gbStream : gbStreams) { for (GbStream gbStream : gbStreams) {
MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId()); MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());

Loading…
Cancel
Save