diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index 9f413779..6238b70c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; @@ -121,6 +122,26 @@ public interface ISIPCommander { void streamByeCmd(String deviceId, String channelId, SipSubscribe.Event okEvent); void streamByeCmd(String deviceId, String channelId); + /** + * 回放暂停 + */ + void playPauseCmd(Device device, StreamInfo streamInfo); + + /** + * 回放恢复 + */ + void playResumeCmd(Device device, StreamInfo streamInfo); + + /** + * 回放拖动播放 + */ + void playSeekCmd(Device device, StreamInfo streamInfo, long seekTime); + + /** + * 回放倍速播放 + */ + void playSpeedCmd(Device device, StreamInfo streamInfo, String speed); + /** * 语音广播 * diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java index bb62902a..98ea7c9c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd; import java.text.ParseException; import java.util.ArrayList; +import javax.sip.Dialog; import javax.sip.InvalidArgumentException; import javax.sip.PeerUnavailableException; import javax.sip.SipFactory; @@ -11,6 +12,9 @@ import javax.sip.address.SipURI; import javax.sip.header.*; import javax.sip.message.Request; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; +import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -30,6 +34,9 @@ public class SIPRequestHeaderProvider { @Autowired private SipFactory sipFactory; + + @Autowired + private VideoStreamSessionManager streamSession; public Request createMessageRequest(Device device, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException { Request request = null; @@ -210,4 +217,50 @@ public class SIPRequestHeaderProvider { request.setContent(content, contentTypeHeader); return request; } + + public Request createInfoRequest(Device device, StreamInfo streamInfo, String content) + throws PeerUnavailableException, ParseException, InvalidArgumentException { + Request request = null; + Dialog dialog = streamSession.getDialog(streamInfo.getDeviceID(), streamInfo.getChannelId()); + + SipURI requestLine = sipFactory.createAddressFactory().createSipURI(device.getDeviceId(), + device.getHostAddress()); + // via + ArrayList viaHeaders = new ArrayList(); + ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(device.getIp(), device.getPort(), + device.getTransport(), null); + viaHeader.setRPort(); + viaHeaders.add(viaHeader); + // from + SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), + sipConfig.getDomain()); + Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI); + FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, dialog.getLocalTag()); + // to + SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(streamInfo.getChannelId(), + sipConfig.getDomain()); + Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI); + ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, dialog.getRemoteTag()); + + // callid + CallIdHeader callIdHeader = dialog.getCallId(); + + // Forwards + MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); + + // ceq + CSeqHeader cSeqHeader = sipFactory.createHeaderFactory() + .createCSeqHeader(InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()), Request.INFO); + + request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INFO, callIdHeader, cSeqHeader, + fromHeader, toHeader, viaHeaders, maxForwards); + Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory() + .createSipURI(sipConfig.getId(), sipConfig.getIp() + ":" + sipConfig.getPort())); + request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); + + ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", + "MANSRTSP"); + request.setContent(content, contentTypeHeader); + return request; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 61647aa3..d90705c3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.Device; @@ -17,6 +18,7 @@ import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache; import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.SipStackImpl; import gov.nist.javax.sip.message.SIPRequest; @@ -1543,4 +1545,110 @@ public class SIPCommander implements ISIPCommander { clientTransaction.sendRequest(); return clientTransaction; } + + /** + * 回放暂停 + */ + @Override + public void playPauseCmd(Device device, StreamInfo streamInfo) { + try { + + StringBuffer content = new StringBuffer(200); + content.append("PAUSE RTSP/1.0\r\n"); + content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n"); + content.append("PauseTime: now\r\n"); + Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); + logger.info(request.toString()); + ClientTransaction clientTransaction = null; + if ("TCP".equals(device.getTransport())) { + clientTransaction = tcpSipProvider.getNewClientTransaction(request); + } else if ("UDP".equals(device.getTransport())) { + clientTransaction = udpSipProvider.getNewClientTransaction(request); + } + if (clientTransaction != null) { + clientTransaction.sendRequest(); + } + + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + } + } + + /** + * 回放恢复 + */ + @Override + public void playResumeCmd(Device device, StreamInfo streamInfo) { + try { + StringBuffer content = new StringBuffer(200); + content.append("PLAY RTSP/1.0\r\n"); + content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n"); + content.append("Range: npt=now-\r\n"); + Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); + logger.info(request.toString()); + ClientTransaction clientTransaction = null; + if ("TCP".equals(device.getTransport())) { + clientTransaction = tcpSipProvider.getNewClientTransaction(request); + } else if ("UDP".equals(device.getTransport())) { + clientTransaction = udpSipProvider.getNewClientTransaction(request); + } + + clientTransaction.sendRequest(); + + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + } + } + + /** + * 回放拖动播放 + */ + @Override + public void playSeekCmd(Device device, StreamInfo streamInfo, long seekTime) { + try { + StringBuffer content = new StringBuffer(200); + content.append("PLAY RTSP/1.0\r\n"); + content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n"); + content.append("Range: npt=" + seekTime + "-\r\n"); + Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); + logger.info(request.toString()); + ClientTransaction clientTransaction = null; + if ("TCP".equals(device.getTransport())) { + clientTransaction = tcpSipProvider.getNewClientTransaction(request); + } else if ("UDP".equals(device.getTransport())) { + clientTransaction = udpSipProvider.getNewClientTransaction(request); + } + + clientTransaction.sendRequest(); + + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + } + } + + /** + * 回放倍速播放 + */ + @Override + public void playSpeedCmd(Device device, StreamInfo streamInfo, String speed) { + try { + StringBuffer content = new StringBuffer(200); + content.append("PLAY RTSP/1.0\r\n"); + content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n"); + content.append("Scale: " + speed + ".000000\r\n"); + Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString()); + logger.info(request.toString()); + ClientTransaction clientTransaction = null; + if ("TCP".equals(device.getTransport())) { + clientTransaction = tcpSipProvider.getNewClientTransaction(request); + } else if ("UDP".equals(device.getTransport())) { + clientTransaction = udpSipProvider.getNewClientTransaction(request); + } + + clientTransaction.sendRequest(); + + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java index 98df8ddf..90ecfd42 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java @@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.service.IPlayService; +import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; @@ -152,4 +153,98 @@ public class PlaybackController { return new ResponseEntity(HttpStatus.INTERNAL_SERVER_ERROR); } } + + @ApiOperation("回放暂停") + @ApiImplicitParams({ + @ApiImplicitParam(name = "streamId", value = "回放流ID", dataTypeClass = String.class), + }) + @GetMapping("/pause/{streamId}") + public ResponseEntity playPause(@PathVariable String streamId) { + logger.info("playPause: "+streamId); + JSONObject json = new JSONObject(); + StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); + if (null == streamInfo) { + json.put("msg", "streamId不存在"); + logger.warn("streamId不存在!"); + return new ResponseEntity(json.toString(), HttpStatus.BAD_REQUEST); + } + setCseq(streamId); + Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); + cmder.playPauseCmd(device, streamInfo); + json.put("msg", "ok"); + return new ResponseEntity(json.toString(), HttpStatus.OK); + } + + @ApiOperation("回放恢复") + @ApiImplicitParams({ + @ApiImplicitParam(name = "streamId", value = "回放流ID", dataTypeClass = String.class), + }) + @GetMapping("/resume/{streamId}") + public ResponseEntity playResume(@PathVariable String streamId) { + logger.info("playResume: "+streamId); + JSONObject json = new JSONObject(); + StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); + if (null == streamInfo) { + json.put("msg", "streamId不存在"); + logger.warn("streamId不存在!"); + return new ResponseEntity(json.toString(), HttpStatus.BAD_REQUEST); + } + setCseq(streamId); + Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); + cmder.playResumeCmd(device, streamInfo); + json.put("msg", "ok"); + return new ResponseEntity(json.toString(), HttpStatus.OK); + } + + @ApiOperation("回放拖动播放") + @ApiImplicitParams({ + @ApiImplicitParam(name = "streamId", value = "回放流ID", dataTypeClass = String.class), + @ApiImplicitParam(name = "seekTime", value = "拖动偏移量,单位s", dataTypeClass = Long.class), + }) + @GetMapping("/seek/{streamId}/{seekTime}") + public ResponseEntity playSeek(@PathVariable String streamId, @PathVariable long seekTime) { + logger.info("playSeek: "+streamId+", "+seekTime); + JSONObject json = new JSONObject(); + StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); + if (null == streamInfo) { + json.put("msg", "streamId不存在"); + logger.warn("streamId不存在!"); + return new ResponseEntity(json.toString(), HttpStatus.BAD_REQUEST); + } + setCseq(streamId); + Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); + cmder.playSeekCmd(device, streamInfo, seekTime); + json.put("msg", "ok"); + return new ResponseEntity(json.toString(), HttpStatus.OK); + } + + @ApiOperation("回放倍速播放") + @ApiImplicitParams({ + @ApiImplicitParam(name = "streamId", value = "回放流ID", dataTypeClass = String.class), + @ApiImplicitParam(name = "speed", value = "倍速 1、2、4", dataTypeClass = String.class), + }) + @GetMapping("/speed/{streamId}/{speed}") + public ResponseEntity playSpeed(@PathVariable String streamId, @PathVariable String speed) { + logger.info("playSpeed: "+streamId+", "+speed); + JSONObject json = new JSONObject(); + StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId); + if (null == streamInfo) { + json.put("msg", "streamId不存在"); + logger.warn("streamId不存在!"); + return new ResponseEntity(json.toString(), HttpStatus.BAD_REQUEST); + } + setCseq(streamId); + Device device = storager.queryVideoDevice(streamInfo.getDeviceID()); + cmder.playSpeedCmd(device, streamInfo, speed); + json.put("msg", "ok"); + return new ResponseEntity(json.toString(), HttpStatus.OK); + } + + public void setCseq(String streamId) { + if (InfoCseqCache.CSEQCACHE.containsKey(streamId)) { + InfoCseqCache.CSEQCACHE.put(streamId, InfoCseqCache.CSEQCACHE.get(streamId) + 1); + } else { + InfoCseqCache.CSEQCACHE.put(streamId, 2L); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/session/InfoCseqCache.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/session/InfoCseqCache.java new file mode 100644 index 00000000..051f9817 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/session/InfoCseqCache.java @@ -0,0 +1,14 @@ +package com.genersoft.iot.vmp.vmanager.gb28181.session; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @ClassName: InfoCseqCache + * @Description: INFO类型的Sip中cseq的缓存 + */ +public class InfoCseqCache { + + public static Map CSEQCACHE = new ConcurrentHashMap<>(); + +} \ No newline at end of file