diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 8f109e2b..b4002714 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -27,6 +27,7 @@ public class VideoManagerConstants { public static final String PLAYER_PREFIX = "VMP_PLAYER_"; public static final String PLAY_BLACK_PREFIX = "VMP_PLAYBACK_"; + public static final String DOWNLOAD_PREFIX = "VMP_DOWNLOAD_"; public static final String PLATFORM_KEEPLIVEKEY_PREFIX = "VMP_PLATFORM_KEEPLIVE_"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java index dcac49d3..24fc2212 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java @@ -16,6 +16,8 @@ public class RecordInfo { private String channelId; + private String sn; + private String name; private int sumNum; @@ -61,4 +63,12 @@ public class RecordInfo { public void setChannelId(String channelId) { this.channelId = channelId; } + + public String getSn() { + return sn; + } + + public void setSn(String sn) { + this.sn = sn; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/CheckForAllRecordsThread.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/CheckForAllRecordsThread.java index 0327cc52..dff74cfe 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/CheckForAllRecordsThread.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/CheckForAllRecordsThread.java @@ -53,7 +53,7 @@ public class CheckForAllRecordsThread extends Thread { // 自然顺序排序, 元素进行升序排列 this.recordInfo.getRecordList().sort(Comparator.naturalOrder()); RequestMessage msg = new RequestMessage(); - msg.setKey(DeferredResultHolder.CALLBACK_CMD_RECORDINFO + recordInfo.getDeviceId() + recordInfo.getChannelId()); + msg.setKey(DeferredResultHolder.CALLBACK_CMD_RECORDINFO + recordInfo.getDeviceId() + recordInfo.getSn()); msg.setData(recordInfo); deferredResultHolder.invokeAllResult(msg); logger.info("处理完成,返回结果"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java index 255ded1d..de5a4233 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java @@ -35,6 +35,10 @@ public class DeferredResultHolder { public static final String CALLBACK_CMD_PLAY = "CALLBACK_PLAY"; + public static final String CALLBACK_CMD_PLAYBACK = "CALLBACK_PLAY"; + + public static final String CALLBACK_CMD_DOWNLOAD = "CALLBACK_DOWNLOAD"; + public static final String CALLBACK_CMD_STOP = "CALLBACK_STOP"; public static final String CALLBACK_CMD_MOBILEPOSITION = "CALLBACK_MOBILEPOSITION"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index 868dbb4e..6e96dac3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -256,8 +256,9 @@ public interface ISIPCommander { * @param device 视频设备 * @param startTime 开始时间,格式要求:yyyy-MM-dd HH:mm:ss * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss + * @param sn */ - boolean recordInfoQuery(Device device, String channelId, String startTime, String endTime, SipSubscribe.Event errorEvent); + boolean recordInfoQuery(Device device, String channelId, String startTime, String endTime, int sn, SipSubscribe.Event errorEvent); /** * 查询报警信息 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index e06a7fc8..2f90deed 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1196,14 +1196,15 @@ public class SIPCommander implements ISIPCommander { * @param endTime 结束时间,格式要求:yyyy-MM-dd HH:mm:ss */ @Override - public boolean recordInfoQuery(Device device, String channelId, String startTime, String endTime, SipSubscribe.Event errorEvent) { - + public boolean recordInfoQuery(Device device, String channelId, String startTime, String endTime, int sn, SipSubscribe.Event errorEvent) { + + try { StringBuffer recordInfoXml = new StringBuffer(200); recordInfoXml.append("\r\n"); recordInfoXml.append("\r\n"); recordInfoXml.append("RecordInfo\r\n"); - recordInfoXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); + recordInfoXml.append("" + sn + "\r\n"); recordInfoXml.append("" + channelId + "\r\n"); recordInfoXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(startTime) + "\r\n"); recordInfoXml.append("" + DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(endTime) + "\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/MessageRequestProcessor1.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/MessageRequestProcessor1.java index 847f7e18..2a908414 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/MessageRequestProcessor1.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/MessageRequestProcessor1.java @@ -914,21 +914,20 @@ public class MessageRequestProcessor1 extends SIPRequestProcessorParent implemen String uuid = UUID.randomUUID().toString().replace("-", ""); RecordInfo recordInfo = new RecordInfo(); Element rootElement = getRootElement(evt); - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getText().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + channelId; + String sn = getText(rootElement, "SN"); + String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn; if (device != null ) { rootElement = getRootElement(evt, device.getCharset()); } recordInfo.setDeviceId(deviceId); - recordInfo.setChannelId(channelId); + recordInfo.setSn(sn); recordInfo.setName(getText(rootElement, "Name")); if (getText(rootElement, "SumNum")== null || getText(rootElement, "SumNum") =="") { recordInfo.setSumNum(0); } else { recordInfo.setSumNum(Integer.parseInt(getText(rootElement, "SumNum"))); } - String sn = getText(rootElement, "SN"); + Element recordListElement = rootElement.element("RecordList"); if (recordListElement == null || recordInfo.getSumNum() == 0) { logger.info("无录像数据"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index f1919da4..f0f84213 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -64,18 +64,16 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent rootElement = getRootElement(evt, device.getCharset()); String uuid = UUID.randomUUID().toString().replace("-", ""); RecordInfo recordInfo = new RecordInfo(); - Element deviceIdElement = rootElement.element("DeviceID"); - String channelId = deviceIdElement.getText(); - String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + device.getDeviceId() + channelId; + String sn = getText(rootElement, "SN"); + String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + device.getDeviceId() + sn; recordInfo.setDeviceId(device.getDeviceId()); - recordInfo.setChannelId(channelId); + recordInfo.setSn(sn); recordInfo.setName(getText(rootElement, "Name")); if (getText(rootElement, "SumNum") == null || getText(rootElement, "SumNum") == "") { recordInfo.setSumNum(0); } else { recordInfo.setSumNum(Integer.parseInt(getText(rootElement, "SumNum"))); } - String sn = getText(rootElement, "SN"); Element recordListElement = rootElement.element("RecordList"); if (recordListElement == null || recordInfo.getSumNum() == 0) { logger.info("无录像数据"); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index d6e77c74..e5372f3f 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -358,14 +358,13 @@ public class ZLMHttpHookListener { String mediaServerId = json.getString("mediaServerId"); String streamId = json.getString("stream"); String app = json.getString("app"); - - // TODO 如果在给上级推流,也不停止。 + JSONObject ret = new JSONObject(); + ret.put("code", 0); if ("rtp".equals(app)){ - JSONObject ret = new JSONObject(); - ret.put("code", 0); ret.put("close", true); StreamInfo streamInfoForPlayCatch = redisCatchStorage.queryPlayByStreamId(streamId); if (streamInfoForPlayCatch != null) { + // 如果在给上级推流,也不停止。 if (redisCatchStorage.isChannelSendingRTP(streamInfoForPlayCatch.getChannelId())) { ret.put("close", false); } else { @@ -378,6 +377,12 @@ public class ZLMHttpHookListener { if (streamInfoForPlayBackCatch != null) { cmder.streamByeCmd(streamInfoForPlayBackCatch.getDeviceID(), streamInfoForPlayBackCatch.getChannelId()); redisCatchStorage.stopPlayback(streamInfoForPlayBackCatch); + }else { + StreamInfo streamInfoForDownload = redisCatchStorage.queryDownloadByStreamId(streamId); + // 进行录像下载时无人观看不断流 + if (streamInfoForDownload != null) { + ret.put("close", false); + } } } MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); @@ -386,8 +391,6 @@ public class ZLMHttpHookListener { } return new ResponseEntity(ret.toString(),HttpStatus.OK); }else { - JSONObject ret = new JSONObject(); - ret.put("code", 0); StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, streamId); if (streamProxyItem != null && streamProxyItem.isEnable_remove_none_reader()) { ret.put("close", true); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 9e5c4442..8a7437cd 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -18,4 +18,6 @@ public interface IPlayService { PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent); MediaServerItem getNewMediaServerItem(Device device); + + void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String toString); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index f67df040..640e99aa 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -34,11 +34,6 @@ import org.springframework.stereotype.Service; import org.springframework.util.ResourceUtils; import org.springframework.web.context.request.async.DeferredResult; -import javax.sip.DialogTerminatedEvent; -import javax.sip.ResponseEvent; -import javax.sip.TimeoutEvent; -import javax.sip.TransactionTerminatedEvent; -import javax.sip.message.Response; import java.io.FileNotFoundException; import java.util.Objects; import java.util.UUID; @@ -286,16 +281,35 @@ public class PlayServiceImpl implements IPlayService { return mediaServerItem; } + @Override public void onPublishHandlerForPlayBack(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) { RequestMessage msg = new RequestMessage(); - msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId); + msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId); msg.setId(uuid); StreamInfo streamInfo = onPublishHandler(mediaServerItem, resonse, deviceId, channelId, uuid); if (streamInfo != null) { redisCatchStorage.startPlayback(streamInfo); msg.setData(JSON.toJSONString(streamInfo)); resultHolder.invokeResult(msg); + } else { + logger.warn("设备回放API调用失败!"); + msg.setData("设备回放API调用失败!"); + resultHolder.invokeResult(msg); + } + } + + + @Override + public void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) { + RequestMessage msg = new RequestMessage(); + msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId); + msg.setId(uuid); + StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId, uuid); + if (streamInfo != null) { + redisCatchStorage.startDownload(streamInfo); + msg.setData(JSON.toJSONString(streamInfo)); + resultHolder.invokeResult(msg); } else { logger.warn("设备预览API调用失败!"); msg.setData("设备预览API调用失败!"); @@ -303,6 +317,7 @@ public class PlayServiceImpl implements IPlayService { } } + public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid) { String streamId = resonse.getString("stream"); JSONArray tracks = resonse.getJSONArray("tracks"); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 25743e13..5878339b 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -144,4 +144,12 @@ public interface IRedisCatchStorage { * @param streamId */ void removePushStream(MediaServerItem mediaServerItem, String app, String streamId); + + /** + * 开始下载录像时存入 + * @param streamInfo + */ + boolean startDownload(StreamInfo streamInfo); + + StreamInfo queryDownloadByStreamId(String streamId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 0d5b98d6..b3211191 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -64,15 +64,15 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { streamInfo.getChannelId())); } @Override - public StreamInfo queryPlayByStreamId(String steamId) { - List playLeys = redis.scan(String.format("%S_%s_*", VideoManagerConstants.PLAYER_PREFIX, steamId)); + public StreamInfo queryPlayByStreamId(String streamId) { + List playLeys = redis.scan(String.format("%S_%s_*", VideoManagerConstants.PLAYER_PREFIX, streamId)); if (playLeys == null || playLeys.size() == 0) return null; return (StreamInfo)redis.get(playLeys.get(0).toString()); } @Override - public StreamInfo queryPlaybackByStreamId(String steamId) { - List playLeys = redis.scan(String.format("%S_%s_*", VideoManagerConstants.PLAY_BLACK_PREFIX, steamId)); + public StreamInfo queryPlaybackByStreamId(String streamId) { + List playLeys = redis.scan(String.format("%S_%s_*", VideoManagerConstants.PLAY_BLACK_PREFIX, streamId)); if (playLeys == null || playLeys.size() == 0) return null; return (StreamInfo)redis.get(playLeys.get(0).toString()); } @@ -104,10 +104,15 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public boolean startPlayback(StreamInfo stream) { - return redis.set(String.format("%S_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, stream.getStreamId(),stream.getDeviceID(), stream.getChannelId()), - stream); + return redis.set(String.format("%S_%s_%s_%s", VideoManagerConstants.PLAY_BLACK_PREFIX, stream.getStreamId(), + stream.getDeviceID(), stream.getChannelId()), stream); } + @Override + public boolean startDownload(StreamInfo streamInfo) { + return redis.set(String.format("%S_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX, streamInfo.getStreamId(), + streamInfo.getDeviceID(), streamInfo.getChannelId()), streamInfo); + } @Override public boolean stopPlayback(StreamInfo streamInfo) { @@ -318,4 +323,11 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { String key = VideoManagerConstants.WVP_SERVER_STREAM_PUSH_PREFIX + app + "_" + streamId + "_" + mediaServerItem.getId(); redis.del(key); } + + @Override + public StreamInfo queryDownloadByStreamId(String streamId) { + List playLeys = redis.scan(String.format("%S_%s_*", VideoManagerConstants.DOWNLOAD_PREFIX, streamId)); + if (playLeys == null || playLeys.size() == 0) return null; + return (StreamInfo)redis.get(playLeys.get(0).toString()); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/DownloadController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/DownloadController.java index f6228af4..3f846c60 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/DownloadController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/DownloadController.java @@ -76,7 +76,7 @@ public class DownloadController { if (logger.isDebugEnabled()) { logger.debug(String.format("历史媒体下载 API调用,deviceId:%s,channelId:%s,downloadSpeed:%s", deviceId, channelId, downloadSpeed)); } - String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; + String key = DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId; String uuid = UUID.randomUUID().toString(); DeferredResult> result = new DeferredResult>(30000L); // 超时处理 @@ -114,7 +114,7 @@ public class DownloadController { cmder.downloadStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, (MediaServerItem mediaServerItem, JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); - playService.onPublishHandlerForPlayBack(mediaServerItem, response, deviceId, channelId, uuid.toString()); + playService.onPublishHandlerForDownload(mediaServerItem, response, deviceId, channelId, uuid.toString()); }, event -> { RequestMessage msg = new RequestMessage(); msg.setId(uuid); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java index a0290732..fd1f2ab3 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java @@ -78,7 +78,7 @@ public class PlaybackController { logger.debug(String.format("设备回放 API调用,deviceId:%s ,channelId:%s", deviceId, channelId)); } String uuid = UUID.randomUUID().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; + String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId; DeferredResult> result = new DeferredResult>(30000L); Device device = storager.queryVideoDevice(deviceId); if (device == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/record/GBRecordController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/record/GBRecordController.java index 5b338269..a8675e8e 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/record/GBRecordController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/record/GBRecordController.java @@ -59,11 +59,12 @@ public class GBRecordController { // 指定超时时间 1分钟30秒 DeferredResult> result = new DeferredResult<>(90*1000L); String uuid = UUID.randomUUID().toString(); - String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + channelId; + int sn = (int)((Math.random()*9+1)*100000); + String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn; RequestMessage msg = new RequestMessage(); msg.setId(uuid); msg.setKey(key); - cmder.recordInfoQuery(device, channelId, startTime, endTime, (eventResult -> { + cmder.recordInfoQuery(device, channelId, startTime, endTime, sn, (eventResult -> { msg.setData("查询录像失败, status: " + eventResult.statusCode + ", message: " + eventResult.msg ); resultHolder.invokeResult(msg); }));