panlinlin
4 years ago
28 changed files with 736 additions and 67 deletions
@ -0,0 +1,36 @@ |
|||
package com.genersoft.iot.vmp.gb28181.bean; |
|||
|
|||
public class ParentPlatformCatch { |
|||
|
|||
private String id; |
|||
|
|||
// 心跳未回复次数
|
|||
private int keepAliveReply; |
|||
|
|||
// 注册未回复次数
|
|||
private int registerAliveReply; |
|||
|
|||
public String getId() { |
|||
return id; |
|||
} |
|||
|
|||
public void setId(String id) { |
|||
this.id = id; |
|||
} |
|||
|
|||
public int getKeepAliveReply() { |
|||
return keepAliveReply; |
|||
} |
|||
|
|||
public void setKeepAliveReply(int keepAliveReply) { |
|||
this.keepAliveReply = keepAliveReply; |
|||
} |
|||
|
|||
public int getRegisterAliveReply() { |
|||
return registerAliveReply; |
|||
} |
|||
|
|||
public void setRegisterAliveReply(int registerAliveReply) { |
|||
this.registerAliveReply = registerAliveReply; |
|||
} |
|||
} |
@ -0,0 +1,15 @@ |
|||
package com.genersoft.iot.vmp.gb28181.bean; |
|||
|
|||
public class PlatformRegister { |
|||
|
|||
// 未回复次数
|
|||
private int reply; |
|||
|
|||
public int getReply() { |
|||
return reply; |
|||
} |
|||
|
|||
public void setReply(int reply) { |
|||
this.reply = reply; |
|||
} |
|||
} |
@ -0,0 +1,64 @@ |
|||
package com.genersoft.iot.vmp.gb28181.event.offline; |
|||
|
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Value; |
|||
import org.springframework.data.redis.connection.Message; |
|||
import org.springframework.data.redis.connection.MessageListener; |
|||
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; |
|||
import org.springframework.data.redis.listener.RedisMessageListenerContainer; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import com.genersoft.iot.vmp.common.VideoManagerConstants; |
|||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; |
|||
|
|||
import java.nio.charset.StandardCharsets; |
|||
|
|||
/** |
|||
* @Description:设备心跳超时监听,借助redis过期特性,进行监听,监听到说明设备心跳超时,发送离线事件 |
|||
* @author: swwheihei |
|||
* @date: 2020年5月6日 上午11:35:46 |
|||
*/ |
|||
@Component |
|||
public class KeepaliveTimeoutListenerForPlatform extends KeyExpirationEventMessageListener { |
|||
|
|||
@Autowired |
|||
private EventPublisher publisher; |
|||
|
|||
public KeepaliveTimeoutListenerForPlatform(RedisMessageListenerContainer listenerContainer) { |
|||
super(listenerContainer); |
|||
} |
|||
|
|||
|
|||
/** |
|||
* 监听失效的key |
|||
* @param message |
|||
* @param bytes |
|||
*/ |
|||
@Override |
|||
public void onMessage(Message message, byte[] pattern) { |
|||
// 获取失效的key
|
|||
String expiredKey = message.toString(); |
|||
System.out.println(expiredKey); |
|||
if(!expiredKey.startsWith(VideoManagerConstants.PLATFORM_PREFIX)){ |
|||
System.out.println("收到redis过期监听,但开头不是"+VideoManagerConstants.PLATFORM_PREFIX+",忽略"); |
|||
return; |
|||
} |
|||
// 平台心跳到期,需要重发, 判断是否已经多次未收到心跳回复, 多次未收到,则重新发起注册, 注册尝试多次未得到回复,则认为平台离线
|
|||
if (expiredKey.startsWith(VideoManagerConstants.PLATFORM_KEEPLIVEKEY_PREFIX)) { |
|||
String platformGBId = expiredKey.substring(VideoManagerConstants.PLATFORM_KEEPLIVEKEY_PREFIX.length(),expiredKey.length()); |
|||
|
|||
publisher.platformKeepaliveExpireEventPublish(platformGBId); |
|||
}else if (expiredKey.startsWith(VideoManagerConstants.PLATFORM_REGISTER_PREFIX)) { |
|||
System.out.println("11111111111111"); |
|||
String platformGBId = expiredKey.substring(VideoManagerConstants.PLATFORM_REGISTER_PREFIX.length(),expiredKey.length()); |
|||
|
|||
publisher.platformNotRegisterEventPublish(platformGBId); |
|||
}else{ |
|||
String deviceId = expiredKey.substring(VideoManagerConstants.KEEPLIVEKEY_PREFIX.length(),expiredKey.length()); |
|||
publisher.outlineEventPublish(deviceId, VideoManagerConstants.EVENT_OUTLINE_TIMEOUT); |
|||
} |
|||
|
|||
} |
|||
} |
@ -0,0 +1,23 @@ |
|||
package com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire; |
|||
|
|||
import org.springframework.context.ApplicationEvent; |
|||
|
|||
/** |
|||
* 平台心跳超时事件 |
|||
*/ |
|||
public class PlatformKeepaliveExpireEvent extends ApplicationEvent { |
|||
|
|||
private String platformGbID; |
|||
|
|||
public PlatformKeepaliveExpireEvent(Object source) { |
|||
super(source); |
|||
} |
|||
|
|||
public String getPlatformGbID() { |
|||
return platformGbID; |
|||
} |
|||
|
|||
public void setPlatformGbID(String platformGbID) { |
|||
this.platformGbID = platformGbID; |
|||
} |
|||
} |
@ -0,0 +1,85 @@ |
|||
package com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire; |
|||
|
|||
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; |
|||
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; |
|||
import com.genersoft.iot.vmp.gb28181.bean.PlatformRegister; |
|||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; |
|||
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; |
|||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; |
|||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; |
|||
import com.genersoft.iot.vmp.storager.IVideoManagerStorager; |
|||
import org.jetbrains.annotations.NotNull; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.context.ApplicationListener; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import javax.sip.ResponseEvent; |
|||
import javax.sip.message.Response; |
|||
|
|||
/** |
|||
* @Description: 平台心跳超时事件 |
|||
* @author: panll |
|||
* @date: 2020年11月5日 10:00 |
|||
*/ |
|||
@Component |
|||
public class PlatformKeepaliveExpireEventLister implements ApplicationListener<PlatformKeepaliveExpireEvent> { |
|||
|
|||
|
|||
private final static Logger logger = LoggerFactory.getLogger(PlatformKeepaliveExpireEventLister.class); |
|||
|
|||
@Autowired |
|||
private IVideoManagerStorager storager; |
|||
|
|||
@Autowired |
|||
private IRedisCatchStorage redisCatchStorage; |
|||
|
|||
@Autowired |
|||
private ISIPCommanderForPlatform sipCommanderForPlatform; |
|||
|
|||
@Autowired |
|||
private SipSubscribe sipSubscribe; |
|||
|
|||
@Autowired |
|||
private EventPublisher publisher; |
|||
|
|||
@Override |
|||
public void onApplicationEvent(@NotNull PlatformKeepaliveExpireEvent event) { |
|||
|
|||
if (logger.isDebugEnabled()) { |
|||
logger.debug("平台心跳到期事件事件触发,平台国标ID:" + event.getPlatformGbID()); |
|||
} |
|||
ParentPlatform parentPlatform = storager.queryParentPlatById(event.getPlatformGbID()); |
|||
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(event.getPlatformGbID()); |
|||
if (parentPlatform == null) { |
|||
logger.debug("平台心跳到期事件事件触发,但平台已经删除!!! 平台国标ID:" + event.getPlatformGbID()); |
|||
return; |
|||
} |
|||
if (parentPlatformCatch == null) { |
|||
return; |
|||
} |
|||
// 发送心跳
|
|||
if (parentPlatformCatch.getKeepAliveReply() >= 3) { |
|||
// 有3次未收到心跳回复, 设置平台状态为离线, 开始重新注册
|
|||
logger.warn("有3次未收到心跳回复,标记设置平台状态为离线, 并重新注册 平台国标ID:" + event.getPlatformGbID()); |
|||
publisher.platformNotRegisterEventPublish(event.getPlatformGbID()); |
|||
}else { |
|||
// 再次发送心跳
|
|||
String callId = sipCommanderForPlatform.keepalive(parentPlatform); |
|||
|
|||
parentPlatformCatch.setKeepAliveReply( parentPlatformCatch.getKeepAliveReply() + 1); |
|||
// 存储心跳信息, 并设置状态为未回复, 如果多次过期仍未收到回复,则认为上级平台已经离线
|
|||
redisCatchStorage.updatePlatformKeepalive(parentPlatform); |
|||
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); |
|||
|
|||
sipSubscribe.addOkSubscribe(callId, (ResponseEvent responseEvent) ->{ |
|||
if (responseEvent.getResponse().getStatusCode() == Response.OK) { |
|||
// 收到心跳响应信息,
|
|||
parentPlatformCatch.setKeepAliveReply(0); |
|||
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); |
|||
} |
|||
} ); |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,150 @@ |
|||
package com.genersoft.iot.vmp.gb28181.transmit.cmd; |
|||
|
|||
import com.genersoft.iot.vmp.conf.SipConfig; |
|||
import com.genersoft.iot.vmp.gb28181.bean.Device; |
|||
import com.genersoft.iot.vmp.gb28181.bean.Host; |
|||
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.beans.factory.annotation.Qualifier; |
|||
import org.springframework.stereotype.Component; |
|||
import org.springframework.util.DigestUtils; |
|||
|
|||
import javax.sip.InvalidArgumentException; |
|||
import javax.sip.PeerUnavailableException; |
|||
import javax.sip.SipFactory; |
|||
import javax.sip.SipProvider; |
|||
import javax.sip.address.Address; |
|||
import javax.sip.address.SipURI; |
|||
import javax.sip.header.*; |
|||
import javax.sip.message.Request; |
|||
import javax.validation.constraints.NotNull; |
|||
import java.text.ParseException; |
|||
import java.util.ArrayList; |
|||
|
|||
/** |
|||
* @Description:摄像头命令request创造器 TODO 冗余代码太多待优化 |
|||
* @author: swwheihei |
|||
* @date: 2020年5月6日 上午9:29:02 |
|||
*/ |
|||
@Component |
|||
public class SIPRequestHeaderPlarformProvider { |
|||
|
|||
@Autowired |
|||
private SipConfig sipConfig; |
|||
|
|||
@Autowired |
|||
private SipFactory sipFactory; |
|||
|
|||
@Autowired |
|||
@Qualifier(value="tcpSipProvider") |
|||
private SipProvider tcpSipProvider; |
|||
|
|||
@Autowired |
|||
@Qualifier(value="udpSipProvider") |
|||
private SipProvider udpSipProvider; |
|||
|
|||
|
|||
public Request createKeetpaliveMessageRequest(ParentPlatform parentPlatform, String content, String viaTag, String fromTag, String toTag) throws ParseException, InvalidArgumentException, PeerUnavailableException { |
|||
Request request = null; |
|||
// sipuri
|
|||
SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort()); |
|||
// via
|
|||
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>(); |
|||
ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(sipConfig.getSipIp(), sipConfig.getSipPort(), |
|||
parentPlatform.getTransport(), viaTag); |
|||
viaHeader.setRPort(); |
|||
viaHeaders.add(viaHeader); |
|||
// from
|
|||
SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(), |
|||
sipConfig.getSipIp() + ":" + sipConfig.getSipPort()); |
|||
Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI); |
|||
FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, fromTag); |
|||
// to
|
|||
SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort() ); |
|||
Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI); |
|||
ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, toTag); |
|||
// callid
|
|||
CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() |
|||
: udpSipProvider.getNewCallId(); |
|||
// Forwards
|
|||
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); |
|||
// ceq
|
|||
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.MESSAGE); |
|||
|
|||
request = sipFactory.createMessageFactory().createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader, |
|||
toHeader, viaHeaders, maxForwards); |
|||
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); |
|||
request.setContent(content, contentTypeHeader); |
|||
return request; |
|||
} |
|||
|
|||
|
|||
public Request createRegisterRequest(@NotNull ParentPlatform platform, String fromTag, String viaTag) throws ParseException, InvalidArgumentException, PeerUnavailableException { |
|||
Request request = null; |
|||
String sipAddress = sipConfig.getSipIp() + ":" + sipConfig.getSipPort(); |
|||
//请求行
|
|||
SipURI requestLine = sipFactory.createAddressFactory().createSipURI(platform.getDeviceGBId(), |
|||
platform.getServerIP() + ":" + platform.getServerPort()); |
|||
//via
|
|||
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>(); |
|||
ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(platform.getServerIP(), platform.getServerPort(), platform.getTransport(), viaTag); |
|||
viaHeader.setRPort(); |
|||
viaHeaders.add(viaHeader); |
|||
//from
|
|||
SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(platform.getDeviceGBId(),sipAddress); |
|||
Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI); |
|||
FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, fromTag); |
|||
//to
|
|||
SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(platform.getDeviceGBId(),sipAddress); |
|||
Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI); |
|||
ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress,null); |
|||
|
|||
//callid
|
|||
CallIdHeader callIdHeader = null; |
|||
if(platform.getTransport().equals("TCP")) { |
|||
callIdHeader = tcpSipProvider.getNewCallId(); |
|||
} |
|||
if(platform.getTransport().equals("UDP")) { |
|||
callIdHeader = udpSipProvider.getNewCallId(); |
|||
} |
|||
|
|||
//Forwards
|
|||
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); |
|||
|
|||
//ceq
|
|||
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.REGISTER); |
|||
request = sipFactory.createMessageFactory().createRequest(requestLine, Request.REGISTER, callIdHeader, |
|||
cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards); |
|||
|
|||
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory() |
|||
.createSipURI(platform.getDeviceGBId(), sipAddress)); |
|||
request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); |
|||
|
|||
return request; |
|||
} |
|||
|
|||
public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, String fromTag, String viaTag, |
|||
String callId, String realm, String nonce, String scheme) throws ParseException, PeerUnavailableException, InvalidArgumentException { |
|||
Request registerRequest = createRegisterRequest(parentPlatform, fromTag, viaTag); |
|||
|
|||
CallIdHeader callIdHeader = (CallIdHeader)registerRequest.getHeader(CallIdHeader.NAME); |
|||
callIdHeader.setCallId(callId); |
|||
|
|||
String uri = "sip:" + parentPlatform.getServerGBId() + |
|||
"@" + parentPlatform.getServerIP() + |
|||
":" + parentPlatform.getServerPort(); |
|||
|
|||
String HA1 = DigestUtils.md5DigestAsHex((parentPlatform.getDeviceGBId() + ":" + realm + ":" + parentPlatform.getPassword()).getBytes()); |
|||
String HA2=DigestUtils.md5DigestAsHex((Request.REGISTER + ":" + uri).getBytes()); |
|||
String RESPONSE = DigestUtils.md5DigestAsHex((HA1 + ":" + nonce + ":" + HA2).getBytes()); |
|||
|
|||
String authorizationHeaderContent = scheme + " username=\"" + parentPlatform.getDeviceGBId() + "\", " + "realm=\"" |
|||
+ realm + "\", uri=\"" + uri + "\", response=\"" + RESPONSE + "\", nonce=\"" |
|||
+ nonce + "\""; |
|||
AuthorizationHeader authorizationHeader = sipFactory.createHeaderFactory().createAuthorizationHeader(authorizationHeaderContent); |
|||
registerRequest.addHeader(authorizationHeader); |
|||
|
|||
return registerRequest; |
|||
} |
|||
|
|||
} |
@ -0,0 +1,54 @@ |
|||
package com.genersoft.iot.vmp.storager.dao; |
|||
|
|||
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; |
|||
import org.apache.ibatis.annotations.*; |
|||
import org.springframework.stereotype.Repository; |
|||
|
|||
import java.util.List; |
|||
|
|||
/** |
|||
* 用于存储上级平台 |
|||
*/ |
|||
@Mapper |
|||
@Repository |
|||
public interface ParentPlatformMapper { |
|||
|
|||
@Insert("INSERT INTO parent_platform (enable, name, serverGBId, serverGBDomain, serverIP, serverPort, deviceGBId, deviceIp, " + |
|||
" devicePort, username, password, expires, keepTimeout, transport, characterSet, PTZEnable, rtcp, " + |
|||
" status) " + |
|||
" VALUES (${enable}, '${name}', '${serverGBId}', '${serverGBDomain}', '${serverIP}', ${serverPort}, '${deviceGBId}', '${deviceIp}', " + |
|||
" '${devicePort}', '${username}', '${password}', '${expires}', '${keepTimeout}', '${transport}', '${characterSet}', ${PTZEnable}, ${rtcp}, " + |
|||
" ${status})") |
|||
int addParentPlatform(ParentPlatform parentPlatform); |
|||
|
|||
@Update("UPDATE parent_platform " + |
|||
"SET enable=#{enable}, " + |
|||
"name=#{name}," + |
|||
"serverGBId=#{serverGBId}," + |
|||
"serverGBDomain=#{serverGBDomain}, " + |
|||
"serverIP=#{serverIP}," + |
|||
"serverPort=#{serverPort}, " + |
|||
"deviceIp=#{deviceIp}, " + |
|||
"devicePort=#{devicePort}, " + |
|||
"username=#{username}, " + |
|||
"password=#{password}, " + |
|||
"expires=#{expires}, " + |
|||
"keepTimeout=#{keepTimeout}, " + |
|||
"transport=#{transport}, " + |
|||
"characterSet=#{characterSet}, " + |
|||
"PTZEnable=#{PTZEnable}, " + |
|||
"rtcp=#{rtcp}, " + |
|||
"status=#{status} " + |
|||
"WHERE deviceGBId=#{deviceGBId}") |
|||
int updateParentPlatform(ParentPlatform parentPlatform); |
|||
|
|||
@Delete("DELETE FROM parent_platform WHERE deviceGBId=#{deviceGBId}") |
|||
int delParentPlatform(ParentPlatform parentPlatform); |
|||
|
|||
|
|||
@Select("SELECT * FROM parent_platform") |
|||
List<ParentPlatform> getParentPlatformList(); |
|||
|
|||
@Select("SELECT * FROM parent_platform WHERE deviceGBId=#{platformGbId}") |
|||
ParentPlatform getParentPlatById(String platformGbId); |
|||
} |
Binary file not shown.
Loading…
Reference in new issue