Browse Source

添加使用多线程消息处理sip消息

pull/231/head
648540858 3 years ago
parent
commit
9561e952a3
  1. 1
      src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java
  2. 57
      src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java
  3. 19
      src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
  4. 6
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/ISIPProcessorObserver.java
  5. 91
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java
  6. 1
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java
  7. 2
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

1
src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java

@ -6,6 +6,7 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan; import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.EnableScheduling;
import springfox.documentation.oas.annotations.EnableOpenApi; import springfox.documentation.oas.annotations.EnableOpenApi;

57
src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java

@ -0,0 +1,57 @@
package com.genersoft.iot.vmp.conf;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
/**
* 默认情况下在创建了线程池后线程池中的线程数为0当有任务来之后就会创建一个线程去执行任务
* 当线程池中的线程数目达到corePoolSize后就会把到达的任务放到缓存队列当中
* 当队列满了就继续创建线程当线程数量大于等于maxPoolSize后开始使用拒绝策略拒绝
*/
/**
* 核心线程数默认线程数
*/
private static final int corePoolSize = 5;
/**
* 最大线程数
*/
private static final int maxPoolSize = 30;
/**
* 允许线程空闲时间单位默认为秒
*/
private static final int keepAliveTime = 30;
/**
* 缓冲队列大小
*/
private static final int queueCapacity = 10000;
/**
* 线程池名前缀
*/
private static final String threadNamePrefix = "hdl-uhi-service-";
@Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveTime);
executor.setThreadNamePrefix(threadNamePrefix);
// 线程池对拒绝任务的处理策略
// CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
executor.initialize();
return executor;
}
}

19
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181;
import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.transmit.ISIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl; import gov.nist.javax.sip.SipStackImpl;
@ -28,28 +29,12 @@ public class SipLayer{
private SipConfig sipConfig; private SipConfig sipConfig;
@Autowired @Autowired
private SIPProcessorObserver sipProcessorObserver; private ISIPProcessorObserver sipProcessorObserver;
@Autowired
private SipSubscribe sipSubscribe;
private SipStackImpl sipStack; private SipStackImpl sipStack;
private SipFactory sipFactory; private SipFactory sipFactory;
/**
* 消息处理器线程池
*/
private ThreadPoolExecutor processThreadPool;
public SipLayer() {
int processThreadNum = Runtime.getRuntime().availableProcessors() * 10;
LinkedBlockingQueue<Runnable> processQueue = new LinkedBlockingQueue<>(10000);
processThreadPool = new ThreadPoolExecutor(processThreadNum,processThreadNum,
0L,TimeUnit.MILLISECONDS,processQueue,
new ThreadPoolExecutor.CallerRunsPolicy());
}
@Bean("sipFactory") @Bean("sipFactory")
private SipFactory createSipFactory() { private SipFactory createSipFactory() {

6
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/ISIPProcessorObserver.java

@ -0,0 +1,6 @@
package com.genersoft.iot.vmp.gb28181.transmit;
import javax.sip.SipListener;
public interface ISIPProcessorObserver extends SipListener {
}

91
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java

@ -7,6 +7,9 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.timeout.ITimeoutProcessor;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sip.*; import javax.sip.*;
@ -22,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
* @date: 2021年11月5日 下午1532 * @date: 2021年11月5日 下午1532
*/ */
@Component @Component
public class SIPProcessorObserver implements SipListener { public class SIPProcessorObserver implements ISIPProcessorObserver {
private final static Logger logger = LoggerFactory.getLogger(SIPProcessorObserver.class); private final static Logger logger = LoggerFactory.getLogger(SIPProcessorObserver.class);
@ -33,6 +36,10 @@ public class SIPProcessorObserver implements SipListener {
@Autowired @Autowired
private SipSubscribe sipSubscribe; private SipSubscribe sipSubscribe;
@Autowired
@Qualifier(value = "taskExecutor")
private ThreadPoolTaskExecutor poolTaskExecutor;
/** /**
* 添加 request订阅 * 添加 request订阅
* @param method 方法名 * @param method 方法名
@ -65,13 +72,17 @@ public class SIPProcessorObserver implements SipListener {
*/ */
@Override @Override
public void processRequest(RequestEvent requestEvent) { public void processRequest(RequestEvent requestEvent) {
String method = requestEvent.getRequest().getMethod();
ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method); poolTaskExecutor.execute(() -> {
if (sipRequestProcessor == null) { String method = requestEvent.getRequest().getMethod();
logger.warn("不支持方法{}的request", method); ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method);
return; if (sipRequestProcessor == null) {
} logger.warn("不支持方法{}的request", method);
requestProcessorMap.get(method).process(requestEvent); return;
}
requestProcessorMap.get(method).process(requestEvent);
});
} }
/** /**
@ -90,43 +101,45 @@ public class SIPProcessorObserver implements SipListener {
// } // }
// sipRequestProcessor.process(responseEvent); // sipRequestProcessor.process(responseEvent);
poolTaskExecutor.execute(() -> {
Response response = responseEvent.getResponse(); Response response = responseEvent.getResponse();
logger.debug(responseEvent.getResponse().toString()); logger.debug(responseEvent.getResponse().toString());
int status = response.getStatusCode(); int status = response.getStatusCode();
if (((status >= 200) && (status < 300)) || status == 401) { // Success! if (((status >= 200) && (status < 300)) || status == 401) { // Success!
// ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt); // ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt);
CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);
String method = cseqHeader.getMethod(); String method = cseqHeader.getMethod();
ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method);
if (sipRequestProcessor != null) { if (sipRequestProcessor != null) {
sipRequestProcessor.process(responseEvent); sipRequestProcessor.process(responseEvent);
} }
if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) {
CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
if (callIdHeader != null) { if (callIdHeader != null) {
SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId());
if (subscribe != null) { if (subscribe != null) {
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
subscribe.response(eventResult); subscribe.response(eventResult);
}
} }
} }
} } else if ((status >= 100) && (status < 200)) {
} else if ((status >= 100) && (status < 200)) { // 增加其它无需回复的响应,如101、180等
// 增加其它无需回复的响应,如101、180等 } else {
} else { logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/);
logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/); if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) {
if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) { CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME);
CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); if (callIdHeader != null) {
if (callIdHeader != null) { SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId());
SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); if (subscribe != null) {
if (subscribe != null) { SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent);
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); subscribe.response(eventResult);
subscribe.response(eventResult); }
} }
} }
} }
} });
} }

1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java

@ -204,7 +204,6 @@ public class SIPRequestHeaderProvider {
// Event // Event
EventHeader eventHeader = sipFactory.createHeaderFactory().createEventHeader(event); EventHeader eventHeader = sipFactory.createHeaderFactory().createEventHeader(event);
eventHeader.setEventType("Catalog");
request.addHeader(eventHeader); request.addHeader(eventHeader);
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml"); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml");

2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

@ -1496,7 +1496,7 @@ public class SIPCommander implements ISIPCommander {
CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId(); : udpSipProvider.getNewCallId();
Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "presence" , callIdHeader); Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" , callIdHeader);
transmitRequest(device, request, errorEvent, okEvent); transmitRequest(device, request, errorEvent, okEvent);
return true; return true;

Loading…
Cancel
Save