Browse Source

fix #61 1、支持配置接入多台ZLM。2、播流会话数据存储到redis,防止wvp宕机后会话数据丢失。3、master合并到2.0分支。4、移除lombok。5、代码格式化。

pull/76/head
wangshaopeng@sunnybs.com 4 years ago
parent
commit
1fe9569af2
  1. 1
      .gitignore
  2. 5
      pom.xml
  3. 5
      src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java
  4. 133
      src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java
  5. 97
      src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java
  6. 535
      src/main/java/com/genersoft/iot/vmp/conf/MediaServerConfig.java
  7. 16
      src/main/java/com/genersoft/iot/vmp/conf/SsrcConfig.java
  8. 14
      src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java
  9. 50
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java
  10. 6
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java
  11. 11
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
  12. 26
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
  13. 22
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
  14. 11
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java
  15. 9
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java
  16. 41
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java
  17. 43
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java
  18. 12
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
  19. 9
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
  20. 17
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java
  21. 27
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
  22. 17
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
  23. 9
      src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
  24. 59
      src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java
  25. 33
      src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
  26. 33
      src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java
  27. 20
      src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java
  28. 76
      src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java
  29. 37
      src/main/java/com/genersoft/iot/vmp/vmanager/play/bean/PlayResult.java
  30. 7
      src/main/java/com/genersoft/iot/vmp/vmanager/service/IPlayService.java
  31. 108
      src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java
  32. 6
      src/main/resources/application-dev.yml
  33. 2
      src/main/resources/application.yml

1
.gitignore

