diff --git a/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java b/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java index 56038bd5..bfe58419 100644 --- a/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java +++ b/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java @@ -6,6 +6,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.web.servlet.ServletComponentScan; import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import springfox.documentation.oas.annotations.EnableOpenApi; diff --git a/src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java new file mode 100644 index 00000000..f2edf043 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java @@ -0,0 +1,57 @@ +package com.genersoft.iot.vmp.conf; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.ThreadPoolExecutor; + +@Configuration +@EnableAsync +public class ThreadPoolTaskConfig { + + /** + * 默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务, + * 当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中; + * 当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝 + */ + + /** + * 核心线程数(默认线程数) + */ + private static final int corePoolSize = 5; + /** + * 最大线程数 + */ + private static final int maxPoolSize = 30; + /** + * 允许线程空闲时间(单位:默认为秒) + */ + private static final int keepAliveTime = 30; + /** + * 缓冲队列大小 + */ + private static final int queueCapacity = 10000; + /** + * 线程池名前缀 + */ + private static final String threadNamePrefix = "hdl-uhi-service-"; + + @Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名 + public ThreadPoolTaskExecutor taskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(corePoolSize); + executor.setMaxPoolSize(maxPoolSize); + executor.setQueueCapacity(queueCapacity); + executor.setKeepAliveSeconds(keepAliveTime); + executor.setThreadNamePrefix(threadNamePrefix); + + // 线程池对拒绝任务的处理策略 + // CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + // 初始化 + executor.initialize(); + return executor; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java index 3d9b827b..420a30a7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.transmit.ISIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.SipStackImpl; @@ -28,28 +29,12 @@ public class SipLayer{ private SipConfig sipConfig; @Autowired - private SIPProcessorObserver sipProcessorObserver; - - @Autowired - private SipSubscribe sipSubscribe; + private ISIPProcessorObserver sipProcessorObserver; private SipStackImpl sipStack; private SipFactory sipFactory; - /** - * 消息处理器线程池 - */ - private ThreadPoolExecutor processThreadPool; - - public SipLayer() { - int processThreadNum = Runtime.getRuntime().availableProcessors() * 10; - LinkedBlockingQueue processQueue = new LinkedBlockingQueue<>(10000); - processThreadPool = new ThreadPoolExecutor(processThreadNum,processThreadNum, - 0L,TimeUnit.MILLISECONDS,processQueue, - new ThreadPoolExecutor.CallerRunsPolicy()); - } - @Bean("sipFactory") private SipFactory createSipFactory() { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/ISIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/ISIPProcessorObserver.java new file mode 100644 index 00000000..2480f37e --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/ISIPProcessorObserver.java @@ -0,0 +1,6 @@ +package com.genersoft.iot.vmp.gb28181.transmit; + +import javax.sip.SipListener; + +public interface ISIPProcessorObserver extends SipListener { +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java index cac1a01e..9149be1a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java @@ -7,6 +7,9 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.timeout.ITimeoutProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import javax.sip.*; @@ -22,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; * @date: 2021年11月5日 下午15:32 */ @Component -public class SIPProcessorObserver implements SipListener { +public class SIPProcessorObserver implements ISIPProcessorObserver { private final static Logger logger = LoggerFactory.getLogger(SIPProcessorObserver.class); @@ -33,6 +36,10 @@ public class SIPProcessorObserver implements SipListener { @Autowired private SipSubscribe sipSubscribe; + @Autowired + @Qualifier(value = "taskExecutor") + private ThreadPoolTaskExecutor poolTaskExecutor; + /** * 添加 request订阅 * @param method 方法名 @@ -65,13 +72,17 @@ public class SIPProcessorObserver implements SipListener { */ @Override public void processRequest(RequestEvent requestEvent) { - String method = requestEvent.getRequest().getMethod(); - ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method); - if (sipRequestProcessor == null) { - logger.warn("不支持方法{}的request", method); - return; - } - requestProcessorMap.get(method).process(requestEvent); + + poolTaskExecutor.execute(() -> { + String method = requestEvent.getRequest().getMethod(); + ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method); + if (sipRequestProcessor == null) { + logger.warn("不支持方法{}的request", method); + return; + } + requestProcessorMap.get(method).process(requestEvent); + }); + } /** @@ -90,43 +101,45 @@ public class SIPProcessorObserver implements SipListener { // } // sipRequestProcessor.process(responseEvent); - - Response response = responseEvent.getResponse(); - logger.debug(responseEvent.getResponse().toString()); - int status = response.getStatusCode(); - if (((status >= 200) && (status < 300)) || status == 401) { // Success! + poolTaskExecutor.execute(() -> { + Response response = responseEvent.getResponse(); + logger.debug(responseEvent.getResponse().toString()); + int status = response.getStatusCode(); + if (((status >= 200) && (status < 300)) || status == 401) { // Success! // ISIPResponseProcessor processor = processorFactory.createResponseProcessor(evt); - CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); - String method = cseqHeader.getMethod(); - ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); - if (sipRequestProcessor != null) { - sipRequestProcessor.process(responseEvent); - } - if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { - CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); - if (callIdHeader != null) { - SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); - if (subscribe != null) { - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); - subscribe.response(eventResult); + CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); + String method = cseqHeader.getMethod(); + ISIPResponseProcessor sipRequestProcessor = responseProcessorMap.get(method); + if (sipRequestProcessor != null) { + sipRequestProcessor.process(responseEvent); + } + if (responseEvent.getResponse() != null && sipSubscribe.getOkSubscribesSize() > 0 ) { + CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); + if (callIdHeader != null) { + SipSubscribe.Event subscribe = sipSubscribe.getOkSubscribe(callIdHeader.getCallId()); + if (subscribe != null) { + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); + subscribe.response(eventResult); + } } } - } - } else if ((status >= 100) && (status < 200)) { - // 增加其它无需回复的响应,如101、180等 - } else { - logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/); - if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) { - CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); - if (callIdHeader != null) { - SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); - if (subscribe != null) { - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); - subscribe.response(eventResult); + } else if ((status >= 100) && (status < 200)) { + // 增加其它无需回复的响应,如101、180等 + } else { + logger.warn("接收到失败的response响应!status:" + status + ",message:" + response.getReasonPhrase()/* .getContent().toString()*/); + if (responseEvent.getResponse() != null && sipSubscribe.getErrorSubscribesSize() > 0 ) { + CallIdHeader callIdHeader = (CallIdHeader)responseEvent.getResponse().getHeader(CallIdHeader.NAME); + if (callIdHeader != null) { + SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); + if (subscribe != null) { + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(responseEvent); + subscribe.response(eventResult); + } } } } - } + }); + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java index 4eac1342..bb62902a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java @@ -204,7 +204,6 @@ public class SIPRequestHeaderProvider { // Event EventHeader eventHeader = sipFactory.createHeaderFactory().createEventHeader(event); - eventHeader.setEventType("Catalog"); request.addHeader(eventHeader); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "MANSCDP+xml"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 1af2cdf0..61647aa3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1496,7 +1496,7 @@ public class SIPCommander implements ISIPCommander { CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); - Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "presence" , callIdHeader); + Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" , callIdHeader); transmitRequest(device, request, errorEvent, okEvent); return true;