diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java index ce990a0d..fc3d0277 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java @@ -45,10 +45,21 @@ public class GPSSubscribeTask implements Runnable{ for (GbStream gbStream : gbStreams) { String gbId = gbStream.getGbId(); GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId); - if (gpsMsgInfo != null && gbStream.isStatus()) { - // 发送GPS消息 - sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe); + if (gbStream.isStatus()) { + if (gpsMsgInfo != null) { + // 发送GPS消息 + sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe); + }else { + // 没有在redis找到新的消息就使用数据库的消息 + gpsMsgInfo = new GPSMsgInfo(); + gpsMsgInfo.setId(gbId); + gpsMsgInfo.setLat(gbStream.getLongitude()); + gpsMsgInfo.setLng(gbStream.getLongitude()); + // 发送GPS消息 + sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe); + } } + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java new file mode 100644 index 00000000..fd5bb060 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java @@ -0,0 +1,39 @@ +package com.genersoft.iot.vmp.service; + +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.List; + + +/** + * 定时查找redis中的GPS推送消息,并保存到对应的流中 + */ +@Component +public class StreamGPSSubscribeTask { + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private IVideoManagerStorager storager; + + + + @Scheduled(fixedRate = 30 * 1000) //每30秒执行一次 + public void execute(){ + List gpsMsgInfo = redisCatchStorage.getAllGpsMsgInfo(); + if (gpsMsgInfo.size() > 0) { + storager.updateStreamGPS(gpsMsgInfo); + for (GPSMsgInfo msgInfo : gpsMsgInfo) { + msgInfo.setStored(true); + redisCatchStorage.updateGpsMsgInfo(msgInfo); + } + } + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java b/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java index 855cc5e4..b814c18f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java @@ -37,6 +37,8 @@ public class GPSMsgInfo { */ private String altitude; + private boolean stored; + public String getId() { return id; @@ -93,4 +95,12 @@ public class GPSMsgInfo { public void setAltitude(String altitude) { this.altitude = altitude; } + + public boolean isStored() { + return stored; + } + + public void setStored(boolean stored) { + this.stored = stored; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java index 4e390b73..e3bfcde7 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java @@ -17,7 +17,6 @@ public class RedisGPSMsgListener implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class); - System.out.println(JSON.toJSON(gpsMsgInfo)); redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index ea85cf19..f7b16a84 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -195,6 +195,7 @@ public interface IRedisCatchStorage { void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo); GPSMsgInfo getGpsMsgInfo(String gbId); + List getAllGpsMsgInfo(); Long getSN(String method); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java index 96a487c3..229ab8cb 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce; import com.github.pagehelper.PageInfo; @@ -456,4 +457,6 @@ public interface IVideoManagerStorager { List queryCatalogInPlatform(String serverGBId); int delRelation(PlatformCatalog platformCatalog); + + int updateStreamGPS(List gpsMsgInfo); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java index 3a26b4e9..a51f3dc3 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.storager.dao; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Repository; @@ -94,4 +95,13 @@ public interface GbStreamMapper { void batchAdd(List subList); + @Update({""}) + int updateStreamGPS(List gpsMsgInfos); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index adda2318..2dcc2171 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -453,7 +453,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo) { String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_" + gpsMsgInfo.getId(); - redis.set(key, gpsMsgInfo); + redis.set(key, gpsMsgInfo, 60); // 默认GPS消息保存1分钟 } @Override @@ -476,4 +476,20 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { public void delSubscribe(String key) { redis.del(key); } + + @Override + public List getAllGpsMsgInfo() { + String scanKey = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetup.getServerId() + "_*"; + List result = new ArrayList<>(); + List keys = redis.scan(scanKey); + for (int i = 0; i < keys.size(); i++) { + String key = (String) keys.get(i); + GPSMsgInfo gpsMsgInfo = (GPSMsgInfo) redis.get(key); + if (!gpsMsgInfo.isStored()) { // 只取没有存过得 + result.add((GPSMsgInfo)redis.get(key)); + } + } + + return result; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java index d381574d..dba268db 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IGbStreamService; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.storager.dao.*; @@ -898,4 +899,9 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { } return 0; } + + @Override + public int updateStreamGPS(List gpsMsgInfos) { + return gbStreamMapper.updateStreamGPS(gpsMsgInfos); + } }