Browse Source

使用@Async多线程处理sip消息

pull/243/head
648540858 3 years ago
parent
commit
80bfd9ce02
  1. 96
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
  2. 5
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
  3. 3
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java
  4. 5
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/NotifyMessageHandler.java

96
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java

@ -36,9 +36,9 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
@Autowired @Autowired
private SipSubscribe sipSubscribe; private SipSubscribe sipSubscribe;
@Autowired // @Autowired
@Qualifier(value = "taskExecutor") // @Qualifier(value = "taskExecutor")
private ThreadPoolTaskExecutor poolTaskExecutor; // private ThreadPoolTaskExecutor poolTaskExecutor;
/** /**
* 添加 request订阅 * 添加 request订阅
@ -71,17 +71,15 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
* @param requestEvent RequestEvent事件 * @param requestEvent RequestEvent事件
*/ */
@Override @Override
@Async
public void processRequest(RequestEvent requestEvent) { public void processRequest(RequestEvent requestEvent) {
String method = requestEvent.getRequest().getMethod();
poolTaskExecutor.execute(() -> { ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method);
String method = requestEvent.getRequest().getMethod(); if (sipRequestProcessor == null) {
ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method); logger.warn("不支持方法{}的request", method);
if (sipRequestProcessor == null) { return;
logger.warn("不支持方法{}的request", method); }
return; requestProcessorMap.get(method).process(requestEvent);
}
requestProcessorMap.get(method).process(requestEvent);
});
} }
@ -90,55 +88,45 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
* @param responseEvent responseEvent事件 * @param responseEvent responseEvent事件
*/ */
@Override @Override
@Async
public void processResponse(ResponseEvent responseEvent) { public void processResponse(ResponseEvent responseEvent) {
logger.debug(responseEvent.getResponse().toString()); logger.debug(responseEvent.getResponse().toString());
// CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); Response response = responseEvent.getResponse();
// String method = cseqHeader.getMethod(); logger.debug(responseEvent.getResponse().toString());
// ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); int status = response.getStatusCode();
// if (sipRequestProcessor == null) { if (((status >= 200) && (status < 300)) || status == 401) { // Success!
// logger.warn("不支持方法{}的response", method);
// return;
// }
// sipRequestProcessor.process(responseEvent);
poolTaskExecutor.execute(() -> {
Response response = responseEvent.getResponse();
logger.debug(responseEvent.getResponse().toString());
int status = response.getStatusCode();
if (((status >= 200) && (status < 300)) || status == 401) { // Success!
// ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt); // ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt);
CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
String method = cseqHeader.getMethod(); String method = cseqHeader.getMethod();
ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
if (sipRequestProcessor != null) { if (sipRequestProcessor != null) {
sipRequestProcessor.process(responseEvent); sipRequestProcessor.process(responseEvent);
} }
if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
if (callIdHeader != null) { if (callIdHeader != null) {
SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
if (subscribe != null) { if (subscribe != null) {
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
subscribe.response(eventResult); subscribe.response(eventResult);
}
} }
} }
} else if ((status >= 100) && (status < 200)) { }
// 增加其它无需回复的响应,如101、180等 } else if ((status >= 100) && (status < 200)) {
} else { // 增加其它无需回复的响应,如101、180等
logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/); } else {
if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) { logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/);
CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
if (callIdHeader != null) { CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); if (callIdHeader != null) {
if (subscribe != null) { SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); if (subscribe != null) {
subscribe.response(eventResult); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
} subscribe.response(eventResult);
} }
} }
} }
}); }
} }

5
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java

@ -36,7 +36,8 @@ import java.util.Map;
@Component @Component
public class ByeRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { public class ByeRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor {
private Logger logger = LoggerFactory.getLogger(ByeRequestProcessor.class); private final Logger logger = LoggerFactory.getLogger(ByeRequestProcessor.class);
private final String method = "BYE";
@Autowired @Autowired
private ISIPCommander cmder; private ISIPCommander cmder;
@ -53,8 +54,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired @Autowired
private IMediaServerService mediaServerService; private IMediaServerService mediaServerService;
private String method = "BYE";
@Autowired @Autowired
private SIPProcessorObserver sipProcessorObserver; private SIPProcessorObserver sipProcessorObserver;

3
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java

@ -16,9 +16,6 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i
public static Map<String, IMessageHandler> messageHandlerMap = new ConcurrentHashMap<>(); public static Map<String, IMessageHandler> messageHandlerMap = new ConcurrentHashMap<>();
@Autowired
public MessageRequestProcessor messageRequestProcessor;
public void addHandler(String cmdType, IMessageHandler messageHandler) { public void addHandler(String cmdType, IMessageHandler messageHandler) {
messageHandlerMap.put(cmdType, messageHandler); messageHandlerMap.put(cmdType, messageHandler);
} }

5
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/NotifyMessageHandler.java

@ -1,7 +1,9 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify; package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageHandlerAbstract; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageHandlerAbstract;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.MessageRequestProcessor;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Component @Component
@ -9,6 +11,9 @@ public class NotifyMessageHandler extends MessageHandlerAbstract implements Init
private final String messageType = "Notify"; private final String messageType = "Notify";
@Autowired
private MessageRequestProcessor messageRequestProcessor;
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
messageRequestProcessor.addHandler(messageType, this); messageRequestProcessor.addHandler(messageType, this);

Loading…
Cancel
Save