648540858
3 years ago
18 changed files with 275 additions and 214 deletions
@ -0,0 +1,94 @@ |
|||||
|
package com.genersoft.iot.vmp.gb28181.session; |
||||
|
|
||||
|
import com.genersoft.iot.vmp.gb28181.bean.*; |
||||
|
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; |
||||
|
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; |
||||
|
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; |
||||
|
import com.genersoft.iot.vmp.utils.DateUtil; |
||||
|
import com.genersoft.iot.vmp.vmanager.bean.WVPResult; |
||||
|
import org.springframework.beans.factory.annotation.Autowired; |
||||
|
import org.springframework.scheduling.annotation.Scheduled; |
||||
|
import org.springframework.stereotype.Component; |
||||
|
|
||||
|
import java.text.SimpleDateFormat; |
||||
|
import java.util.*; |
||||
|
import java.util.concurrent.ConcurrentHashMap; |
||||
|
|
||||
|
/** |
||||
|
* @author lin |
||||
|
*/ |
||||
|
@Component |
||||
|
public class RecordDataCatch { |
||||
|
|
||||
|
public static Map<String, RecordInfo> data = new ConcurrentHashMap<>(); |
||||
|
|
||||
|
@Autowired |
||||
|
private DeferredResultHolder deferredResultHolder; |
||||
|
|
||||
|
|
||||
|
public int put(String deviceId, String sn, int sumNum, List<RecordItem> recordItems) { |
||||
|
String key = deviceId + sn; |
||||
|
RecordInfo recordInfo = data.get(key); |
||||
|
if (recordInfo == null) { |
||||
|
recordInfo = new RecordInfo(); |
||||
|
recordInfo.setDeviceId(deviceId); |
||||
|
recordInfo.setSn(sn.trim()); |
||||
|
recordInfo.setSumNum(sumNum); |
||||
|
recordInfo.setRecordList(Collections.synchronizedList(new ArrayList<>())); |
||||
|
recordInfo.setLastTime(new Date(System.currentTimeMillis())); |
||||
|
recordInfo.getRecordList().addAll(recordItems); |
||||
|
data.put(key, recordInfo); |
||||
|
}else { |
||||
|
// 同一个设备的通道同步请求只考虑一个,其他的直接忽略
|
||||
|
if (!Objects.equals(sn.trim(), recordInfo.getSn())) { |
||||
|
return 0; |
||||
|
} |
||||
|
recordInfo.getRecordList().addAll(recordItems); |
||||
|
recordInfo.setLastTime(new Date(System.currentTimeMillis())); |
||||
|
} |
||||
|
return recordInfo.getRecordList().size(); |
||||
|
} |
||||
|
|
||||
|
@Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
|
||||
|
private void timerTask(){ |
||||
|
Set<String> keys = data.keySet(); |
||||
|
Calendar calendarBefore5S = Calendar.getInstance(); |
||||
|
calendarBefore5S.setTime(new Date()); |
||||
|
calendarBefore5S.set(Calendar.SECOND, calendarBefore5S.get(Calendar.SECOND) - 5); |
||||
|
|
||||
|
for (String key : keys) { |
||||
|
RecordInfo recordInfo = data.get(key); |
||||
|
// 超过五秒收不到消息任务超时, 只更新这一部分数据
|
||||
|
if ( recordInfo.getLastTime().before(calendarBefore5S.getTime())) { |
||||
|
// 处理录像数据, 返回给前端
|
||||
|
String msgKey = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + recordInfo.getDeviceId() + recordInfo.getSn(); |
||||
|
|
||||
|
WVPResult<RecordInfo> wvpResult = new WVPResult<>(); |
||||
|
wvpResult.setCode(0); |
||||
|
wvpResult.setMsg("success"); |
||||
|
// 对数据进行排序
|
||||
|
Collections.sort(recordInfo.getRecordList()); |
||||
|
wvpResult.setData(recordInfo); |
||||
|
|
||||
|
RequestMessage msg = new RequestMessage(); |
||||
|
msg.setKey(msgKey); |
||||
|
msg.setData(wvpResult); |
||||
|
deferredResultHolder.invokeAllResult(msg); |
||||
|
data.remove(key); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public boolean isComplete(String deviceId, String sn) { |
||||
|
RecordInfo recordInfo = data.get(deviceId + sn); |
||||
|
return recordInfo != null && recordInfo.getRecordList().size() == recordInfo.getSumNum(); |
||||
|
} |
||||
|
|
||||
|
public RecordInfo getRecordInfo(String deviceId, String sn) { |
||||
|
return data.get(deviceId + sn); |
||||
|
} |
||||
|
|
||||
|
public void remove(String deviceId, String sn) { |
||||
|
data.remove(deviceId + sn); |
||||
|
} |
||||
|
} |
@ -1,76 +0,0 @@ |
|||||
package com.genersoft.iot.vmp.gb28181.transmit.callback; |
|
||||
|
|
||||
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; |
|
||||
import com.genersoft.iot.vmp.gb28181.bean.RecordItem; |
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.RecordInfoResponseMessageHandler; |
|
||||
import com.genersoft.iot.vmp.utils.redis.RedisUtil; |
|
||||
import org.slf4j.Logger; |
|
||||
|
|
||||
import java.util.ArrayList; |
|
||||
import java.util.Comparator; |
|
||||
import java.util.List; |
|
||||
import java.util.concurrent.TimeUnit; |
|
||||
|
|
||||
@SuppressWarnings("unchecked") |
|
||||
public class CheckForAllRecordsThread extends Thread { |
|
||||
|
|
||||
private String key; |
|
||||
|
|
||||
private RecordInfo recordInfo; |
|
||||
|
|
||||
private RedisUtil redis; |
|
||||
|
|
||||
private Logger logger; |
|
||||
|
|
||||
private DeferredResultHolder deferredResultHolder; |
|
||||
|
|
||||
public CheckForAllRecordsThread(String key, RecordInfo recordInfo) { |
|
||||
this.key = key; |
|
||||
this.recordInfo = recordInfo; |
|
||||
} |
|
||||
|
|
||||
@Override |
|
||||
public void run() { |
|
||||
|
|
||||
String cacheKey = this.key; |
|
||||
|
|
||||
for (long stop = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); stop > System.nanoTime();) { |
|
||||
List<Object> cacheKeys = redis.scan(cacheKey + "_*"); |
|
||||
List<RecordItem> totalRecordList = new ArrayList<RecordItem>(); |
|
||||
for (int i = 0; i < cacheKeys.size(); i++) { |
|
||||
totalRecordList.addAll((List<RecordItem>) redis.get(cacheKeys.get(i).toString())); |
|
||||
} |
|
||||
if (totalRecordList.size() < this.recordInfo.getSumNum()) { |
|
||||
logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + this.recordInfo.getSumNum() + "项"); |
|
||||
} else { |
|
||||
logger.info("录像数据已全部获取,共 {} 项", this.recordInfo.getSumNum()); |
|
||||
this.recordInfo.setRecordList(totalRecordList); |
|
||||
for (int i = 0; i < cacheKeys.size(); i++) { |
|
||||
redis.del(cacheKeys.get(i).toString()); |
|
||||
} |
|
||||
break; |
|
||||
} |
|
||||
} |
|
||||
// 自然顺序排序, 元素进行升序排列
|
|
||||
this.recordInfo.getRecordList().sort(Comparator.naturalOrder()); |
|
||||
RequestMessage msg = new RequestMessage(); |
|
||||
msg.setKey(DeferredResultHolder.CALLBACK_CMD_RECORDINFO + recordInfo.getDeviceId() + recordInfo.getSn()); |
|
||||
msg.setData(recordInfo); |
|
||||
deferredResultHolder.invokeAllResult(msg); |
|
||||
logger.info("处理完成,返回结果"); |
|
||||
RecordInfoResponseMessageHandler.threadNameList.remove(cacheKey); |
|
||||
} |
|
||||
|
|
||||
public void setRedis(RedisUtil redis) { |
|
||||
this.redis = redis; |
|
||||
} |
|
||||
|
|
||||
public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) { |
|
||||
this.deferredResultHolder = deferredResultHolder; |
|
||||
} |
|
||||
|
|
||||
public void setLogger(Logger logger) { |
|
||||
this.logger = logger; |
|
||||
} |
|
||||
|
|
||||
} |
|
Loading…
Reference in new issue