diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java index 65e1e5f6..360472fb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java @@ -16,6 +16,7 @@ public class VideoStreamSessionManager { private ConcurrentHashMap sessionMap = new ConcurrentHashMap<>(); private ConcurrentHashMap ssrcMap = new ConcurrentHashMap<>(); + private ConcurrentHashMap streamIdMap = new ConcurrentHashMap<>(); public String createPlaySsrc(){ return SsrcUtil.getPlaySsrc(); @@ -25,18 +26,23 @@ public class VideoStreamSessionManager { return SsrcUtil.getPlayBackSsrc(); } - public void put(String streamId,String ssrc,ClientTransaction transaction){ - sessionMap.put(streamId, transaction); - ssrcMap.put(streamId, ssrc); + public void put(String deviceId, String channelId ,String ssrc, String streamId, ClientTransaction transaction){ + sessionMap.put(deviceId + "_" + channelId, transaction); + ssrcMap.put(deviceId + "_" + channelId, ssrc); + streamIdMap.put(deviceId + "_" + channelId, streamId); } - public ClientTransaction get(String streamId){ - return sessionMap.get(streamId); + public ClientTransaction getTransaction(String deviceId, String channelId){ + return sessionMap.get(deviceId + "_" + channelId); + } + + public String getStreamId(String deviceId, String channelId){ + return streamIdMap.get(deviceId + "_" + channelId); } - public void remove(String streamId) { - sessionMap.remove(streamId); - SsrcUtil.releaseSsrc(ssrcMap.get(streamId)); - ssrcMap.remove(streamId); + public void remove(String deviceId, String channelId) { + sessionMap.remove(deviceId + "_" + channelId); + SsrcUtil.releaseSsrc(ssrcMap.get(deviceId + "_" + channelId)); + ssrcMap.remove(deviceId + "_" + channelId); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index dd50e065..4e111d64 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -87,7 +87,6 @@ public interface ISIPCommander { /** * 请求预览视频流 - * * @param device 视频设备 * @param channelId 预览通道 */ @@ -108,8 +107,8 @@ public interface ISIPCommander { * * @param ssrc ssrc */ - void streamByeCmd(String ssrc, SipSubscribe.Event okEvent); - void streamByeCmd(String ssrc); + void streamByeCmd(String deviceId, String channelId, SipSubscribe.Event okEvent); + void streamByeCmd(String deviceId, String channelId); /** * 语音广播 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 84c08985..e0d0bf30 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -332,17 +332,17 @@ public class SIPCommander implements ISIPCommander { /** * 请求预览视频流 - * @param device 视频设备 - * @param channelId 预览通道 - * @param event hook订阅 - * @param errorEvent sip错误订阅 - */ + * @param device 视频设备 + * @param channelId 预览通道 + * @param event hook订阅 + * @param errorEvent sip错误订阅 + */ @Override public void playStreamCmd(Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) { + String streamId = null; try { if (device == null) return; String ssrc = streamSession.createPlaySsrc(); - String streamId = null; if (rtpEnable) { streamId = String.format("gb_play_%s_%s", device.getDeviceId(), channelId); }else { @@ -444,9 +444,12 @@ public class SIPCommander implements ISIPCommander { Request request = headerProvider.createInviteRequest(device, channelId, content.toString(), null, "FromInvt" + tm, null, ssrc, callIdHeader); - ClientTransaction transaction = transmitRequest(device, request, errorEvent); - streamSession.put(streamId,ssrc, transaction); - + ClientTransaction transaction = transmitRequest(device, request, (e -> { + streamSession.remove(device.getDeviceId(), channelId); + errorEvent.response(e); + })); + streamSession.put(device.getDeviceId(), channelId ,ssrc,streamId, transaction); + } catch ( SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); } @@ -552,7 +555,7 @@ public class SIPCommander implements ISIPCommander { Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader); ClientTransaction transaction = transmitRequest(device, request, errorEvent); - streamSession.put(streamId, ssrc, transaction); + streamSession.put(device.getDeviceId(), channelId, ssrc, streamId, transaction); } catch ( SipException | ParseException | InvalidArgumentException e) { e.printStackTrace(); @@ -566,17 +569,17 @@ public class SIPCommander implements ISIPCommander { * */ @Override - public void streamByeCmd(String ssrc) { - streamByeCmd(ssrc, null); + public void streamByeCmd(String deviceId, String channelId) { + streamByeCmd(deviceId, channelId, null); } @Override - public void streamByeCmd(String streamId, SipSubscribe.Event okEvent) { + public void streamByeCmd(String deviceId, String channelId, SipSubscribe.Event okEvent) { try { - ClientTransaction transaction = streamSession.get(streamId); + ClientTransaction transaction = streamSession.getTransaction(deviceId, channelId); // 服务重启后 if (transaction == null) { - StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); if (streamInfo != null) { } @@ -613,14 +616,9 @@ public class SIPCommander implements ISIPCommander { } dialog.sendRequest(clientTransaction); - - streamSession.remove(streamId); - zlmrtpServerFactory.closeRTPServer(streamId); - } catch (TransactionDoesNotExistException e) { - e.printStackTrace(); - } catch (SipException e) { - e.printStackTrace(); - } catch (ParseException e) { + zlmrtpServerFactory.closeRTPServer(streamSession.getStreamId(deviceId, channelId)); + streamSession.remove(deviceId, channelId); + } catch (SipException | ParseException e) { e.printStackTrace(); } } @@ -641,7 +639,6 @@ public class SIPCommander implements ISIPCommander { * 语音广播 * * @param device 视频设备 - * @param channelId 预览通道 */ @Override public boolean audioBroadcastCmd(Device device) { @@ -1140,7 +1137,7 @@ public class SIPCommander implements ISIPCommander { * @param device 视频设备 * @param startPriority 报警起始级别(可选) * @param endPriority 报警终止级别(可选) - * @param alarmMethods 报警方式条件(可选) + * @param alarmMethod 报警方式条件(可选) * @param alarmType 报警类型 * @param startTime 报警发生起始时间(可选) * @param endTime 报警发生终止时间(可选) @@ -1428,5 +1425,6 @@ public class SIPCommander implements ISIPCommander { String streamId = String.format("gb_play_%s_%s", device.getDeviceId(), channelId); zlmrtpServerFactory.closeRTPServer(streamId); } + streamSession.remove(device.getDeviceId(), channelId); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java index ec5f921f..ec136e99 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java @@ -58,7 +58,7 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { redisCatchStorage.deleteSendRTPServer(platformGbId, channelId); if (zlmrtpServerFactory.totalReaderCount(sendRtpItem.getApp(), streamId) == 0) { System.out.println(streamId + "无其它观看者,通知设备停止推流"); - cmder.streamByeCmd(streamId); + cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId); } } } catch (SipException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java index 6341fb27..6306a986 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java @@ -922,7 +922,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor { StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, "*"); if (streamInfo != null) { redisCatchStorage.stopPlayback(streamInfo); - cmder.streamByeCmd(streamInfo.getStreamId()); + cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId()); } } } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 2a76f26a..183a5441 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -306,12 +306,12 @@ public class ZLMHttpHookListener { if (redisCatchStorage.isChannelSendingRTP(streamInfo.getChannelId())) { ret.put("close", false); } else { - cmder.streamByeCmd(streamId); + cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId()); redisCatchStorage.stopPlay(streamInfo); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); } }else{ - cmder.streamByeCmd(streamId); + cmder.streamByeCmd(streamInfo.getDeviceID(), streamInfo.getChannelId()); streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); redisCatchStorage.stopPlayback(streamInfo); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index f2579cdd..b4076d09 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -63,7 +63,16 @@ public class PlayServiceImpl implements IPlayService { playResult.setResult(result); // 录像查询以channelId作为deviceId查询 resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result); - + // 超时处理 + result.onTimeout(()->{ + logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId)); + // 释放rtpserver + cmder.closeRTPServer(playResult.getDevice(), channelId); + RequestMessage msg = new RequestMessage(); + msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + playResult.getUuid()); + msg.setData("Timeout"); + resultHolder.invokeResult(msg); + }); if (streamInfo == null) { // 发送点播消息 cmder.playStreamCmd(device, channelId, (JSONObject response) -> { @@ -76,6 +85,7 @@ public class PlayServiceImpl implements IPlayService { RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); Response response = event.getResponse(); + cmder.closeRTPServer(playResult.getDevice(), channelId); msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase())); resultHolder.invokeResult(msg); if (errorEvent != null) { @@ -107,6 +117,7 @@ public class PlayServiceImpl implements IPlayService { logger.info("收到订阅消息: " + response.toJSONString()); onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString()); }, event -> { + cmder.closeRTPServer(playResult.getDevice(), channelId); RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); Response response = event.getResponse(); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index d0d07f79..8b2c2bf5 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -36,7 +36,7 @@ public interface IRedisCatchStorage { StreamInfo queryPlaybackByStreamId(String steamId); - StreamInfo queryPlayByDevice(String deviceId, String code); + StreamInfo queryPlayByDevice(String deviceId, String channelId); /** * 更新流媒体信息 diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 3417acf8..98795ef8 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -75,11 +75,11 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } @Override - public StreamInfo queryPlayByDevice(String deviceId, String code) { + public StreamInfo queryPlayByDevice(String deviceId, String channelId) { // List playLeys = redis.keys(String.format("%S_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX, List playLeys = redis.scan(String.format("%S_*_%s_%s", VideoManagerConstants.PLAYER_PREFIX, deviceId, - code)); + channelId)); if (playLeys == null || playLeys.size() == 0) return null; return (StreamInfo)redis.get(playLeys.get(0).toString()); } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index 18f1a74c..f9b6a35d 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -75,27 +75,19 @@ public class PlayController { PlayResult playResult = playService.play(deviceId, channelId, null, null); - // 超时处理 - playResult.getResult().onTimeout(()->{ - logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId)); - // 释放rtpserver - cmder.closeRTPServer(playResult.getDevice(), channelId); - RequestMessage msg = new RequestMessage(); - msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + playResult.getUuid()); - msg.setData("Timeout"); - resultHolder.invokeResult(msg); - }); + return playResult.getResult(); } @ApiOperation("停止点播") @ApiImplicitParams({ - @ApiImplicitParam(name = "streamId", value = "视频流ID", dataTypeClass = String.class), + @ApiImplicitParam(name = "deviceId", value = "设备ID", dataTypeClass = String.class), + @ApiImplicitParam(name = "channelId", value = "通道ID", dataTypeClass = String.class), }) - @GetMapping("/stop/{streamId}") - public DeferredResult> playStop(@PathVariable String streamId) { + @GetMapping("/stop/{deviceId}/{channelId}") + public DeferredResult> playStop(@PathVariable String deviceId, @PathVariable String channelId) { - logger.debug(String.format("设备预览/回放停止API调用,streamId:%s", streamId)); + logger.debug(String.format("设备预览/回放停止API调用,streamId:%s/$s", deviceId, channelId )); UUID uuid = UUID.randomUUID(); DeferredResult> result = new DeferredResult>(); @@ -103,8 +95,8 @@ public class PlayController { // 录像查询以channelId作为deviceId查询 resultHolder.put(DeferredResultHolder.CALLBACK_CMD_STOP + uuid, result); - cmder.streamByeCmd(streamId, event -> { - StreamInfo streamInfo = redisCatchStorage.queryPlayByStreamId(streamId); + cmder.streamByeCmd(deviceId, channelId, event -> { + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); if (streamInfo == null) { RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); @@ -121,9 +113,10 @@ public class PlayController { } }); - if (streamId != null) { + if (deviceId != null || channelId != null) { JSONObject json = new JSONObject(); - json.put("streamId", streamId); + json.put("deviceId", deviceId); + json.put("channelId", channelId); RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); msg.setData(json.toString()); @@ -138,7 +131,7 @@ public class PlayController { // 超时处理 result.onTimeout(()->{ - logger.warn(String.format("设备预览/回放停止超时,streamId:%s ", streamId)); + logger.warn(String.format("设备预览/回放停止超时,deviceId/channelId:%s/$s ", deviceId, channelId)); RequestMessage msg = new RequestMessage(); msg.setId(DeferredResultHolder.CALLBACK_CMD_STOP + uuid); msg.setData("Timeout"); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java index daa5557b..11c210ed 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java @@ -84,7 +84,7 @@ public class PlaybackController { StreamInfo streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId); if (streamInfo != null) { // 停止之前的回放 - cmder.streamByeCmd(streamInfo.getStreamId()); + cmder.streamByeCmd(deviceId, channelId); } resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result); cmder.playbackStreamCmd(device, channelId, startTime, endTime, (JSONObject response) -> { @@ -103,20 +103,22 @@ public class PlaybackController { @ApiOperation("停止视频回放") @ApiImplicitParams({ - @ApiImplicitParam(name = "ssrc", value = "视频流标识", dataTypeClass = String.class), + @ApiImplicitParam(name = "deviceId", value = "设备ID", dataTypeClass = String.class), + @ApiImplicitParam(name = "channelId", value = "通道ID", dataTypeClass = String.class), }) - @GetMapping("/stop/{ssrc}") - public ResponseEntity playStop(@PathVariable String ssrc) { + @GetMapping("/stop/{deviceId}/{channelId}") + public ResponseEntity playStop(@PathVariable String deviceId, @PathVariable String channelId) { - cmder.streamByeCmd(ssrc); + cmder.streamByeCmd(deviceId, channelId); if (logger.isDebugEnabled()) { - logger.debug(String.format("设备录像回放停止 API调用,ssrc:%s", ssrc)); + logger.debug(String.format("设备录像回放停止 API调用,deviceId/channelId:%s/%s", deviceId, channelId)); } - if (ssrc != null) { + if (deviceId != null && channelId != null) { JSONObject json = new JSONObject(); - json.put("ssrc", ssrc); + json.put("deviceId", deviceId); + json.put("channelId", channelId); return new ResponseEntity(json.toString(), HttpStatus.OK); } else { logger.warn("设备录像回放停止API调用失败!"); diff --git a/src/main/java/com/genersoft/iot/vmp/web/ApiStreamController.java b/src/main/java/com/genersoft/iot/vmp/web/ApiStreamController.java index bc505efe..932684a2 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/ApiStreamController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/ApiStreamController.java @@ -163,7 +163,7 @@ public class ApiStreamController { result.put("error","未找到流信息"); return result; } - cmder.streamByeCmd(streamInfo.getStreamId()); + cmder.streamByeCmd(serial, code); redisCatchStorage.stopPlay(streamInfo); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); return null; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index caf4dfcd..ef46c2ad 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,3 +1,3 @@ spring: profiles: - active: dev \ No newline at end of file + active: local \ No newline at end of file diff --git a/web_src/src/components/channelList.vue b/web_src/src/components/channelList.vue index 89fe0aef..7dbed081 100644 --- a/web_src/src/components/channelList.vue +++ b/web_src/src/components/channelList.vue @@ -216,12 +216,12 @@ export default { var that = this; this.$axios({ method: 'get', - url: '/api/play/stop/' + itemData.streamId + url: '/api/play/stop/' + this.deviceId + "/" + itemData.channelId }).then(function (res) { console.log(JSON.stringify(res)); that.initData(); }).catch(function (error) { - if (error.response.status == 402) { // 已经停止过 + if (error.response.status === 402) { // 已经停止过 that.initData(); }else { console.log(error) @@ -253,7 +253,7 @@ export default { this.$axios({ method: 'get', - url:`/api/device/query/sub_channels/${this.deviceId}/${this.parentChannelId}/channels`, + url:`/api/device/query/sub_channels/${this.deviceId}/${this.parentChannelId}/channels`, params: { page: that.currentPage, count: that.count, diff --git a/web_src/src/components/dialog/devicePlayer.vue b/web_src/src/components/dialog/devicePlayer.vue index f5c7d501..6d91da9a 100644 --- a/web_src/src/components/dialog/devicePlayer.vue +++ b/web_src/src/components/dialog/devicePlayer.vue @@ -415,7 +415,7 @@ export default { this.videoUrl = ''; this.$axios({ method: 'get', - url: '/api/playback/stop/' + this.streamId + url: '/api/playback/stop/' + this.deviceId + "/" + this.channelId }).then(function (res) { if (callback) callback() });