@ -1,18 +1,29 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl ;
import com.alibaba.fastjson.JSON ;
import com.alibaba.fastjson.JSONObject ;
import com.genersoft.iot.vmp.common.StreamInfo ;
import com.genersoft.iot.vmp.gb28181.bean.* ;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe ;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver ;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage ;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander ;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander ;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform ;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor ;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent ;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils ;
import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe ;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory ;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem ;
import com.genersoft.iot.vmp.service.IMediaServerService ;
import com.genersoft.iot.vmp.service.IPlayService ;
import com.genersoft.iot.vmp.service.bean.SSRCInfo ;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage ;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager ;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult ;
import gov.nist.javax.sdp.TimeDescriptionImpl ;
import gov.nist.javax.sdp.fields.TimeField ;
import gov.nist.javax.sip.address.AddressImpl ;
import gov.nist.javax.sip.address.SipUri ;
import org.slf4j.Logger ;
@ -27,10 +38,13 @@ import javax.sip.RequestEvent;
import javax.sip.ServerTransaction ;
import javax.sip.SipException ;
import javax.sip.address.SipURI ;
import javax.sip.header.CallIdHeader ;
import javax.sip.header.FromHeader ;
import javax.sip.message.Request ;
import javax.sip.message.Response ;
import java.text.ParseException ;
import java.text.SimpleDateFormat ;
import java.util.Date ;
import java.util.List ;
import java.util.Vector ;
@ -60,6 +74,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private IPlayService playService ;
@Autowired
private ISIPCommander commander ;
@Autowired
private ZLMRTPServerFactory zlmrtpServerFactory ;
@ -69,6 +86,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private SIPProcessorObserver sipProcessorObserver ;
@Override
public void afterPropertiesSet ( ) throws Exception {
// 添加消息处理的订阅
@ -88,22 +106,19 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
Request request = evt . getRequest ( ) ;
SipURI sipURI = ( SipURI ) request . getRequestURI ( ) ;
String channelId = sipURI . getUser ( ) ;
String requesterId = null ;
FromHeader fromHeader = ( FromHeader ) request . getHeader ( FromHeader . NAME ) ;
AddressImpl address = ( AddressImpl ) fromHeader . getAddress ( ) ;
SipUri uri = ( SipUri ) address . getURI ( ) ;
requesterId = uri . getUser ( ) ;
String requesterId = SipUtils . getUserIdFromFromHeader ( request ) ;
CallIdHeader callIdHeader = ( CallIdHeader ) request . getHeader ( CallIdHeader . NAME ) ;
if ( requesterId = = null | | channelId = = null ) {
logger . info ( "无法从FromHeader的Address中获取到平台id,返回400" ) ;
responseAck ( evt , Response . BAD_REQUEST ) ; // 参数不全, 发400,请求错误
return ;
}
// 查询请求方 是否上级平台
// 查询请求是否来自 上级平台\设备
ParentPlatform platform = storager . queryParentPlatByServerGBId ( requesterId ) ;
if ( platform ! = null ) {
if ( platform = = null ) {
inviteFromDeviceHandle ( evt , requesterId ) ;
} else {
// 查询平台下是否有该通道
DeviceChannel channel = storager . queryChannelInParentPlatform ( requesterId , channelId ) ;
GbStream gbStream = storager . queryStreamInParentPlatform ( requesterId , channelId ) ;
@ -122,7 +137,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
mediaServerItem = mediaServerService . getOne ( mediaServerId ) ;
if ( mediaServerItem = = null ) {
logger . info ( "[ app={}, stream={} ]找不到zlm {},返回410" , gbStream . getApp ( ) , gbStream . getStream ( ) , mediaServerId ) ;
responseAck ( evt , Response . GONE , "media server not found" ) ;
responseAck ( evt , Response . GONE ) ;
return ;
}
Boolean streamReady = zlmrtpServerFactory . isStreamReady ( mediaServerItem , gbStream . getApp ( ) , gbStream . getStream ( ) ) ;
@ -158,13 +173,26 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
ssrc = ssrcDefault ;
sdp = SdpFactory . getInstance ( ) . createSessionDescription ( contentString ) ;
}
String sessionName = sdp . getSessionName ( ) . getValue ( ) ;
Long startTime = null ;
Long stopTime = null ;
Date start = null ;
Date end = null ;
if ( sdp . getTimeDescriptions ( false ) ! = null & & sdp . getTimeDescriptions ( false ) . size ( ) > 0 ) {
TimeDescriptionImpl timeDescription = ( TimeDescriptionImpl ) ( sdp . getTimeDescriptions ( false ) . get ( 0 ) ) ;
TimeField startTimeFiled = ( TimeField ) timeDescription . getTime ( ) ;
startTime = startTimeFiled . getStartTime ( ) ;
stopTime = startTimeFiled . getStopTime ( ) ;
start = new Date ( startTime * 1000 ) ;
end = new Date ( stopTime * 1000 ) ;
}
// 获取支持的格式
Vector mediaDescriptions = sdp . getMediaDescriptions ( true ) ;
// 查看是否支持PS 负载96
//String ip = null;
int port = - 1 ;
//boolean recvonly = false;
boolean mediaTransmissionTCP = false ;
Boolean tcpActive = null ;
for ( Object description : mediaDescriptions ) {
@ -200,7 +228,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
String username = sdp . getOrigin ( ) . getUsername ( ) ;
String addressStr = sdp . getOrigin ( ) . getAddress ( ) ;
//String sessionName = sdp.getSessionName().getValue();
logger . info ( "[上级点播]用户:{}, 地址:{}:{}, ssrc:{}" , username , addressStr , port , ssrc ) ;
Device device = null ;
// 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
@ -228,23 +255,33 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
responseAck ( evt , Response . BUSY_HERE ) ;
return ;
}
sendRtpItem . setCallId ( callIdHeader . getCallId ( ) ) ;
sendRtpItem . setPlay ( "Play" . equals ( sessionName ) ) ;
// 写入redis, 超时时回复
redisCatchStorage . updateSendRTPSever ( sendRtpItem ) ;
// 通知下级推流,
PlayResult playResult = playService . play ( mediaServerItem , device . getDeviceId ( ) , channelId , ( mediaServerItemInUSe , responseJSON ) - > {
// 收到推流, 回复200OK, 等待ack
// if (sendRtpItem == null) return;
Device finalDevice = device ;
MediaServerItem finalMediaServerItem = mediaServerItem ;
Long finalStartTime = startTime ;
Long finalStopTime = stopTime ;
ZLMHttpHookSubscribe . Event hookEvent = ( mediaServerItemInUSe , responseJSON ) - > {
logger . info ( "[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}" , sendRtpItem . getApp ( ) , sendRtpItem . getStreamId ( ) ) ;
// * 0 等待设备推流上来
// * 1 下级已经推流,等待上级平台回复ack
// * 2 推流中
sendRtpItem . setStatus ( 1 ) ;
redisCatchStorage . updateSendRTPSever ( sendRtpItem ) ;
// TODO 添加对tcp的支持
StringBuffer content = new StringBuffer ( 200 ) ;
content . append ( "v=0\r\n" ) ;
content . append ( "o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe . getSdpIp ( ) + "\r\n" ) ;
content . append ( "s=Play \r\n" ) ;
content . append ( "s=" + sessionName + " \r\n" ) ;
content . append ( "c=IN IP4 " + mediaServerItemInUSe . getSdpIp ( ) + "\r\n" ) ;
content . append ( "t=0 0\r\n" ) ;
if ( "Playback" . equals ( sessionName ) ) {
content . append ( "t=" + finalStartTime + " " + finalStopTime + "\r\n" ) ;
} else {
content . append ( "t=0 0\r\n" ) ;
}
content . append ( "m=video " + sendRtpItem . getLocalPort ( ) + " RTP/AVP 96\r\n" ) ;
content . append ( "a=sendonly\r\n" ) ;
content . append ( "a=rtpmap:96 PS/90000\r\n" ) ;
@ -260,7 +297,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} catch ( ParseException e ) {
e . printStackTrace ( ) ;
}
} , ( ( event ) - > {
} ;
SipSubscribe . Event errorEvent = ( ( event ) - > {
// 未知错误。直接转发设备点播的错误
Response response = null ;
try {
@ -271,11 +309,46 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
} catch ( ParseException | SipException | InvalidArgumentException e ) {
e . printStackTrace ( ) ;
}
} ) ) ;
if ( logger . isDebugEnabled ( ) ) {
logger . debug ( playResult . getResult ( ) . toString ( ) ) ;
} ) ;
if ( "Playback" . equals ( sessionName ) ) {
sendRtpItem . setPlay ( false ) ;
sendRtpItem . setStreamId ( ssrc ) ;
SimpleDateFormat format = new SimpleDateFormat ( "yyyy-MM-dd HH:mm:ss" ) ;
playService . playBack ( device . getDeviceId ( ) , channelId , format . format ( start ) , format . format ( end ) , result - > {
if ( result . getCode ( ) ! = 0 ) {
logger . warn ( "录像回放失败" ) ;
if ( result . getEvent ( ) ! = null ) {
errorEvent . response ( result . getEvent ( ) ) ;
}
try {
responseAck ( evt , Response . REQUEST_TIMEOUT ) ;
} catch ( SipException e ) {
e . printStackTrace ( ) ;
} catch ( InvalidArgumentException e ) {
e . printStackTrace ( ) ;
} catch ( ParseException e ) {
e . printStackTrace ( ) ;
}
} else {
if ( result . getMediaServerItem ( ) ! = null ) {
hookEvent . response ( result . getMediaServerItem ( ) , result . getResponse ( ) ) ;
}
}
} ) ;
} else {
sendRtpItem . setPlay ( true ) ;
StreamInfo streamInfo = redisCatchStorage . queryPlayByDevice ( device . getDeviceId ( ) , channelId ) ;
if ( streamInfo = = null ) {
if ( mediaServerItem . isRtpEnable ( ) ) {
sendRtpItem . setStreamId ( String . format ( "%s_%s" , device . getDeviceId ( ) , channelId ) ) ;
}
sendRtpItem . setPlay ( false ) ;
playService . play ( mediaServerItem , device . getDeviceId ( ) , channelId , hookEvent , errorEvent ) ;
} else {
sendRtpItem . setStreamId ( streamInfo . getStream ( ) ) ;
hookEvent . response ( mediaServerItem , null ) ;
}
}
} else if ( gbStream ! = null ) {
SendRtpItem sendRtpItem = zlmrtpServerFactory . createSendRtpItem ( mediaServerItem , addressStr , port , ssrc , requesterId ,
gbStream . getApp ( ) , gbStream . getStream ( ) , channelId ,
@ -295,7 +368,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem . setStatus ( 1 ) ;
redisCatchStorage . updateSendRTPSever ( sendRtpItem ) ;
// TODO 添加对tcp的支持
StringBuffer content = new StringBuffer ( 200 ) ;
content . append ( "v=0\r\n" ) ;
content . append ( "o=" + channelId + " 0 0 IN IP4 " + mediaServerItem . getSdpIp ( ) + "\r\n" ) ;
@ -319,72 +391,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
}
} else {
// 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
Device device = redisCatchStorage . getDevice ( requesterId ) ;
if ( device ! = null ) {
logger . info ( "收到设备" + requesterId + "的语音广播Invite请求" ) ;
responseAck ( evt , Response . TRYING ) ;
String contentString = new String ( request . getRawContent ( ) ) ;
// jainSip不支持y=字段, 移除移除以解析。
String substring = contentString ;
String ssrc = "0000000404" ;
int ssrcIndex = contentString . indexOf ( "y=" ) ;
if ( ssrcIndex > 0 ) {
substring = contentString . substring ( 0 , ssrcIndex ) ;
ssrc = contentString . substring ( ssrcIndex + 2 , ssrcIndex + 12 ) ;
}
ssrcIndex = substring . indexOf ( "f=" ) ;
if ( ssrcIndex > 0 ) {
substring = contentString . substring ( 0 , ssrcIndex ) ;
}
SessionDescription sdp = SdpFactory . getInstance ( ) . createSessionDescription ( substring ) ;
// 获取支持的格式
Vector mediaDescriptions = sdp . getMediaDescriptions ( true ) ;
// 查看是否支持PS 负载96
int port = - 1 ;
//boolean recvonly = false;
boolean mediaTransmissionTCP = false ;
Boolean tcpActive = null ;
for ( int i = 0 ; i < mediaDescriptions . size ( ) ; i + + ) {
MediaDescription mediaDescription = ( MediaDescription ) mediaDescriptions . get ( i ) ;
Media media = mediaDescription . getMedia ( ) ;
Vector mediaFormats = media . getMediaFormats ( false ) ;
if ( mediaFormats . contains ( "8" ) ) {
port = media . getMediaPort ( ) ;
String protocol = media . getProtocol ( ) ;
// 区分TCP发流还是udp, 当前默认udp
if ( "TCP/RTP/AVP" . equals ( protocol ) ) {
String setup = mediaDescription . getAttribute ( "setup" ) ;
if ( setup ! = null ) {
mediaTransmissionTCP = true ;
if ( "active" . equals ( setup ) ) {
tcpActive = true ;
} else if ( "passive" . equals ( setup ) ) {
tcpActive = false ;
}
}
}
break ;
}
}
if ( port = = - 1 ) {
logger . info ( "不支持的媒体格式,返回415" ) ;
// 回复不支持的格式
responseAck ( evt , Response . UNSUPPORTED_MEDIA_TYPE ) ; // 不支持的格式,发415
return ;
}
String username = sdp . getOrigin ( ) . getUsername ( ) ;
String addressStr = sdp . getOrigin ( ) . getAddress ( ) ;
logger . info ( "设备{}请求语音流,地址:{}:{},ssrc:{}" , username , addressStr , port , ssrc ) ;
} else {
logger . warn ( "来自无效设备/平台的请求" ) ;
responseAck ( evt , Response . BAD_REQUEST ) ;
}
}
} catch ( SipException | InvalidArgumentException | ParseException e ) {
@ -397,4 +403,74 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
e . printStackTrace ( ) ;
}
}
public void inviteFromDeviceHandle ( RequestEvent evt , String requesterId ) throws InvalidArgumentException , ParseException , SipException , SdpException {
// 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备)
Device device = redisCatchStorage . getDevice ( requesterId ) ;
Request request = evt . getRequest ( ) ;
if ( device ! = null ) {
logger . info ( "收到设备" + requesterId + "的语音广播Invite请求" ) ;
responseAck ( evt , Response . TRYING ) ;
String contentString = new String ( request . getRawContent ( ) ) ;
// jainSip不支持y=字段, 移除移除以解析。
String substring = contentString ;
String ssrc = "0000000404" ;
int ssrcIndex = contentString . indexOf ( "y=" ) ;
if ( ssrcIndex > 0 ) {
substring = contentString . substring ( 0 , ssrcIndex ) ;
ssrc = contentString . substring ( ssrcIndex + 2 , ssrcIndex + 12 ) ;
}
ssrcIndex = substring . indexOf ( "f=" ) ;
if ( ssrcIndex > 0 ) {
substring = contentString . substring ( 0 , ssrcIndex ) ;
}
SessionDescription sdp = SdpFactory . getInstance ( ) . createSessionDescription ( substring ) ;
// 获取支持的格式
Vector mediaDescriptions = sdp . getMediaDescriptions ( true ) ;
// 查看是否支持PS 负载96
int port = - 1 ;
//boolean recvonly = false;
boolean mediaTransmissionTCP = false ;
Boolean tcpActive = null ;
for ( int i = 0 ; i < mediaDescriptions . size ( ) ; i + + ) {
MediaDescription mediaDescription = ( MediaDescription ) mediaDescriptions . get ( i ) ;
Media media = mediaDescription . getMedia ( ) ;
Vector mediaFormats = media . getMediaFormats ( false ) ;
if ( mediaFormats . contains ( "8" ) ) {
port = media . getMediaPort ( ) ;
String protocol = media . getProtocol ( ) ;
// 区分TCP发流还是udp, 当前默认udp
if ( "TCP/RTP/AVP" . equals ( protocol ) ) {
String setup = mediaDescription . getAttribute ( "setup" ) ;
if ( setup ! = null ) {
mediaTransmissionTCP = true ;
if ( "active" . equals ( setup ) ) {
tcpActive = true ;
} else if ( "passive" . equals ( setup ) ) {
tcpActive = false ;
}
}
}
break ;
}
}
if ( port = = - 1 ) {
logger . info ( "不支持的媒体格式,返回415" ) ;
// 回复不支持的格式
responseAck ( evt , Response . UNSUPPORTED_MEDIA_TYPE ) ; // 不支持的格式,发415
return ;
}
String username = sdp . getOrigin ( ) . getUsername ( ) ;
String addressStr = sdp . getOrigin ( ) . getAddress ( ) ;
logger . info ( "设备{}请求语音流,地址:{}:{},ssrc:{}" , username , addressStr , port , ssrc ) ;
} else {
logger . warn ( "来自无效设备/平台的请求" ) ;
responseAck ( evt , Response . BAD_REQUEST ) ;
}
}
}