diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java index c02b7e19..a52a9256 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java @@ -64,6 +64,7 @@ public class PlatformKeepaliveExpireEventLister implements ApplicationListener

\r\n"); String tm = Long.toString(System.currentTimeMillis()); - Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "ViaDeviceInfoBranch", "FromDev" + tm, null); + Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaDeviceInfo" + tm, "FromDev" + tm, null); transmitRequest(device, request); @@ -923,7 +923,7 @@ public class SIPCommander implements ISIPCommander { catalogXml.append("\r\n"); String tm = Long.toString(System.currentTimeMillis()); - Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "ViaCatalogBranch", "FromCat" + tm, null); + Request request = headerProvider.createMessageRequest(device, catalogXml.toString(), "z9hG4bK-ViaCatalog" + tm, "FromCat" + tm, null); transmitRequest(device, request, errorEvent); } catch (SipException | ParseException | InvalidArgumentException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index ae9dfa73..28761ed2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -118,18 +118,18 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { try { StringBuffer keepaliveXml = new StringBuffer(200); - keepaliveXml.append("\r\n"); + keepaliveXml.append("\r\n");//" encoding=\"GB2312\"?>\r\n"); keepaliveXml.append("\r\n"); keepaliveXml.append("Keepalive\r\n"); keepaliveXml.append("" + (int)((Math.random()*9+1)*100000) + "\r\n"); - keepaliveXml.append("" + parentPlatform.getServerGBId() + "\r\n"); + keepaliveXml.append("" + parentPlatform.getDeviceGBId() + "\r\n"); keepaliveXml.append("OK\r\n"); keepaliveXml.append("\r\n"); Request request = headerProviderPlarformProvider.createKeetpaliveMessageRequest( parentPlatform, keepaliveXml.toString(), - UUID.randomUUID().toString().replace("-", ""), + "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), UUID.randomUUID().toString().replace("-", ""), null); transmitRequest(parentPlatform, request); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java index 7f58de5e..ad7b0708 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/AckRequestProcessor.java @@ -22,13 +22,10 @@ import org.springframework.stereotype.Component; @Component public class AckRequestProcessor extends SIPRequestAbstractProcessor { - //@Autowired private IRedisCatchStorage redisCatchStorage; - //@Autowired private ZLMRTPServerFactory zlmrtpServerFactory; - /** * 处理 ACK请求 * @@ -49,6 +46,8 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor { String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; String deviceId = sendRtpItem.getDeviceId(); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); + sendRtpItem.setStreamId(streamInfo.getStreamId()); + redisCatchStorage.updateSendRTPSever(sendRtpItem); System.out.println(platformGbId); System.out.println(channelId); Map param = new HashMap<>(); @@ -68,11 +67,16 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor { if (System.currentTimeMillis() - startTime < 30 * 1000) { if (zlmrtpServerFactory.isRtpReady(streamInfo.getStreamId())) { rtpPushed = true; + System.out.println("已获取设备推流,开始向上级推流"); zlmrtpServerFactory.startSendRtpStream(param); } else { + System.out.println("等待设备推流......."); Thread.sleep(2000); continue; } + } else { + rtpPushed = true; + System.out.println("设备推流超时,终止向上级推流"); } } catch (InterruptedException e) { e.printStackTrace(); @@ -108,5 +112,4 @@ public class AckRequestProcessor extends SIPRequestAbstractProcessor { public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) { this.zlmrtpServerFactory = zlmrtpServerFactory; } - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java index 0ba6bd83..a14a4cc6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/request/impl/ByeRequestProcessor.java @@ -1,13 +1,20 @@ package com.genersoft.iot.vmp.gb28181.transmit.request.impl; +import javax.sip.Dialog; +import javax.sip.DialogState; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; import javax.sip.message.Response; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.transmit.request.SIPRequestAbstractProcessor; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import java.text.ParseException; +import java.util.HashMap; +import java.util.Map; /** * @Description: BYE请求处理器 @@ -16,6 +23,10 @@ import java.text.ParseException; */ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { + private IRedisCatchStorage redisCatchStorage; + + private ZLMRTPServerFactory zlmrtpServerFactory; + /** * 处理BYE请求 * @param evt @@ -24,6 +35,22 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { public void process(RequestEvent evt) { try { responseAck(evt); + Dialog dialog = evt.getDialog(); + if (dialog == null) return; + if (dialog.getState().equals(DialogState.TERMINATED)) { + String remoteUri = dialog.getRemoteParty().getURI().toString(); + String localUri = dialog.getLocalParty().getURI().toString(); + String platformGbId = remoteUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@")); + String channelId = localUri.substring(remoteUri.indexOf(":") + 1, remoteUri.indexOf("@")); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); + String streamId = sendRtpItem.getStreamId(); + Map param = new HashMap<>(); + param.put("vhost","__defaultVhost__"); + param.put("app","rtp"); + param.put("stream",streamId); + System.out.println("停止向上级推流:" + streamId); + zlmrtpServerFactory.stopSendRtpStream(param); + } } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { @@ -47,4 +74,19 @@ public class ByeRequestProcessor extends SIPRequestAbstractProcessor { getServerTransaction(evt).sendResponse(response); } + public IRedisCatchStorage getRedisCatchStorage() { + return redisCatchStorage; + } + + public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) { + this.redisCatchStorage = redisCatchStorage; + } + + public ZLMRTPServerFactory getZlmrtpServerFactory() { + return zlmrtpServerFactory; + } + + public void setZlmrtpServerFactory(ZLMRTPServerFactory zlmrtpServerFactory) { + this.zlmrtpServerFactory = zlmrtpServerFactory; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index e3cc8a48..42670a71 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -123,4 +123,8 @@ public class ZLMRESTfulUtils { public JSONObject startSendRtp(Map param) { return sendPost("startSendRtp",param); } + + public JSONObject stopSendRtp(Map param) { + return sendPost("stopSendRtp",param); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 819c6a82..00951ba2 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -127,46 +127,46 @@ public class ZLMRTPServerFactory { } /** - * + * 调用zlm RESTful API —— startSendRtp */ public Boolean startSendRtpStream(Mapparam) { Boolean result = false; JSONObject jsonObject = zlmresTfulUtils.startSendRtp(param); System.out.println(jsonObject); - if (jsonObject != null) { - switch (jsonObject.getInteger("code")){ - case 0: - result= true; - logger.error("RTP推流请求成功,本地推流端口:" + jsonObject.getString("local_port")); - break; - // case -300: // id已经存在 - // result = false; - // break; - // case -400: // 端口占用 - // result= false; - // break; - default: - logger.error("RTP推流失败: " + jsonObject.getString("msg")); - break; - } - }else { - // 检查ZLM状态 + if (jsonObject == null) { logger.error("RTP推流失败: 请检查ZLM服务"); + } else if (jsonObject.getInteger("code") == 0) { + result= true; + logger.error("RTP推流请求成功,本地推流端口:" + jsonObject.getString("local_port")); + } else { + logger.error("RTP推流失败: " + jsonObject.getString("msg")); } return result; } /** - * + * 查询待转推的流是否就绪 */ public Boolean isRtpReady(String streamId) { JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo("rtp", "rtmp", streamId); - if (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")) { - logger.info("设备RTP推流成功"); - return true; + return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online")); + } + + /** + * 调用zlm RESTful API —— stopSendRtp + */ + public Boolean stopSendRtpStream(Mapparam) { + Boolean result = false; + JSONObject jsonObject = zlmresTfulUtils.stopSendRtp(param); + System.out.println(jsonObject); + if (jsonObject == null) { + logger.error("停止RTP推流失败: 请检查ZLM服务"); + } else if (jsonObject.getInteger("code") == 0) { + result= true; + logger.error("停止RTP推流成功"); } else { - logger.info("设备RTP推流未完成"); - return false; + logger.error("停止RTP推流失败: " + jsonObject.getString("msg")); } + return result; } }