From 1b44ba33671e27c3d206d875306b226c770b7980 Mon Sep 17 00:00:00 2001 From: panlinlin <648540858@qq.com> Date: Fri, 15 Jan 2021 18:22:57 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90=E5=90=91=E4=B8=8A=E7=BA=A7?= =?UTF-8?q?=E8=81=94->=E7=82=B9=E6=92=AD--002?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/transmit/SIPProcessorFactory.java | 6 ++ .../request/impl/InviteRequestProcessor.java | 80 +++++++++++++++++-- .../vmp/storager/IVideoManagerStorager.java | 2 + .../impl/VideoManagerStoragerImpl.java | 5 ++ .../iot/vmp/vmanager/play/PlayController.java | 54 ++----------- .../vmp/vmanager/play/bean/PlayResult.java | 37 +++++++++ .../vmp/vmanager/service/IPlayService.java | 7 ++ .../service/impl/PlayServiceImpl.java | 75 +++++++++++++++++ 8 files changed, 210 insertions(+), 56 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/vmanager/play/bean/PlayResult.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java index 75db9107..f4e49281 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java @@ -15,6 +15,7 @@ import com.alibaba.fastjson.JSON; import com.genersoft.iot.vmp.gb28181.transmit.response.impl.*; import com.genersoft.iot.vmp.gb28181.transmit.response.impl.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.vmanager.service.IPlayService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -103,6 +104,9 @@ public class SIPProcessorFactory { @Autowired private OtherResponseProcessor otherResponseProcessor; + @Autowired + private IPlayService playService; + // 注:这里使用注解会导致循环依赖注入,暂用springBean private SipProvider tcpSipProvider; @@ -120,7 +124,9 @@ public class SIPProcessorFactory { processor.setTcpSipProvider(getTcpSipProvider()); processor.setUdpSipProvider(getUdpSipProvider()); + processor.setCmder(cmder); processor.setCmderFroPlatform(cmderFroPlatform); + processor.setPlayService(playService); processor.setStorager(storager); return processor; } else if (Request.REGISTER.equals(method)) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java index 4dea5d44..58e11ad6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java @@ -10,19 +10,26 @@ import javax.sip.header.SubjectHeader; import javax.sip.message.Request; import javax.sip.message.Response; +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.sdp.Codec; import com.genersoft.iot.vmp.gb28181.sdp.MediaDescription; import com.genersoft.iot.vmp.gb28181.sdp.SdpParser; import com.genersoft.iot.vmp.gb28181.sdp.SessionDescription; +import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; +import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult; +import com.genersoft.iot.vmp.vmanager.service.IPlayService; import gov.nist.javax.sip.address.AddressImpl; import gov.nist.javax.sip.address.SipUri; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import java.io.IOException; import java.text.ParseException; @@ -41,6 +48,10 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor { private IVideoManagerStorager storager; + private SIPCommander cmder; + + private IPlayService playService; + /** * 处理invite请求 * @@ -119,7 +130,30 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor { String ssrc = sdp.getSsrc(); + + Device device = storager.queryVideoDeviceByPlatformIdAndChannelId(platformId, channelId); + if (device == null) { + logger.warn("点播平台{}的通道{}时未找到设备信息", platformId, channel); + response500Ack(evt); + return; + } + // 通知下级推流, + PlayResult playResult = playService.play(device.getDeviceId(), channelId, (response)->{ + // 收到推流, 回复200OK + + },(event -> { + // 未知错误。直接转发设备点播的错误 + Response response = null; + try { + response = getMessageFactory().createResponse(event.getResponse().getStatusCode(), evt.getRequest()); + getServerTransaction(evt).sendResponse(response); + + } catch (ParseException | SipException | InvalidArgumentException e) { + e.printStackTrace(); + } + })); + playResult.getResult(); // 查找合适的端口推流, // 发送 200ok // 收到ack后调用推流接口 @@ -149,14 +183,16 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor { } /*** - * 回复404 + * 回复200 OK * @param evt * @throws SipException * @throws InvalidArgumentException * @throws ParseException */ - private void response404Ack(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException { - Response response = getMessageFactory().createResponse(Response.NOT_FOUND, evt.getRequest()); + private void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException { + Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest()); + ContentTypeHeader contentTypeHeader = getHeaderFactory().createContentTypeHeader("APPLICATION", "SDP"); + response.setContent(sdp, contentTypeHeader); getServerTransaction(evt).sendResponse(response); } @@ -172,6 +208,18 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor { getServerTransaction(evt).sendResponse(response); } + /*** + * 回复404 + * @param evt + * @throws SipException + * @throws InvalidArgumentException + * @throws ParseException + */ + private void response404Ack(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException { + Response response = getMessageFactory().createResponse(Response.NOT_FOUND, evt.getRequest()); + getServerTransaction(evt).sendResponse(response); + } + /*** * 回复488 * @param evt @@ -185,16 +233,14 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor { } /*** - * 回复200 OK + * 回复500 * @param evt * @throws SipException * @throws InvalidArgumentException * @throws ParseException */ - private void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException { - Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest()); - ContentTypeHeader contentTypeHeader = getHeaderFactory().createContentTypeHeader("APPLICATION", "SDP"); - response.setContent(sdp, contentTypeHeader); + private void response500Ack(RequestEvent evt) throws SipException, InvalidArgumentException, ParseException { + Response response = getMessageFactory().createResponse(Response.SERVER_INTERNAL_ERROR, evt.getRequest()); getServerTransaction(evt).sendResponse(response); } @@ -207,6 +253,8 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor { + + public SIPCommanderFroPlatform getCmderFroPlatform() { return cmderFroPlatform; } @@ -222,4 +270,20 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor { public void setStorager(IVideoManagerStorager storager) { this.storager = storager; } + + public SIPCommander getCmder() { + return cmder; + } + + public void setCmder(SIPCommander cmder) { + this.cmder = cmder; + } + + public IPlayService getPlayService() { + return playService; + } + + public void setPlayService(IPlayService playService) { + this.playService = playService; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java index 7963109c..7a2a76c2 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java @@ -234,4 +234,6 @@ public interface IVideoManagerStorager { DeviceChannel queryChannelInParentPlatform(String platformId, String channelId); + + Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java index aa83e3d1..dac0aa24 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java @@ -335,4 +335,9 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { DeviceChannel channel = patformChannelMapper.queryChannelInParentPlatform(platformId, channelId); return channel; } + + @Override + public Device queryVideoDeviceByPlatformIdAndChannelId(String platformId, String channelId) { + return null; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java index 3bd828a9..9b9a69ba 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java @@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult; import com.genersoft.iot.vmp.vmanager.service.IPlayService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,62 +65,19 @@ public class PlayController { @PathVariable String channelId) { - Device device = storager.queryVideoDevice(deviceId); - StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); - - UUID uuid = UUID.randomUUID(); - DeferredResult> result = new DeferredResult>(); - - // 录像查询以channelId作为deviceId查询 - resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result); - - if (streamInfo == null) { - // 发送点播消息 - cmder.playStreamCmd(device, channelId, (JSONObject response) -> { - logger.info("收到订阅消息: " + response.toJSONString()); - playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString()); - }, event -> { - RequestMessage msg = new RequestMessage(); - msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); - Response response = event.getResponse(); - msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase())); - resultHolder.invokeResult(msg); - }); - } else { - String streamId = streamInfo.getStreamId(); - JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(streamId); - if (rtpInfo.getBoolean("exist")) { - RequestMessage msg = new RequestMessage(); - msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); - msg.setData(JSON.toJSONString(streamInfo)); - resultHolder.invokeResult(msg); - } else { - redisCatchStorage.stopPlay(streamInfo); - storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); - cmder.playStreamCmd(device, channelId, (JSONObject response) -> { - logger.info("收到订阅消息: " + response.toJSONString()); - playService.onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString()); - }, event -> { - RequestMessage msg = new RequestMessage(); - msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); - Response response = event.getResponse(); - msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase())); - resultHolder.invokeResult(msg); - }); - } - } + PlayResult playResult = playService.play(deviceId, channelId, null, null); // 超时处理 - result.onTimeout(()->{ + playResult.getResult().onTimeout(()->{ logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId)); // 释放rtpserver - cmder.closeRTPServer(device, channelId); + cmder.closeRTPServer(playResult.getDevice(), channelId); RequestMessage msg = new RequestMessage(); - msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); + msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + playResult.getUuid()); msg.setData("Timeout"); resultHolder.invokeResult(msg); }); - return result; + return playResult.getResult(); } @PostMapping("/play/{streamId}/stop") diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/play/bean/PlayResult.java b/src/main/java/com/genersoft/iot/vmp/vmanager/play/bean/PlayResult.java new file mode 100644 index 00000000..3d21349e --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/play/bean/PlayResult.java @@ -0,0 +1,37 @@ +package com.genersoft.iot.vmp.vmanager.play.bean; + +import com.genersoft.iot.vmp.gb28181.bean.Device; +import org.springframework.http.ResponseEntity; +import org.springframework.web.context.request.async.DeferredResult; + +public class PlayResult { + + private DeferredResult> result; + private String uuid; + + private Device device; + + public DeferredResult> getResult() { + return result; + } + + public void setResult(DeferredResult> result) { + this.result = result; + } + + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } + + public Device getDevice() { + return device; + } + + public void setDevice(Device device) { + this.device = device; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/vmanager/service/IPlayService.java index a80ab5df..898c0148 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/service/IPlayService.java @@ -2,6 +2,11 @@ package com.genersoft.iot.vmp.vmanager.service; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; +import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult; +import org.springframework.http.ResponseEntity; +import org.springframework.web.context.request.async.DeferredResult; /** * 点播处理 @@ -10,4 +15,6 @@ public interface IPlayService { void onPublishHandlerForPlayBack(JSONObject resonse, String deviceId, String channelId, String uuid); void onPublishHandlerForPlay(JSONObject resonse, String deviceId, String channelId, String uuid); + + PlayResult play(String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent); } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java index 70ca1f31..e09541ad 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java @@ -4,19 +4,29 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaServerConfig; +import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.vmanager.play.PlayController; +import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult; import com.genersoft.iot.vmp.vmanager.service.IPlayService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; +import org.springframework.web.context.request.async.DeferredResult; +import javax.sip.message.Response; import java.text.DecimalFormat; +import java.util.UUID; @Service public class PlayServiceImpl implements IPlayService { @@ -26,12 +36,77 @@ public class PlayServiceImpl implements IPlayService { @Autowired private IVideoManagerStorager storager; + @Autowired + private SIPCommander cmder; + @Autowired private IRedisCatchStorage redisCatchStorage; @Autowired private DeferredResultHolder resultHolder; + @Autowired + private ZLMRESTfulUtils zlmresTfulUtils; + + + @Override + public PlayResult play(String deviceId, String channelId, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent) { + PlayResult playResult = new PlayResult(); + Device device = storager.queryVideoDevice(deviceId); + StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); + playResult.setDevice(device); + UUID uuid = UUID.randomUUID(); + playResult.setUuid(uuid.toString()); + DeferredResult> result = new DeferredResult>(); + playResult.setResult(result); + // 录像查询以channelId作为deviceId查询 + resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result); + + if (streamInfo == null) { + // 发送点播消息 + cmder.playStreamCmd(device, channelId, (JSONObject response) -> { + logger.info("收到订阅消息: " + response.toJSONString()); + onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString()); + if (hookEvent != null) { + hookEvent.response(response); + } + }, event -> { + RequestMessage msg = new RequestMessage(); + msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); + Response response = event.getResponse(); + msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase())); + resultHolder.invokeResult(msg); + if (errorEvent != null) { + errorEvent.response(event); + } + }); + } else { + String streamId = streamInfo.getStreamId(); + JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(streamId); + if (rtpInfo.getBoolean("exist")) { + RequestMessage msg = new RequestMessage(); + msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); + msg.setData(JSON.toJSONString(streamInfo)); + resultHolder.invokeResult(msg); + } else { + redisCatchStorage.stopPlay(streamInfo); + storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); + cmder.playStreamCmd(device, channelId, (JSONObject response) -> { + logger.info("收到订阅消息: " + response.toJSONString()); + onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString()); + }, event -> { + RequestMessage msg = new RequestMessage(); + msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid); + Response response = event.getResponse(); + msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase())); + resultHolder.invokeResult(msg); + }); + } + } + + return playResult; + } + @Override public void onPublishHandlerForPlay(JSONObject resonse, String deviceId, String channelId, String uuid) { RequestMessage msg = new RequestMessage();