@ -26,5 +26,6 @@ hs_err_pid*
/.idea/*
/target/*
/.idea/
/logs/
/target/

5
pom.xml

@ -175,11 +175,6 @@
<artifactId>guava</artifactId>
<version>${guava-version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok-version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>

5
src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java

@ -5,17 +5,18 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class VManageBootstrap extends LogManager {
public class VManageBootstrap {
private static String[] args;
private static ConfigurableApplicationContext context;
public static void main(String[] args) {
VManageBootstrap.args = args;
VManageBootstrap.context = SpringApplication.run(VManageBootstrap.class, args);
}
// 项目重启
public static void restart() {
context.close();
VManageBootstrap.context = SpringApplication.run(VManageBootstrap.class, args);
}
}

133
src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java

@ -1,9 +1,10 @@
package com.genersoft.iot.vmp.common;
import com.alibaba.fastjson.JSONArray;
import lombok.Data;
@Data
/**
* @author skywsp
*/
public class StreamInfo {
/**
@ -25,4 +26,132 @@ public class StreamInfo {
private String rtmp;
private String rtsp;
private JSONArray tracks;
public String getMediaServerIp() {
return mediaServerIp;
}
public void setMediaServerIp(String mediaServerIp) {
this.mediaServerIp = mediaServerIp;
}
public String getSsrc() {
return ssrc;
}
public void setSsrc(String ssrc) {
this.ssrc = ssrc;
}
public String getDeviceID() {
return deviceID;
}
public void setDeviceID(String deviceID) {
this.deviceID = deviceID;
}
public String getChannelId() {
return channelId;
}
public void setChannelId(String channelId) {
this.channelId = channelId;
}
public String getFlv() {
return flv;
}
public void setFlv(String flv) {
this.flv = flv;
}
public String getWs_flv() {
return ws_flv;
}
public void setWs_flv(String ws_flv) {
this.ws_flv = ws_flv;
}
public String getRtmp() {
return rtmp;
}
public void setRtmp(String rtmp) {
this.rtmp = rtmp;
}
public String getHls() {
return hls;
}
public void setHls(String hls) {
this.hls = hls;
}
public String getRtsp() {
return rtsp;
}
public void setRtsp(String rtsp) {
this.rtsp = rtsp;
}
public JSONArray getTracks() {
return tracks;
}
public void setTracks(JSONArray tracks) {
this.tracks = tracks;
}
public String getFmp4() {
return fmp4;
}
public void setFmp4(String fmp4) {
this.fmp4 = fmp4;
}
public String getWs_fmp4() {
return ws_fmp4;
}
public void setWs_fmp4(String ws_fmp4) {
this.ws_fmp4 = ws_fmp4;
}
public String getWs_hls() {
return ws_hls;
}
public void setWs_hls(String ws_hls) {
this.ws_hls = ws_hls;
}
public String getTs() {
return ts;
}
public void setTs(String ts) {
this.ts = ts;
}
public String getWs_ts() {
return ws_ts;
}
public void setWs_ts(String ws_ts) {
this.ws_ts = ws_ts;
}
public String getStreamId() {
return streamId;
}
public void setStreamId(String streamId) {
this.streamId = streamId;
}
}

97
src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java

@ -1,6 +1,5 @@
package com.genersoft.iot.vmp.conf;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@ -11,7 +10,6 @@ import java.util.HashMap;
* 对配置文件进行校验
*/
@Configuration("mediaConfig")
@Data
public class MediaConfig {
@Value("${media.ip}")
private String mediaIp;
@ -50,4 +48,99 @@ public class MediaConfig {
*/
private HashMap<String, SsrcConfig> mediaServerSsrcMap;
public String getMediaIp() {
return mediaIp;
}
public void setMediaIp(String mediaIp) {
this.mediaIp = mediaIp;
}
public String[] getMediaIpArr() {
return mediaIpArr;
}
public void setMediaIpArr(String[] mediaIpArr) {
this.mediaIpArr = mediaIpArr;
}
public String getMediaHookIp() {
return mediaHookIp;
}
public void setMediaHookIp(String mediaHookIp) {
this.mediaHookIp = mediaHookIp;
}
public Integer getMediaPort() {
return mediaPort;
}
public void setMediaPort(Integer mediaPort) {
this.mediaPort = mediaPort;
}
public Boolean getAutoConfig() {
return autoConfig;
}
public void setAutoConfig(Boolean autoConfig) {
this.autoConfig = autoConfig;
}
public String getMediaSecret() {
return mediaSecret;
}
public void setMediaSecret(String mediaSecret) {
this.mediaSecret = mediaSecret;
}
public String getStreamNoneReaderDelayMS() {
return streamNoneReaderDelayMS;
}
public void setStreamNoneReaderDelayMS(String streamNoneReaderDelayMS) {
this.streamNoneReaderDelayMS = streamNoneReaderDelayMS;
}
public Boolean getAutoApplyPlay() {
return autoApplyPlay;
}
public void setAutoApplyPlay(Boolean autoApplyPlay) {
this.autoApplyPlay = autoApplyPlay;
}
public Boolean getSeniorSdp() {
return seniorSdp;
}
public void setSeniorSdp(Boolean seniorSdp) {
this.seniorSdp = seniorSdp;
}
public Boolean getRtpEnable() {
return rtpEnable;
}
public void setRtpEnable(Boolean rtpEnable) {
this.rtpEnable = rtpEnable;
}
public String getUdpPortRange() {
return udpPortRange;
}
public void setUdpPortRange(String udpPortRange) {
this.udpPortRange = udpPortRange;
}
public HashMap<String, SsrcConfig> getMediaServerSsrcMap() {
return mediaServerSsrcMap;
}
public void setMediaServerSsrcMap(HashMap<String, SsrcConfig> mediaServerSsrcMap) {
this.mediaServerSsrcMap = mediaServerSsrcMap;
}
}

535
src/main/java/com/genersoft/iot/vmp/conf/MediaServerConfig.java

@ -1,9 +1,7 @@
package com.genersoft.iot.vmp.conf;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
@Data
public class MediaServerConfig {
@JSONField(name = "api.apiDebug")
@ -33,6 +31,10 @@ public class MediaServerConfig {
@JSONField(name = "general.streamNoneReaderDelayMS")
private String generalStreamNoneReaderDelayMS;
private String localIP;
private String wanIp;
@JSONField(name = "hls.fileBufSize")
private String hlsFileBufSize;
@ -197,4 +199,533 @@ public class MediaServerConfig {
@JSONField(name = "shell.shell")
private String shellPhell;
public String getApiDebug() {
return apiDebug;
}
public void setApiDebug(String apiDebug) {
this.apiDebug = apiDebug;
}
public String getApiSecret() {
return apiSecret;
}
public void setApiSecret(String apiSecret) {
this.apiSecret = apiSecret;
}
public String getFfmpegBin() {
return ffmpegBin;
}
public void setFfmpegBin(String ffmpegBin) {
this.ffmpegBin = ffmpegBin;
}
public String getFfmpegCmd() {
return ffmpegCmd;
}
public void setFfmpegCmd(String ffmpegCmd) {
this.ffmpegCmd = ffmpegCmd;
}
public String getFfmpegLog() {
return ffmpegLog;
}
public void setFfmpegLog(String ffmpegLog) {
this.ffmpegLog = ffmpegLog;
}
public String getGeneralEnableVhost() {
return generalEnableVhost;
}
public void setGeneralEnableVhost(String generalEnableVhost) {
this.generalEnableVhost = generalEnableVhost;
}
public String getGeneralFlowThreshold() {
return generalFlowThreshold;
}
public void setGeneralFlowThreshold(String generalFlowThreshold) {
this.generalFlowThreshold = generalFlowThreshold;
}
public String getGeneralMaxStreamWaitMS() {
return generalMaxStreamWaitMS;
}
public void setGeneralMaxStreamWaitMS(String generalMaxStreamWaitMS) {
this.generalMaxStreamWaitMS = generalMaxStreamWaitMS;
}
public String getGeneralStreamNoneReaderDelayMS() {
return generalStreamNoneReaderDelayMS;
}
public void setGeneralStreamNoneReaderDelayMS(String generalStreamNoneReaderDelayMS) {
this.generalStreamNoneReaderDelayMS = generalStreamNoneReaderDelayMS;
}
public String getLocalIP() {
return localIP;
}
public void setLocalIP(String localIP) {
this.localIP = localIP;
}
public String getHlsFileBufSize() {
return hlsFileBufSize;
}
public void setHlsFileBufSize(String hlsFileBufSize) {
this.hlsFileBufSize = hlsFileBufSize;
}
public String getHlsFilePath() {
return hlsFilePath;
}
public void setHlsFilePath(String hlsFilePath) {
this.hlsFilePath = hlsFilePath;
}
public String getHlsSegDur() {
return hlsSegDur;
}
public void setHlsSegDur(String hlsSegDur) {
this.hlsSegDur = hlsSegDur;
}
public String getHlsSegNum() {
return hlsSegNum;
}
public void setHlsSegNum(String hlsSegNum) {
this.hlsSegNum = hlsSegNum;
}
public String getHookAccessFileExceptHLS() {
return hookAccessFileExceptHLS;
}
public void setHookAccessFileExceptHLS(String hookAccessFileExceptHLS) {
this.hookAccessFileExceptHLS = hookAccessFileExceptHLS;
}
public String getHookAdminParams() {
return hookAdminParams;
}
public void setHookAdminParams(String hookAdminParams) {
this.hookAdminParams = hookAdminParams;
}
public String getHookEnable() {
return hookEnable;
}
public void setHookEnable(String hookEnable) {
this.hookEnable = hookEnable;
}
public String getHookOnFlowReport() {
return hookOnFlowReport;
}
public void setHookOnFlowReport(String hookOnFlowReport) {
this.hookOnFlowReport = hookOnFlowReport;
}
public String getHookOnHttpAccess() {
return hookOnHttpAccess;
}
public void setHookOnHttpAccess(String hookOnHttpAccess) {
this.hookOnHttpAccess = hookOnHttpAccess;
}
public String getHookOnPlay() {
return hookOnPlay;
}
public void setHookOnPlay(String hookOnPlay) {
this.hookOnPlay = hookOnPlay;
}
public String getHookOnPublish() {
return hookOnPublish;
}
public void setHookOnPublish(String hookOnPublish) {
this.hookOnPublish = hookOnPublish;
}
public String getHookOnRecordMp4() {
return hookOnRecordMp4;
}
public void setHookOnRecordMp4(String hookOnRecordMp4) {
this.hookOnRecordMp4 = hookOnRecordMp4;
}
public String getHookOnRtspAuth() {
return hookOnRtspAuth;
}
public void setHookOnRtspAuth(String hookOnRtspAuth) {
this.hookOnRtspAuth = hookOnRtspAuth;
}
public String getHookOnRtspRealm() {
return hookOnRtspRealm;
}
public void setHookOnRtspRealm(String hookOnRtspRealm) {
this.hookOnRtspRealm = hookOnRtspRealm;
}
public String getHookOnShellLogin() {
return hookOnShellLogin;
}
public void setHookOnShellLogin(String hookOnShellLogin) {
this.hookOnShellLogin = hookOnShellLogin;
}
public String getHookOnStreamChanged() {
return hookOnStreamChanged;
}
public void setHookOnStreamChanged(String hookOnStreamChanged) {
this.hookOnStreamChanged = hookOnStreamChanged;
}
public String getHookOnStreamNoneReader() {
return hookOnStreamNoneReader;
}
public void setHookOnStreamNoneReader(String hookOnStreamNoneReader) {
this.hookOnStreamNoneReader = hookOnStreamNoneReader;
}
public String getHookOnStreamNotFound() {
return hookOnStreamNotFound;
}
public void setHookOnStreamNotFound(String hookOnStreamNotFound) {
this.hookOnStreamNotFound = hookOnStreamNotFound;
}
public String getHookTimeoutSec() {
return hookTimeoutSec;
}
public void setHookTimeoutSec(String hookTimeoutSec) {
this.hookTimeoutSec = hookTimeoutSec;
}
public String getHttpCharSet() {
return httpCharSet;
}
public void setHttpCharSet(String httpCharSet) {
this.httpCharSet = httpCharSet;
}
public String getHttpKeepAliveSecond() {
return httpKeepAliveSecond;
}
public void setHttpKeepAliveSecond(String httpKeepAliveSecond) {
this.httpKeepAliveSecond = httpKeepAliveSecond;
}
public String getHttpMaxReqCount() {
return httpMaxReqCount;
}
public void setHttpMaxReqCount(String httpMaxReqCount) {
this.httpMaxReqCount = httpMaxReqCount;
}
public String getHttpMaxReqSize() {
return httpMaxReqSize;
}
public void setHttpMaxReqSize(String httpMaxReqSize) {
this.httpMaxReqSize = httpMaxReqSize;
}
public String getHttpNotFound() {
return httpNotFound;
}
public void setHttpNotFound(String httpNotFound) {
this.httpNotFound = httpNotFound;
}
public String getHttpPort() {
return httpPort;
}
public void setHttpPort(String httpPort) {
this.httpPort = httpPort;
}
public String getHttpRootPath() {
return httpRootPath;
}
public void setHttpRootPath(String httpRootPath) {
this.httpRootPath = httpRootPath;
}
public String getHttpSendBufSize() {
return httpSendBufSize;
}
public void setHttpSendBufSize(String httpSendBufSize) {
this.httpSendBufSize = httpSendBufSize;
}
public String getHttpSSLport() {
return httpSSLport;
}
public void setHttpSSLport(String httpSSLport) {
this.httpSSLport = httpSSLport;
}
public String getMulticastAddrMax() {
return multicastAddrMax;
}
public void setMulticastAddrMax(String multicastAddrMax) {
this.multicastAddrMax = multicastAddrMax;
}
public String getMulticastAddrMin() {
return multicastAddrMin;
}
public void setMulticastAddrMin(String multicastAddrMin) {
this.multicastAddrMin = multicastAddrMin;
}
public String getMulticastUdpTTL() {
return multicastUdpTTL;
}
public void setMulticastUdpTTL(String multicastUdpTTL) {
this.multicastUdpTTL = multicastUdpTTL;
}
public String getRecordAppName() {
return recordAppName;
}
public void setRecordAppName(String recordAppName) {
this.recordAppName = recordAppName;
}
public String getRecordFilePath() {
return recordFilePath;
}
public void setRecordFilePath(String recordFilePath) {
this.recordFilePath = recordFilePath;
}
public String getRecordFileSecond() {
return recordFileSecond;
}
public void setRecordFileSecond(String recordFileSecond) {
this.recordFileSecond = recordFileSecond;
}
public String getRecordFileSampleMS() {
return recordFileSampleMS;
}
public void setRecordFileSampleMS(String recordFileSampleMS) {
this.recordFileSampleMS = recordFileSampleMS;
}
public String getRtmpHandshakeSecond() {
return rtmpHandshakeSecond;
}
public void setRtmpHandshakeSecond(String rtmpHandshakeSecond) {
this.rtmpHandshakeSecond = rtmpHandshakeSecond;
}
public String getRtmpKeepAliveSecond() {
return rtmpKeepAliveSecond;
}
public void setRtmpKeepAliveSecond(String rtmpKeepAliveSecond) {
this.rtmpKeepAliveSecond = rtmpKeepAliveSecond;
}
public String getRtmpModifyStamp() {
return rtmpModifyStamp;
}
public void setRtmpModifyStamp(String rtmpModifyStamp) {
this.rtmpModifyStamp = rtmpModifyStamp;
}
public String getRtmpPort() {
return rtmpPort;
}
public void setRtmpPort(String rtmpPort) {
this.rtmpPort = rtmpPort;
}
public String getRtpAudioMtuSize() {
return rtpAudioMtuSize;
}
public void setRtpAudioMtuSize(String rtpAudioMtuSize) {
this.rtpAudioMtuSize = rtpAudioMtuSize;
}
public String getRtpClearCount() {
return rtpClearCount;
}
public void setRtpClearCount(String rtpClearCount) {
this.rtpClearCount = rtpClearCount;
}
public String getRtpCycleMS() {
return rtpCycleMS;
}
public void setRtpCycleMS(String rtpCycleMS) {
this.rtpCycleMS = rtpCycleMS;
}
public String getRtpMaxRtpCount() {
return rtpMaxRtpCount;
}
public void setRtpMaxRtpCount(String rtpMaxRtpCount) {
this.rtpMaxRtpCount = rtpMaxRtpCount;
}
public String getRtpVideoMtuSize() {
return rtpVideoMtuSize;
}
public void setRtpVideoMtuSize(String rtpVideoMtuSize) {
this.rtpVideoMtuSize = rtpVideoMtuSize;
}
public String getRtpProxyCheckSource() {
return rtpProxyCheckSource;
}
public void setRtpProxyCheckSource(String rtpProxyCheckSource) {
this.rtpProxyCheckSource = rtpProxyCheckSource;
}
public String getRtpProxyDumpDir() {
return rtpProxyDumpDir;
}
public void setRtpProxyDumpDir(String rtpProxyDumpDir) {
this.rtpProxyDumpDir = rtpProxyDumpDir;
}
public String getRtpProxyPort() {
return rtpProxyPort;
}
public void setRtpProxyPort(String rtpProxyPort) {
this.rtpProxyPort = rtpProxyPort;
}
public String getRtpProxyTimeoutSec() {
return rtpProxyTimeoutSec;
}
public void setRtpProxyTimeoutSec(String rtpProxyTimeoutSec) {
this.rtpProxyTimeoutSec = rtpProxyTimeoutSec;
}
public String getRtspAuthBasic() {
return rtspAuthBasic;
}
public void setRtspAuthBasic(String rtspAuthBasic) {
this.rtspAuthBasic = rtspAuthBasic;
}
public String getRtspHandshakeSecond() {
return rtspHandshakeSecond;
}
public void setRtspHandshakeSecond(String rtspHandshakeSecond) {
this.rtspHandshakeSecond = rtspHandshakeSecond;
}
public String getRtspKeepAliveSecond() {
return rtspKeepAliveSecond;
}
public void setRtspKeepAliveSecond(String rtspKeepAliveSecond) {
this.rtspKeepAliveSecond = rtspKeepAliveSecond;
}
public String getRtspPort() {
return rtspPort;
}
public void setRtspPort(String rtspPort) {
this.rtspPort = rtspPort;
}
public String getRtspSSlport() {
return rtspSSlport;
}
public void setRtspSSlport(String rtspSSlport) {
this.rtspSSlport = rtspSSlport;
}
public String getShellMaxReqSize() {
return shellMaxReqSize;
}
public void setShellMaxReqSize(String shellMaxReqSize) {
this.shellMaxReqSize = shellMaxReqSize;
}
public String getShellPhell() {
return shellPhell;
}
public void setShellPhell(String shellPhell) {
this.shellPhell = shellPhell;
}
public String getWanIp() {
return wanIp;
}
public void setWanIp(String wanIp) {
this.wanIp = wanIp;
}
}

16
src/main/java/com/genersoft/iot/vmp/conf/SsrcConfig.java

@ -1,7 +1,6 @@
package com.genersoft.iot.vmp.conf;
import com.genersoft.iot.vmp.utils.ConfigConst;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
@ -10,12 +9,11 @@ import java.util.Set;
/**
* 每一个zlm流媒体服务器都设置MAX_STRTEAM_COUNT个可用同步信源(SSRC)
*/
@Data
public class SsrcConfig {
/**
* zlm流媒体服务器IP
*/
String mediaServerIp;
private String mediaServerIp;
/**
* zlm流媒体服务器已用会话句柄
*/
@ -25,6 +23,18 @@ public class SsrcConfig {
*/
private List<String> notUsed;
public String getMediaServerIp() {
return mediaServerIp;
}
public List<String> getIsUsed() {
return isUsed;
}
public List<String> getNotUsed() {
return notUsed;
}
public void init(String mediaServerIp, Set<String> usedSet) {
this.mediaServerIp = mediaServerIp;
this.isUsed = new ArrayList<>();

14
src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java

@ -9,8 +9,9 @@ import com.genersoft.iot.vmp.conf.SsrcConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.redis.JedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -27,7 +28,6 @@ import java.util.concurrent.ConcurrentHashMap;
* @author: swwheihei
* @date: 2020年5月13日 下午4:03:02
*/
@Slf4j
@Component
public class VideoStreamSessionManager {
/**
@ -37,6 +37,8 @@ public class VideoStreamSessionManager {
private ConcurrentHashMap<String, ClientTransaction> sessionMap = new ConcurrentHashMap<>();
private String ssrcPrefix;
private final Logger logger = LoggerFactory.getLogger(VideoStreamSessionManager.class);
@Autowired
private SipConfig sipConfig;
@Autowired
@ -190,7 +192,7 @@ public class VideoStreamSessionManager {
public StreamInfo getPlayStreamInfo(String channelId) {
if (StringUtils.isBlank(channelId)) {
log.error("getPlayStreamInfo channelId can not be null!!!");
logger.error("getPlayStreamInfo channelId can not be null!!!");
return null;
}
return redisCatchStorage.queryPlayByChannel(channelId);
@ -198,7 +200,7 @@ public class VideoStreamSessionManager {
public StreamInfo getPlayBackStreamInfo(String channelId) {
if (StringUtils.isBlank(channelId)) {
log.error("getPlayBackStreamInfo channelId can not be null!!!");
logger.error("getPlayBackStreamInfo channelId can not be null!!!");
return null;
}
return redisCatchStorage.queryPlaybackByChannel(channelId);
@ -206,7 +208,7 @@ public class VideoStreamSessionManager {
public StreamInfo getStreamInfo(String channelId, String streamId) {
if (StringUtils.isBlank(channelId) || StringUtils.isBlank(streamId)) {
log.error("getStreamInfo channelId and streamId can not be null!!!");
logger.error("getStreamInfo channelId and streamId can not be null!!!");
return null;
}
StreamInfo streamInfo = getStreamInfo(channelId, streamId, PlayTypeEnum.PLAY);
@ -218,7 +220,7 @@ public class VideoStreamSessionManager {
private StreamInfo getStreamInfo(String channelId, String streamId, PlayTypeEnum playType) {
if (StringUtils.isBlank(channelId) || StringUtils.isBlank(streamId)) {
log.error("getStreamInfo channelId and streamId can not be null!!!");
logger.error("getStreamInfo channelId and streamId can not be null!!!");
return null;
}
// TODO channelId

50
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorFactory.java

@ -1,47 +1,35 @@
package com.genersoft.iot.vmp.gb28181.transmit;
import javax.sip.RequestEvent;
import javax.sip.ResponseEvent;
import javax.sip.SipProvider;
import javax.sip.header.CSeqHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.gb28181.transmit.response.impl.*;
import com.genersoft.iot.vmp.vmanager.service.IPlayService;
// import org.slf4j.Logger;
// import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.auth.RegisterLogicHandler;
import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.request.ISIPRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.request.impl.AckRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.request.impl.ByeRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.request.impl.CancelRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.request.impl.InviteRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.request.impl.MessageRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.request.impl.NotifyRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.request.impl.OtherRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.request.impl.RegisterRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.request.impl.SubscribeRequestProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.request.impl.*;
import com.genersoft.iot.vmp.gb28181.transmit.response.ISIPResponseProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.response.impl.ByeResponseProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.response.impl.CancelResponseProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.response.impl.InviteResponseProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.response.impl.OtherResponseProcessor;
import com.genersoft.iot.vmp.gb28181.transmit.response.impl.*;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.utils.SpringBeanFactory;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.service.IPlayService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
import javax.sip.ResponseEvent;
import javax.sip.SipProvider;
import javax.sip.header.CSeqHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
// import org.slf4j.Logger;
// import org.slf4j.LoggerFactory;
/**
* @Description: SIP信令处理分配

6
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java

@ -1,13 +1,13 @@
package com.genersoft.iot.vmp.gb28181.transmit.callback;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @Description: 异步请求处理
* @author: swwheihei

11
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java

@ -19,7 +19,6 @@ public interface ISIPCommander {
* @param channelId 预览通道
* @param leftRight 镜头左移右移 0:停止 1:左移 2:右移
* @param upDown 镜头上移下移 0:停止 1:上移 2:下移
* @param moveSpeed 镜头移动速度
*/
boolean ptzdirectCmd(Device device, String channelId, int leftRight, int upDown);
@ -49,7 +48,7 @@ public interface ISIPCommander {
* @param device 控制设备
* @param channelId 预览通道
* @param inOut 镜头放大缩小 0:停止 1:缩小 2:放大
* @param zoomSpeed 镜头缩放速度
* @param moveSpeed 镜头缩放速度
*/
boolean ptzZoomCmd(Device device, String channelId, int inOut, int moveSpeed);
@ -80,6 +79,7 @@ public interface ISIPCommander {
/**
* 前端控制指令用于转发上级指令
*
* @param device 控制设备
* @param channelId 预览通道
* @param cmdString 前端控制指令串
@ -126,6 +126,7 @@ public interface ISIPCommander {
* @param device 视频设备
*/
void audioBroadcastCmd(Device device, SipSubscribe.Event okEvent);
boolean audioBroadcastCmd(Device device);
/**
@ -148,7 +149,7 @@ public interface ISIPCommander {
* 报警布防/撤防命令
*
* @param device 视频设备
* @param setGuard true: SetGuard, false: ResetGuard
* @param guardCmdStr true: SetGuard, false: ResetGuard
*/
boolean guardCmd(Device device, String guardCmdStr, SipSubscribe.Event errorEvent);
@ -279,11 +280,12 @@ public interface ISIPCommander {
/**
* 订阅取消订阅报警信息
*
* @param device 视频设备
* @param expires 订阅过期时间0 = 取消订阅
* @param startPriority 报警起始级别可选
* @param endPriority 报警终止级别可选
* @param alarmMethods 报警方式条件可选
* @param alarmMethod 报警方式条件可选
* @param alarmType 报警类型
* @param startTime 报警发生起始时间可选
* @param endTime 报警发生终止时间可选
@ -294,6 +296,7 @@ public interface ISIPCommander {
/**
* 释放rtpserver
*
* @param device
* @param channelId
*/

26
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java

@ -1,23 +1,19 @@
package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import java.text.ParseException;
import java.util.ArrayList;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.PeerUnavailableException;
import javax.sip.SipFactory;
// import javax.sip.SipProvider;
import javax.sip.address.Address;
import javax.sip.address.SipURI;
import javax.sip.header.*;
import javax.sip.message.Request;
import org.springframework.beans.factory.annotation.Autowired;
// import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import java.text.ParseException;
import java.util.ArrayList;
/**
* @Description:摄像头命令request创造器 TODO 冗余代码太多待优化
@ -136,7 +132,15 @@ public class SIPRequestHeaderProvider {
return request;
}
public Request createSubscribeRequest(Device device, String content, String viaTag, String fromTag, String toTag, Integer expires, String event, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
public Request createSubscribeRequest(Device device,
String content,
String viaTag,
String fromTag,
String toTag,
Integer expires,
String event,
CallIdHeader callIdHeader
) throws ParseException, InvalidArgumentException, PeerUnavailableException {
Request request = null;
// sipuri
SipURI requestURI = sipFactory.createAddressFactory().createSipURI(device.getDeviceId(), device.getHostAddress());

22
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

@ -17,13 +17,10 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
// import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@ -290,6 +287,7 @@ public class SIPCommander implements ISIPCommander {
/**
* 前端控制指令用于转发上级指令
*
* @param device 控制设备
* @param channelId 预览通道
* @param cmdString 前端控制指令串
@ -635,6 +633,7 @@ public class SIPCommander implements ISIPCommander {
}
return false;
}
@Override
public void audioBroadcastCmd(Device device, SipSubscribe.Event errorEvent) {
try {
@ -1107,7 +1106,7 @@ public class SIPCommander implements ISIPCommander {
* @param device 视频设备
* @param startPriority 报警起始级别可选
* @param endPriority 报警终止级别可选
* @param alarmMethods 报警方式条件可选
* @param alarmMethod 报警方式条件可选
* @param alarmType 报警类型
* @param startTime 报警发生起始时间可选
* @param endTime 报警发生终止时间可选
@ -1269,6 +1268,7 @@ public class SIPCommander implements ISIPCommander {
* @param interval 上报时间间隔
* @return true = 命令发送成功
*/
@Override
public boolean mobilePositionSubscribe(Device device, int expires, int interval) {
try {
StringBuffer subscribePostitionXml = new StringBuffer(200);
@ -1340,7 +1340,19 @@ public class SIPCommander implements ISIPCommander {
cmdXml.append("</Query>\r\n");
String tm = Long.toString(System.currentTimeMillis());
Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "viaTagPos" + tm, "fromTagPos" + tm, null, expires, "presence" );
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
Request request = headerProvider.createSubscribeRequest(
device,
cmdXml.toString(),
"viaTagPos" + tm,
"fromTagPos" + tm,
null,
expires,
"presence",
callIdHeader);
transmitRequest(device, request);
return true;

11
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java

@ -11,9 +11,11 @@ import javax.sip.header.ToHeader;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.springframework.beans.factory.annotation.Autowired;
/**
* @Description:ACK请求处理器
@ -26,6 +28,9 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor {
private ZLMRTPServerFactory zlmrtpServerFactory;
@Autowired
private VideoStreamSessionManager streamSession;
/**
* 处理 ACK请求
*
@ -35,7 +40,9 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor {
public void process(RequestEvent evt) {
//Request request = evt.getRequest();
Dialog dialog = evt.getDialog();
if (dialog == null) return;
if (dialog == null) {
return;
}
//DialogState state = dialog.getState();
if (/*request.getMecodewwthod().equals(Request.INVITE) &&*/ dialog.getState()== DialogState.CONFIRMED) {
String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
@ -43,7 +50,7 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor {
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
String deviceId = sendRtpItem.getDeviceId();
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
StreamInfo streamInfo = redisCatchStorage.queryPlayByChannel(channelId);
sendRtpItem.setStreamId(streamInfo.getStreamId());
redisCatchStorage.updateSendRTPSever(sendRtpItem);
System.out.println(platformGbId);

9
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java

@ -11,11 +11,14 @@ import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import javax.sip.message.Response;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.springframework.beans.factory.annotation.Autowired;
import java.text.ParseException;
import java.util.HashMap;
@ -34,6 +37,9 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
private ZLMRTPServerFactory zlmrtpServerFactory;
@Autowired
private VideoStreamSessionManager streamSession;
/**
* 处理BYE请求
* @param evt
@ -54,11 +60,12 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor {
param.put("app","rtp");
param.put("stream",streamId);
System.out.println("停止向上级推流:" + streamId);
StreamInfo streamInfo = streamSession.getStreamInfo(channelId, streamId);
zlmrtpServerFactory.stopSendRtpStream(param);
redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
if (zlmrtpServerFactory.totalReaderCount(streamId) == 0) {
System.out.println(streamId + "无其它观看者,通知设备停止推流");
cmder.streamByeCmd(streamId);
cmder.stopStreamByeCmd(streamInfo, null);
}
}
} catch (SipException e) {

41
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/InviteRequestProcessor.java

@ -1,16 +1,5 @@
package com.genersoft.iot.vmp.gb28181.transmit.request.impl;
import javax.sdp.*;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.SipFactory;
import javax.sip.address.Address;
import javax.sip.address.SipURI;
import javax.sip.header.*;
import javax.sip.message.Request;
import javax.sip.message.Response;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
@ -22,13 +11,25 @@ import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcesso
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult;
import com.genersoft.iot.vmp.vmanager.service.IPlayService;
import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sdp.*;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.SipFactory;
import javax.sip.address.Address;
import javax.sip.address.SipURI;
import javax.sip.header.ContentTypeHeader;
import javax.sip.header.FromHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.Vector;
@ -65,8 +66,7 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
/**
* 处理invite请求
*
* @param evt
* 请求消息
* @param evt 请求消息
*/
@Override
public void process(RequestEvent evt) {
@ -160,7 +160,7 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
responseAck(evt, Response.SERVER_INTERNAL_ERROR);
return;
}
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(addressStr, port, ssrc, requesterId, device.getDeviceId(), channelId,
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(addressStr, port, ssrc, requesterId, device, channelId,
mediaTransmissionTCP);
if (tcpActive != null) {
sendRtpItem.setTcpActive(tcpActive);
@ -174,7 +174,7 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
// 写入redis, 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
// 通知下级推流,
PlayResult playResult = playService.play(device.getDeviceId(), channelId, (responseJSON)->{
DeferredResult<ResponseEntity<String>> playResult = playService.play(device.getDeviceId(), channelId, (responseJSON) -> {
// 收到推流, 回复200OK, 等待ack
sendRtpItem.setStatus(1);
redisCatchStorage.updateSendRTPSever(sendRtpItem);
@ -277,10 +277,6 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc);
} else {
logger.warn("来自无效设备/平台的请求");
responseAck(evt, Response.BAD_REQUEST);
@ -316,6 +312,7 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
/**
* 回复带sdp的200
*
* @param evt
* @param sdp
* @throws SipException
@ -338,10 +335,6 @@ public class InviteRequestProcessor extends SIPRequestAbstractProcessor {
}
public SIPCommanderFroPlatform getCmderFroPlatform() {
return cmderFroPlatform;
}

43
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/MessageRequestProcessor.java

@ -1,23 +1,5 @@
package com.genersoft.iot.vmp.gb28181.transmit.request.impl;
import java.io.ByteArrayInputStream;
import java.text.ParseException;
import java.util.*;
import javax.sip.address.SipURI;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import javax.sip.InvalidArgumentException;
import javax.sip.ListeningPoint;
import javax.sip.ObjectInUseException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.SipProvider;
import javax.sip.message.Request;
import javax.sip.message.Response;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.VManageBootstrap;
import com.genersoft.iot.vmp.common.StreamInfo;
@ -26,7 +8,6 @@ import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.callback.CheckForAllRecordsThread;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
@ -42,11 +23,9 @@ import com.genersoft.iot.vmp.utils.GpsUtil;
import com.genersoft.iot.vmp.utils.SpringBeanFactory;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.platform.bean.ChannelReduce;
import gov.nist.javax.sip.SipStackImpl;
import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
@ -55,9 +34,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.io.ByteArrayInputStream;
@ -86,12 +67,8 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
private IVideoManagerStorager storager;
private SIPCommanderFroPlatform cmderFroPlatform;
private IVideoManagerStorager storager;
private IRedisCatchStorage redisCatchStorage;
private VideoStreamSessionManager streamSession;
private EventPublisher publisher;
private RedisUtil redis;
@ -167,7 +144,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
responseAck(evt);
}
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
log.error("MessageRequestProcessor.process error!", e);
logger.error("MessageRequestProcessor.process error!", e);
}
}
@ -735,7 +712,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
}
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
// } catch (DocumentException e) {
log.error("MessageRequestProcessor.processMessageAlarm error!", e);
logger.error("MessageRequestProcessor.processMessageAlarm error!", e);
}
}
@ -758,7 +735,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
}
}
} catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
log.error("MessageRequestProcessor.processMessageKeepAlive error!", e);
logger.error("MessageRequestProcessor.processMessageKeepAlive error!", e);
}
}
@ -879,7 +856,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
// deferredResultHolder.invokeResult(msg);
// logger.info("处理完成,返回结果");
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
log.error("MessageRequestProcessor.processMessageRecordInfo error!", e);
logger.error("MessageRequestProcessor.processMessageRecordInfo error!", e);
}
}
@ -905,7 +882,7 @@ public class MessageRequestProcessor extends SIPRequestAbstractProcessor {
}
}
} catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
log.error("MessageRequestProcessor.processMessageMediaStatus error!", e);
logger.error("MessageRequestProcessor.processMessageMediaStatus error!", e);
}
}

12
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@ -50,8 +50,6 @@ public class ZLMHttpHookListener {
@Autowired
private ZLMHttpHookSubscribe subscribe;
@Autowired
private ZLMHttpHookSubscribe subscribe;
@Autowired
MediaConfig mediaConfig;
@ -61,7 +59,6 @@ public class ZLMHttpHookListener {
/**
* 流量统计事件播放器或推流器断开时并且耗用流量超过特定阈值时会触发此事件阈值通过配置文件general.flowThreshold配置此事件对回复不敏感
*
*/
@ResponseBody
@PostMapping(value = "/on_flow_report", produces = "application/json;charset=UTF-8")
@ -78,7 +75,6 @@ public class ZLMHttpHookListener {
/**
* 访问http文件服务器上hls之外的文件时触发
*
*/
@ResponseBody
@PostMapping(value = "/on_http_access", produces = "application/json;charset=UTF-8")
@ -97,7 +93,6 @@ public class ZLMHttpHookListener {
/**
* 播放器鉴权事件rtsp/rtmp/http-flv/ws-flv/hls的播放都将触发此鉴权事件
*
*/
@ResponseBody
@PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8")
@ -114,7 +109,6 @@ public class ZLMHttpHookListener {
/**
* rtsp/rtmp/rtp推流鉴权事件
*
*/
@ResponseBody
@PostMapping(value = "/on_publish", produces = "application/json;charset=UTF-8")
@ -140,7 +134,6 @@ public class ZLMHttpHookListener {
/**
* 录制mp4完成后通知事件此事件对回复不敏感
*
*/
@ResponseBody
@PostMapping(value = "/on_record_mp4", produces = "application/json;charset=UTF-8")
@ -157,7 +150,6 @@ public class ZLMHttpHookListener {
/**
* rtsp专用的鉴权事件先触发on_rtsp_realm事件然后才会触发on_rtsp_auth事件
*
*/
@ResponseBody
@PostMapping(value = "/on_rtsp_realm", produces = "application/json;charset=UTF-8")
@ -175,7 +167,6 @@ public class ZLMHttpHookListener {
/**
* 该rtsp流是否开启rtsp专用方式的鉴权事件开启后才会触发on_rtsp_auth事件需要指出的是rtsp也支持url参数鉴权它支持两种方式鉴权
*
*/
@ResponseBody
@PostMapping(value = "/on_rtsp_auth", produces = "application/json;charset=UTF-8")
@ -193,7 +184,6 @@ public class ZLMHttpHookListener {
/**
* shell登录鉴权ZLMediaKit提供简单的telnet调试方式使用telnet 127.0.0.1 9000能进入MediaServer进程的shell界面
*
*/
@ResponseBody
@PostMapping(value = "/on_shell_login", produces = "application/json;charset=UTF-8")
@ -233,6 +223,7 @@ public class ZLMHttpHookListener {
// 流消失移除redis play
String app = json.getString("app");
String streamId = json.getString("stream");
String schema = json.getString("schema");
boolean regist = json.getBoolean("regist");
if (!"rtp".equals(app) || regist) {
if (!"rtp".equals(app) && "rtsp".equals(schema)) {
@ -338,7 +329,6 @@ public class ZLMHttpHookListener {
/**
* 服务器启动事件可以用于监听服务器崩溃重启此事件对回复不敏感
*
*/
@ResponseBody
@PostMapping(value = "/on_server_started", produces = "application/json;charset=UTF-8")

9
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java

@ -1,19 +1,14 @@
package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.genersoft.iot.vmp.common.RealVideo;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SsrcUtil;
import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.impl.RedisCatchStorageImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.*;
@ -38,7 +33,8 @@ public class ZLMMediaListManager {
Map<String, RealVideo> result = new HashMap<>();
if (code == 0) {
if (dataStr != null) {
List<MediaItem> mediaItems = JSON.parseObject(dataStr, new TypeReference<List<MediaItem>>() {});
List<MediaItem> mediaItems = JSON.parseObject(dataStr, new TypeReference<List<MediaItem>>() {
});
for (MediaItem item : mediaItems) {
if ("rtp".equals(item.getApp())) {
continue;
@ -83,5 +79,4 @@ public class ZLMMediaListManager {
}
}

17
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java

@ -81,16 +81,20 @@ public class ZLMRESTfulUtils {
// }
public JSONObject getMediaList() {
return sendPost("getMediaList",null);
// TODO mediaServerIp 要作为参数传递进来,
String mediaServerIp = mediaConfig.getMediaIp();
return sendPost(mediaServerIp, "getMediaList", null);
}
public JSONObject getMediaInfo(String app, String schema, String stream) {
// TODO mediaServerIp 要作为参数传递进来,
String mediaServerIp = mediaConfig.getMediaIp();
Map<String, Object> param = new HashMap<>();
param.put("app", app);
param.put("schema", schema);
param.put("stream", stream);
param.put("vhost", "__defaultVhost__");
return sendPost("getMediaInfo",param);
return sendPost(mediaServerIp, "getMediaInfo", param);
}
public JSONObject getRtpInfo(String mediaServerIp, String stream_id) {
@ -178,10 +182,15 @@ public class ZLMRESTfulUtils {
}
public JSONObject startSendRtp(Map<String, Object> param) {
return sendPost("startSendRtp",param);
// TODO mediaServerIp 要作为参数传递进来,
String mediaServerIp = mediaConfig.getMediaIp();
return sendPost(mediaServerIp, "startSendRtp", param);
}
public JSONObject stopSendRtp(Map<String, Object> param) {
return sendPost("stopSendRtp",param);
// TODO mediaServerIp 要作为参数传递进来,
String mediaServerIp = mediaConfig.getMediaIp();
return sendPost(mediaServerIp, "stopSendRtp", param);
}
}

27
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java

@ -1,9 +1,11 @@
package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.SsrcUtil;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -24,6 +26,9 @@ public class ZLMRTPServerFactory {
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;
@Autowired
private VideoStreamSessionManager streamSession;
private int[] udpPortRangeArray = new int[2];
private ConcurrentHashMap<String, Integer> currentPortMap = new ConcurrentHashMap<>();
@ -110,20 +115,27 @@ public class ZLMRTPServerFactory {
/**
* 创建一个推流
*
* @param ip 推流ip
* @param port 推流端口
* @param ssrc 推流唯一标识
* @param platformId 平台id
* @param device 平台id
* @param channelId 通道id
* @param tcp 是否为tcp
* @return SendRtpItem
*/
public SendRtpItem createSendRtpItem(String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp){
String playSsrc = SsrcUtil.getPlaySsrc();
int localPort = createRTPServer(SsrcUtil.getPlaySsrc());
public SendRtpItem createSendRtpItem(String ip, int port, String ssrc, String platformId, Device device, String channelId, boolean tcp) {
StreamInfo playStreamInfo = streamSession.createPlayStreamInfo(device, channelId);
String mediaServerIp = playStreamInfo.getMediaServerIp();
String streamId = playStreamInfo.getStreamId();
int localPort = createRTPServer(mediaServerIp, streamId);
if (localPort != -1) {
closeRTPServer(playSsrc);
// TODO 没看懂这块逻辑,-1代表失败吗?分配端口后为什么要再把端口关掉?
closeRTPServer(mediaServerIp, streamId);
streamSession.remove(playStreamInfo);
} else {
streamSession.remove(playStreamInfo);
logger.error("没有可用的端口");
return null;
}
@ -132,7 +144,7 @@ public class ZLMRTPServerFactory {
sendRtpItem.setPort(port);
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setPlatformId(platformId);
sendRtpItem.setDeviceId(deviceId);
sendRtpItem.setDeviceId(device.getDeviceId());
sendRtpItem.setChannelId(channelId);
sendRtpItem.setTcp(tcp);
sendRtpItem.setLocalPort(localPort);
@ -167,6 +179,7 @@ public class ZLMRTPServerFactory {
/**
* 查询转推的流是否有其它观看者
*
* @param streamId
* @return
*/

17
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java

@ -7,7 +7,8 @@ import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
@ -18,10 +19,10 @@ import org.springframework.util.StringUtils;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Component
@Order(value = 1)
public class ZLMRunner implements CommandLineRunner {
private final Logger logger = LoggerFactory.getLogger(ZLMRunner.class);
@Autowired
private IRedisCatchStorage redisCatchStorage;
@ -44,10 +45,10 @@ public class ZLMRunner implements CommandLineRunner {
String[] mediaIpArr = mediaConfig.getMediaIpArr();
for (String mediaIp : mediaIpArr) {
// 获取zlm信息
log.info("等待zlm {} 接入...", mediaIp);
logger.info("等待zlm {} 接入...", mediaIp);
MediaServerConfig mediaServerConfig = getMediaServerConfig(mediaIp);
if (mediaServerConfig != null) {
log.info("zlm {} 接入成功...", mediaIp);
logger.info("zlm {} 接入成功...", mediaIp);
if (mediaConfig.getAutoConfig()) {
// 自动配置zlm
saveZLMConfig(mediaIp);
@ -71,7 +72,7 @@ public class ZLMRunner implements CommandLineRunner {
mediaServerConfig = JSON.parseObject(JSON.toJSONString(data.get(0)), MediaServerConfig.class);
}
} else {
log.error("getMediaServerConfig失败, 1s后重试");
logger.error("getMediaServerConfig失败, 1s后重试");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
@ -83,7 +84,7 @@ public class ZLMRunner implements CommandLineRunner {
}
private void saveZLMConfig(String mediaIp) {
log.info("设置zlm {} ...", mediaIp);
logger.info("设置zlm {} ...", mediaIp);
String mediaHookIp = mediaConfig.getMediaHookIp();
if (StringUtils.isEmpty(mediaHookIp)) {
mediaHookIp = sipConfig.getSipIp();
@ -112,9 +113,9 @@ public class ZLMRunner implements CommandLineRunner {
JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaIp, param);
if (responseJSON != null && responseJSON.getInteger("code") == 0) {
log.info("设置zlm {} 成功", mediaIp);
logger.info("设置zlm {} 成功", mediaIp);
} else {
log.info("设置zlm {} 失败: {}", mediaIp, responseJSON);
logger.info("设置zlm {} 失败: {}", mediaIp, responseJSON);
}
}
}

9
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java

@ -8,7 +8,6 @@ import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import java.util.List;
import java.util.Map;
public interface IRedisCatchStorage {
@ -56,7 +55,8 @@ public interface IRedisCatchStorage {
StreamInfo queryPlaybackByChannel(String channelId);
List<StreamInfo> queryPlayBackByDeviceId(String deviceId);
StreamInfo queryPlaybackByDevice(String deviceId, String code);
StreamInfo queryPlaybackByDevice(String deviceId, String channelId);
void updatePlatformCatchInfo(ParentPlatformCatch parentPlatformCatch);
@ -84,6 +84,7 @@ public interface IRedisCatchStorage {
/**
* 查询RTP推送信息缓存
*
* @param platformGbId
* @param channelId
* @return sendRtpItem
@ -92,6 +93,7 @@ public interface IRedisCatchStorage {
/**
* 删除RTP推送信息缓存
*
* @param platformGbId
* @param channelId
*/
@ -99,18 +101,21 @@ public interface IRedisCatchStorage {
/**
* 查询某个通道是否存在上级点播RTP推送
*
* @param channelId
*/
boolean isChannelSendingRTP(String channelId);
/**
* 更新媒体流列表
*
* @param mediaList
*/
void updateMediaList(List<RealVideo> mediaList);
/**
* 获取当前媒体流列表
*
* @return List<RealVideo>
*/
List<Object> getMediaList(int start, int end);

59
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java

@ -2,9 +2,9 @@ package com.genersoft.iot.vmp.storager;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.vmanager.platform.bean.ChannelReduce;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.github.pagehelper.PageInfo;
import java.util.List;
@ -22,7 +22,7 @@ public interface IVideoManagerStorager {
* @param deviceId 设备ID
* @return true:存在 false不存在
*/
public boolean exists(String deviceId);
boolean exists(String deviceId);
/**
* 视频设备创建
@ -30,7 +30,7 @@ public interface IVideoManagerStorager {
* @param device 设备对象
* @return true创建成功 false创建失败
*/
public boolean create(Device device);
boolean create(Device device);
/**
* 视频设备更新
@ -38,7 +38,7 @@ public interface IVideoManagerStorager {
* @param device 设备对象
* @return true创建成功 false创建失败
*/
public boolean updateDevice(Device device);
boolean updateDevice(Device device);
/**
* 添加设备通道
@ -46,7 +46,7 @@ public interface IVideoManagerStorager {
* @param deviceId 设备id
* @param channel 通道
*/
public void updateChannel(String deviceId, DeviceChannel channel);
void updateChannel(String deviceId, DeviceChannel channel);
/**
* 开始播放
@ -55,7 +55,7 @@ public interface IVideoManagerStorager {
* @param channelId 通道ID
* @param streamId 流地址
*/
public void startPlay(String deviceId, String channelId, String streamId);
void startPlay(String deviceId, String channelId, String streamId);
/**
* 停止播放
@ -63,7 +63,7 @@ public interface IVideoManagerStorager {
* @param deviceId 设备id
* @param channelId 通道ID
*/
public void stopPlay(String deviceId, String channelId);
void stopPlay(String deviceId, String channelId);
/**
* 获取设备
@ -71,7 +71,7 @@ public interface IVideoManagerStorager {
* @param deviceId 设备ID
* @return DShadow 设备对象
*/
public Device queryVideoDevice(String deviceId);
Device queryVideoDevice(String deviceId);
/**
* 获取某个设备的通道列表
@ -81,7 +81,7 @@ public interface IVideoManagerStorager {
* @param count 每页数量
* @return
*/
public PageInfo queryChannelsByDeviceId(String deviceId, String query, Boolean hasSubChannel, Boolean online, int page, int count);
PageInfo queryChannelsByDeviceId(String deviceId, String query, Boolean hasSubChannel, Boolean online, int page, int count);
/**
* 获取某个设备的通道列表
@ -89,7 +89,7 @@ public interface IVideoManagerStorager {
* @param deviceId 设备ID
* @return
*/
public List<DeviceChannel> queryChannelsByDeviceId(String deviceId);
List<DeviceChannel> queryChannelsByDeviceId(String deviceId);
/**
* 获取某个设备的通道
@ -97,7 +97,7 @@ public interface IVideoManagerStorager {
* @param deviceId 设备ID
* @param channelId 通道ID
*/
public DeviceChannel queryChannel(String deviceId, String channelId);
DeviceChannel queryChannel(String deviceId, String channelId);
/**
* 获取多个设备
@ -106,14 +106,14 @@ public interface IVideoManagerStorager {
* @param count 每页数量
* @return List<Device> 设备对象数组
*/
public PageInfo<Device> queryVideoDeviceList(int page, int count);
PageInfo<Device> queryVideoDeviceList(int page, int count);
/**
* 获取多个设备
*
* @return List<Device> 设备对象数组
*/
public List<Device> queryVideoDeviceList();
List<Device> queryVideoDeviceList();
/**
* 删除设备
@ -121,7 +121,7 @@ public interface IVideoManagerStorager {
* @param deviceId 设备ID
* @return true删除成功 false删除失败
*/
public boolean delete(String deviceId);
boolean delete(String deviceId);
/**
* 更新设备在线
@ -129,7 +129,7 @@ public interface IVideoManagerStorager {
* @param deviceId 设备ID
* @return true更新成功 false更新失败
*/
public boolean online(String deviceId);
boolean online(String deviceId);
/**
* 更新设备离线
@ -137,7 +137,7 @@ public interface IVideoManagerStorager {
* @param deviceId 设备ID
* @return true更新成功 false更新失败
*/
public boolean outline(String deviceId);
boolean outline(String deviceId);
/**
@ -159,16 +159,9 @@ public interface IVideoManagerStorager {
*/
void cleanChannelsForDevice(String deviceId);
/**
* 添加Mobile Position设备移动位置
*
* @param MobilePosition
* @return
*/
public boolean insertMobilePosition(MobilePosition mobilePosition);
/**
* 更新上级平台
*
* @param parentPlatform
*/
boolean updateParentPlatform(ParentPlatform parentPlatform);
@ -176,12 +169,14 @@ public interface IVideoManagerStorager {
/**
* 添加上级平台
*
* @param parentPlatform
*/
boolean addParentPlatform(ParentPlatform parentPlatform);
/**
* 删除上级平台
*
* @param parentPlatform
*/
boolean deleteParentPlatform(ParentPlatform parentPlatform);
@ -189,6 +184,7 @@ public interface IVideoManagerStorager {
/**
* 分页获取上级平台
*
* @param page
* @param count
* @return
@ -197,12 +193,14 @@ public interface IVideoManagerStorager {
/**
* 获取所有已启用的平台
*
* @return
*/
List<ParentPlatform> queryEnableParentPlatformList(boolean enable);
/**
* 获取上级平台
*
* @param platformGbId
* @return
*/
@ -226,6 +224,7 @@ public interface IVideoManagerStorager {
/**
* 更新上级平台的通道信息
*
* @param platformId
* @param channelReduces
* @return
@ -234,6 +233,7 @@ public interface IVideoManagerStorager {
/**
* 移除上级平台的通道信息
*
* @param platformId
* @param channelReduces
* @return
@ -248,10 +248,11 @@ public interface IVideoManagerStorager {
/**
* 添加Mobile Position设备移动位置
* @param MobilePosition
*
* @param mobilePosition
* @return
*/
public boolean insertMobilePosition(MobilePosition mobilePosition);
boolean insertMobilePosition(MobilePosition mobilePosition);
/**
* 查询移动位置轨迹
@ -260,19 +261,19 @@ public interface IVideoManagerStorager {
* @param startTime
* @param endTime
*/
public List<MobilePosition> queryMobilePositions(String deviceId, String startTime, String endTime);
List<MobilePosition> queryMobilePositions(String deviceId, String startTime, String endTime);
/**
* 查询最新移动位置
*
* @param deviceId
*/
public MobilePosition queryLatestPosition(String deviceId);
MobilePosition queryLatestPosition(String deviceId);
/**
* 删除指定设备的所有移动位置
*
* @param deviceId
*/
public int clearMobilePositionsByDeviceId(String deviceId);
int clearMobilePositionsByDeviceId(String deviceId);
}

33
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@ -4,7 +4,10 @@ import com.genersoft.iot.vmp.common.RealVideo;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
@ -12,9 +15,9 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@SuppressWarnings("rawtypes")
@Component
@ -142,6 +145,28 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
return redis.del(key);
}
@Override
public StreamInfo queryPlaybackByDevice(String deviceId, String channelId) {
String keyByChannel = getKey(VideoManagerConstants.PLAY_BLACK_PREFIX,
null,
channelId,
deviceId
);
List<Object> playLeys = redis.scan(keyByChannel);
if (playLeys == null || playLeys.size() == 0) {
String keyByDevice = getKey(VideoManagerConstants.PLAY_BLACK_PREFIX,
null,
null,
deviceId
);
playLeys = redis.scan(keyByDevice);
}
if (playLeys == null || playLeys.size() == 0) {
return null;
}
return (StreamInfo) redis.get(playLeys.get(0).toString());
}
@Override
public StreamInfo queryPlaybackByStreamId(String channelId, String steamId) {
String key = getKey(VideoManagerConstants.PLAY_BLACK_PREFIX,
@ -269,6 +294,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
/**
* 删除RTP推送信息缓存
*
* @param platformGbId
* @param channelId
*/
@ -280,6 +306,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
/**
* 查询某个通道是否存在上级点播RTP推送
*
* @param channelId
*/
@Override
@ -296,6 +323,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
/**
* 更新媒体流列表
*
* @param mediaList
*/
@Override
@ -311,6 +339,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
/**
* 获取当前媒体流列表
*
* @return List<RealVideo>
*/
@Override

33
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java

@ -1,27 +1,21 @@
package com.genersoft.iot.vmp.storager.impl;
import java.util.*;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.storager.dao.PatformChannelMapper;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.storager.dao.*;
import com.genersoft.iot.vmp.vmanager.platform.bean.ChannelReduce;
import com.genersoft.iot.vmp.storager.dao.DeviceMobilePositionMapper;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @Description:视频设备数据存储-jdbc实现
* @author: swwheihei
@ -50,8 +44,6 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
private PatformChannelMapper patformChannelMapper;
/**
* 根据设备ID判断设备是否存在
*
@ -75,7 +67,6 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
}
/**
* 视频设备更新
*
@ -224,6 +215,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
/**
* 清空通道
*
* @param deviceId
*/
@Override
@ -233,7 +225,8 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
/**
* 添加Mobile Position设备移动位置
* @param MobilePosition
*
* @param mobilePosition
*/
@Override
public synchronized boolean insertMobilePosition(MobilePosition mobilePosition) {
@ -242,6 +235,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
/**
* 查询移动位置轨迹
*
* @param deviceId
* @param startTime
* @param endTime
@ -373,6 +367,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
/**
* 查询最新移动位置
*
* @param deviceId
*/
@Override
@ -382,8 +377,10 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
/**
* 删除指定设备的所有移动位置
*
* @param deviceId
*/
@Override
public int clearMobilePositionsByDeviceId(String deviceId) {
return deviceMobilePositionMapper.clearMobilePositionsByDeviceId(deviceId);
}

20
src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java

@ -420,7 +420,7 @@ public class RedisUtil {
* @param value
* @param score
*/
public void zAdd(String key, String value, double score) {
public void zAdd(Object key, Object value, double score) {
redisTemplate.opsForZSet().add(key, value, score);
}
@ -430,7 +430,7 @@ public class RedisUtil {
* @param key
* @param value
*/
public void zRemove(String key, String value) {
public void zRemove(Object key, Object value) {
redisTemplate.opsForZSet().remove(key, value);
}
@ -441,7 +441,7 @@ public class RedisUtil {
* @param value
* @param score
*/
public Double zIncrScore(String key, String value, double score) {
public Double zIncrScore(Object key, Object value, double score) {
return redisTemplate.opsForZSet().incrementScore(key, value, score);
}
@ -452,7 +452,7 @@ public class RedisUtil {
* @param value
* @return
*/
public Double zScore(String key, String value) {
public Double zScore(Object key, Object value) {
return redisTemplate.opsForZSet().score(key, value);
}
@ -463,7 +463,7 @@ public class RedisUtil {
* @param value
* @return
*/
public Long zRank(String key, String value) {
public Long zRank(Object key, Object value) {
return redisTemplate.opsForZSet().rank(key, value);
}
@ -473,7 +473,7 @@ public class RedisUtil {
* @param key
* @return
*/
public Long zSize(String key) {
public Long zSize(Object key) {
return redisTemplate.opsForZSet().zCard(key);
}
@ -487,7 +487,7 @@ public class RedisUtil {
* @param end
* @return
*/
public Set<String> ZRange(String key, int start, int end) {
public Set<Object> ZRange(Object key, int start, int end) {
return redisTemplate.opsForZSet().range(key, start, end);
}
@ -499,7 +499,7 @@ public class RedisUtil {
* @param end
* @return
*/
public Set<ZSetOperations.TypedTuple<String>> zRangeWithScore(String key, int start, int end) {
public Set<ZSetOperations.TypedTuple<String>> zRangeWithScore(Object key, int start, int end) {
return redisTemplate.opsForZSet().rangeWithScores(key, start, end);
}
@ -513,7 +513,7 @@ public class RedisUtil {
* @param end
* @return
*/
public Set<String> zRevRange(String key, int start, int end) {
public Set<String> zRevRange(Object key, int start, int end) {
return redisTemplate.opsForZSet().reverseRange(key, start, end);
}
@ -525,7 +525,7 @@ public class RedisUtil {
* @param max
* @return
*/
public Set<String> zSortRange(String key, int min, int max) {
public Set<String> zSortRange(Object key, int min, int max) {
return redisTemplate.opsForZSet().rangeByScore(key, min, max);
}

76
src/main/java/com/genersoft/iot/vmp/vmanager/play/PlayController.java

@ -1,19 +1,16 @@
package com.genersoft.iot.vmp.vmanager.play;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult;
import com.genersoft.iot.vmp.vmanager.service.IPlayService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -21,20 +18,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.UUID;
import javax.sip.message.Response;
@CrossOrigin
@ -68,66 +53,7 @@ public class PlayController {
@GetMapping("/play/{deviceId}/{channelId}")
public DeferredResult<ResponseEntity<String>> play(@PathVariable String deviceId,
@PathVariable String channelId) {
Device device = storager.queryVideoDevice(deviceId);
RequestMessage msg = playService.createCallbackPlayMsg();
DeferredResult<ResponseEntity<String>> result = new DeferredResult<>();
// 超时处理
result.onTimeout(() -> {
logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
// 释放rtpserver
cmder.closeRTPServer(device, channelId);
StreamInfo streamInfo = streamSession.getPlayStreamInfo(channelId);
streamSession.remove(streamInfo);
msg.setData("Timeout");
resultHolder.invokeResult(msg);
});
resultHolder.put(msg.getId(), result);
// 判断是否已经存在点播
StreamInfo oldStreamInfo = streamSession.getPlayStreamInfo(channelId);
if (oldStreamInfo == null) {
// 发送点播消息
playStreamCmd(device, channelId, msg);
return result;
}
// 若已有人点播,直接播放
String streamId = oldStreamInfo.getStreamId();
String mediaServerIp = oldStreamInfo.getMediaServerIp();
JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerIp, streamId);
if (rtpInfo.getBoolean("exist")) {
msg.setData(JSON.toJSONString(oldStreamInfo));
resultHolder.invokeResult(msg);
return result;
}
// 若已有人点播,但已超时自动断开,则重新发起点播
storager.stopPlay(oldStreamInfo.getDeviceID(), oldStreamInfo.getChannelId());
streamSession.remove(oldStreamInfo);
playStreamCmd(device, channelId, msg);
return result;
}
private void playStreamCmd(Device device, String channelId, RequestMessage msg) {
cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
logger.info("收到点播回调消息: " + response.toJSONString());
playService.onPublishHandlerForPlay(response, device.getDeviceId(), channelId, msg);
}, event -> {
StreamInfo streamInfo = streamSession.getPlayStreamInfo(channelId);
streamSession.remove(streamInfo);
Response response = event.getResponse();
int statusCode = response.getStatusCode();
String errMsg;
if (503 == statusCode) {
errMsg = "点播失败,请检查在NVR上是否可以正常打开监控,并检查NVR和SIP是否连通, 错误码: %s, %s";
} else {
errMsg = "点播失败,错误码: %s, %s";
}
msg.setData(String.format(errMsg, statusCode, response.getReasonPhrase()));
resultHolder.invokeResult(msg);
});
return playService.play(deviceId, channelId, null, null);
}
@PostMapping("/play/{channelId}/{streamId}/stop")

37
src/main/java/com/genersoft/iot/vmp/vmanager/play/bean/PlayResult.java

@ -1,37 +0,0 @@
package com.genersoft.iot.vmp.vmanager.play.bean;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import org.springframework.http.ResponseEntity;
import org.springframework.web.context.request.async.DeferredResult;
public class PlayResult {
private DeferredResult<ResponseEntity<String>> result;
private String uuid;
private Device device;
public DeferredResult<ResponseEntity<String>> getResult() {
return result;
}
public void setResult(DeferredResult<ResponseEntity<String>> result) {
this.result = result;
}
public String getUuid() {
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
public Device getDevice() {
return device;
}
public void setDevice(Device device) {
this.device = device;
}
}

7
src/main/java/com/genersoft/iot/vmp/vmanager/service/IPlayService.java

@ -1,10 +1,11 @@
package com.genersoft.iot.vmp.vmanager.service;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult;
import org.springframework.http.ResponseEntity;
import org.springframework.web.context.request.async.DeferredResult;
/**
* 点播处理
@ -16,5 +17,5 @@ public interface IPlayService {
void onPublishHandlerForPlay(JSONObject resonse, String deviceId, String channelId, RequestMessage msg);
PlayResult play(String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent);
DeferredResult<ResponseEntity<String>> play(String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent);
}

108
src/main/java/com/genersoft/iot/vmp/vmanager/service/impl/PlayServiceImpl.java

@ -3,7 +3,6 @@ package com.genersoft.iot.vmp.vmanager.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.MediaServerConfig;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
@ -15,7 +14,6 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.play.bean.PlayResult;
import com.genersoft.iot.vmp.vmanager.service.IPlayService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -27,8 +25,6 @@ import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.message.Response;
import java.util.UUID;
import java.util.UUID;
@Service
public class PlayServiceImpl implements IPlayService {
@ -52,66 +48,80 @@ public class PlayServiceImpl implements IPlayService {
@Autowired
private ZLMRESTfulUtils zlmresTfulUtils;
@Override
public PlayResult play(String deviceId, String channelId, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent) {
PlayResult playResult = new PlayResult();
public DeferredResult<ResponseEntity<String>> play(String deviceId, String channelId, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent) {
Device device = storager.queryVideoDevice(deviceId);
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
playResult.setDevice(device);
UUID uuid = UUID.randomUUID();
playResult.setUuid(uuid.toString());
DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>();
playResult.setResult(result);
// 录像查询以channelId作为deviceId查询
resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result);
if (streamInfo == null) {
// 发送点播消息
cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
if (hookEvent != null) {
hookEvent.response(response);
}
}, event -> {
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
Response response = event.getResponse();
msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
RequestMessage msg = this.createCallbackPlayMsg();
DeferredResult<ResponseEntity<String>> result = new DeferredResult<>();
// 超时处理
result.onTimeout(() -> {
logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
// 释放rtpserver
cmder.closeRTPServer(device, channelId);
StreamInfo streamInfo = streamSession.getPlayStreamInfo(channelId);
streamSession.remove(streamInfo);
msg.setData("Timeout");
resultHolder.invokeResult(msg);
if (errorEvent != null) {
errorEvent.response(event);
errorEvent.response(null);
}
});
} else {
String streamId = streamInfo.getStreamId();
JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(streamId);
resultHolder.put(msg.getId(), result);
// 判断是否已经存在点播
StreamInfo oldStreamInfo = streamSession.getPlayStreamInfo(channelId);
if (oldStreamInfo == null) {
// 发送点播消息
playStreamCmd(device, channelId, msg, hookEvent, errorEvent);
return result;
}
// 若已有人点播,直接播放
String streamId = oldStreamInfo.getStreamId();
String mediaServerIp = oldStreamInfo.getMediaServerIp();
JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerIp, streamId);
if (rtpInfo.getBoolean("exist")) {
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
msg.setData(JSON.toJSONString(streamInfo));
msg.setData(JSON.toJSONString(oldStreamInfo));
resultHolder.invokeResult(msg);
if (hookEvent != null) {
hookEvent.response(JSONObject.parseObject(JSON.toJSONString(streamInfo)));
hookEvent.response(null);
}
} else {
redisCatchStorage.stopPlay(streamInfo);
storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
return result;
}
// 若已有人点播,但已超时自动断开,则重新发起点播
storager.stopPlay(oldStreamInfo.getDeviceID(), oldStreamInfo.getChannelId());
streamSession.remove(oldStreamInfo);
playStreamCmd(device, channelId, msg, hookEvent, errorEvent);
return result;
}
private void playStreamCmd(Device device, String channelId, RequestMessage msg, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent) {
cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
logger.info("收到订阅消息: " + response.toJSONString());
onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
logger.info("收到点播回调消息: " + response.toJSONString());
this.onPublishHandlerForPlay(response, device.getDeviceId(), channelId, msg);
if (hookEvent != null) {
hookEvent.response(response);
}
}, event -> {
RequestMessage msg = new RequestMessage();
msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
StreamInfo streamInfo = streamSession.getPlayStreamInfo(channelId);
streamSession.remove(streamInfo);
Response response = event.getResponse();
msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
resultHolder.invokeResult(msg);
});
int statusCode = response.getStatusCode();
String errMsg;
if (503 == statusCode) {
errMsg = "点播失败,请检查在NVR上是否可以正常打开监控,并检查NVR和SIP是否连通, 错误码: %s, %s";
} else {
errMsg = "点播失败,错误码: %s, %s";
}
msg.setData(String.format(errMsg, statusCode, response.getReasonPhrase()));
resultHolder.invokeResult(msg);
if (errorEvent != null) {
errorEvent.response(event);
}
return playResult;
});
}
@Override

6
src/main/resources/application-dev.yml

@ -8,7 +8,7 @@ spring:
# [可选] 数据库 DB
database: 6
# [可选] 访问密码,若你的redis服务器没有设置密码,就不需要用密码去连接
password:
password: 111111
# [可选] 超时时间
timeout: 10000
poolMaxTotal: 1000
@ -38,7 +38,7 @@ server:
# 作为28181服务器的配置
sip:
# [必须修改] 本机的内网IP, 必须是网卡上的IP
ip: 192.168.0.100
ip: 192.168.1.105
# [可选] 28181服务监听的端口
port: 5060
# 根据国标6.1.2中规定,domain宜采用ID统一编码的前十位编码。国标附录D中定义前8位为中心编码(由省级、市级、区级、基层编号组成,参照GB/T 2260-2007)
@ -61,7 +61,7 @@ auth:
#zlm服务器配置
media:
# [必须修改] zlm服务器的IP(内网公网IP均可),配置多台时IP用逗号隔开
ip: 192.168.0.100
ip: 192.168.1.105
# [可选] zlm服务器的公网IP, 内网部署置空即可
wanIp:
# [可选] zlm服务器的hook所使用的IP, 默认使用sip.ip

2
src/main/resources/application.yml

@ -1,3 +1,3 @@
spring:
profiles:
active: local
active: dev
Loading…
Cancel
Save