Browse Source

添加国标级联目录分组分组加快通道传输速度

pull/439/head
648540858 3 years ago
parent
commit
93d69d5476
  1. 26
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java
  2. 21
      src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java
  3. 83
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
  4. 10
      src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
  5. 6
      src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java
  6. 7
      src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
  7. 15
      web_src/src/components/dialog/platformEdit.vue

26
src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java

@ -134,6 +134,16 @@ public class ParentPlatform {
*/
private boolean startOfflinePush;
/**
* 目录分组-每次向上级发送通道信息时单个包携带的通道数量取值1,2,4,8
*/
private int catalogGroup;
/**
* 行政区划
*/
private String administrativeDivision;
public Integer getId() {
return id;
}
@ -342,4 +352,20 @@ public class ParentPlatform {
public void setStartOfflinePush(boolean startOfflinePush) {
this.startOfflinePush = startOfflinePush;
}
public int getCatalogGroup() {
return catalogGroup;
}
public void setCatalogGroup(int catalogGroup) {
this.catalogGroup = catalogGroup;
}
public String getAdministrativeDivision() {
return administrativeDivision;
}
public void setAdministrativeDivision(String administrativeDivision) {
this.administrativeDivision = administrativeDivision;
}
}

21
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java

@ -10,6 +10,8 @@ import org.springframework.scheduling.annotation.Async;
import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.ResponseEvent;
import java.util.Timer;
import java.util.TimerTask;
/**
* 目录订阅任务
@ -20,6 +22,8 @@ public class CatalogSubscribeTask implements ISubscribeTask {
private final ISIPCommander sipCommander;
private Dialog dialog;
private Timer timer ;
public CatalogSubscribeTask(Device device, ISIPCommander sipCommander) {
this.device = device;
this.sipCommander = sipCommander;
@ -27,6 +31,10 @@ public class CatalogSubscribeTask implements ISubscribeTask {
@Override
public void run() {
if (timer != null ) {
timer.cancel();
timer = null;
}
sipCommander.catalogSubscribe(device, dialog, eventResult -> {
if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) {
dialog = eventResult.dialog;
@ -43,6 +51,13 @@ public class CatalogSubscribeTask implements ISubscribeTask {
dialog = null;
// 失败
logger.warn("[目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
CatalogSubscribeTask.this.run();
}
}, 2000);
});
}
@ -56,9 +71,13 @@ public class CatalogSubscribeTask implements ISubscribeTask {
* TERMINATED-> Terminated Dialog状态-终止
*/
logger.info("取消目录订阅时dialog状态为{}", DialogState.CONFIRMED);
if (timer != null ) {
timer.cancel();
timer = null;
}
if (dialog != null && dialog.getState().equals(DialogState.CONFIRMED)) {
device.setSubscribeCycleForCatalog(0);
sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> {
sipCommander.catalogSubscribe(device, dialog, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
if (event.getResponse().getRawContent() != null) {
// 成功

83
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java

@ -32,6 +32,7 @@ import javax.sip.header.*;
import javax.sip.message.Request;
import java.lang.reflect.Field;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
@ -215,7 +216,11 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
return false;
}
try {
String catalogXml = getCatalogXml(channel, sn, parentPlatform, size);
List<DeviceChannel> channels = new ArrayList<>();
if (channel != null) {
channels.add(channel);
}
String catalogXml = getCatalogXml(channels, sn, parentPlatform, size);
// callid
CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
@ -239,7 +244,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
sendCatalogResponse(channels, parentPlatform, sn, fromTag, 0);
return true;
}
private String getCatalogXml(DeviceChannel channel, String sn, ParentPlatform parentPlatform, int size) {
private String getCatalogXml(List<DeviceChannel> channels, String sn, ParentPlatform parentPlatform, int size) {
String characterSet = parentPlatform.getCharacterSet();
StringBuffer catalogXml = new StringBuffer(600);
catalogXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet +"\"?>\r\n");
@ -248,9 +253,10 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
catalogXml.append("<SN>" +sn + "</SN>\r\n");
catalogXml.append("<DeviceID>" + parentPlatform.getDeviceGBId() + "</DeviceID>\r\n");
catalogXml.append("<SumNum>" + size + "</SumNum>\r\n");
catalogXml.append("<DeviceList Num=\"1\">\r\n");
catalogXml.append("<DeviceList Num=\"" + channels.size() +"\">\r\n");
if (channels.size() > 0) {
for (DeviceChannel channel : channels) {
catalogXml.append("<Item>\r\n");
if (channel != null) {
catalogXml.append("<DeviceID>" + channel.getChannelId() + "</DeviceID>\r\n");
catalogXml.append("<Name>" + channel.getName() + "</Name>\r\n");
catalogXml.append("<Manufacturer>" + channel.getManufacture() + "</Manufacturer>\r\n");
@ -272,10 +278,10 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
catalogXml.append("<Info>\r\n");
catalogXml.append("<PTZType>" + channel.getPTZType() + "</PTZType>\r\n");
catalogXml.append("</Info>\r\n");
catalogXml.append("</Item>\r\n");
}
}
catalogXml.append("</Item>\r\n");
catalogXml.append("</DeviceList>\r\n");
catalogXml.append("</Response>\r\n");
return catalogXml.toString();
@ -286,15 +292,21 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
return;
}
try {
DeviceChannel deviceChannel = channels.get(index);
String catalogXml = getCatalogXml(deviceChannel, sn, parentPlatform, channels.size());
List<DeviceChannel> deviceChannels;
if (index + parentPlatform.getCatalogGroup() < channels.size() - 1) {
deviceChannels = channels.subList(index, index + parentPlatform.getCatalogGroup());
}else {
deviceChannels = channels.subList(index, channels.size());
}
String catalogXml = getCatalogXml(deviceChannels, sn, parentPlatform, channels.size());
// callid
CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
: udpSipProvider.getNewCallId();
Request request = headerProviderPlarformProvider.createMessageRequest(parentPlatform, catalogXml, fromTag, callIdHeader);
transmitRequest(parentPlatform, request, null, eventResult -> {
int indexNext = index + 1;
int indexNext = index + parentPlatform.getCatalogGroup();
sendCatalogResponse(channels, parentPlatform, sn, fromTag, indexNext);
});
} catch (SipException | ParseException | InvalidArgumentException e) {
@ -432,13 +444,21 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
if (index >= deviceChannels.size()) {
return true;
}
List<DeviceChannel> channels;
if (index + parentPlatform.getCatalogGroup() < deviceChannels.size() - 1) {
channels = deviceChannels.subList(index, index + parentPlatform.getCatalogGroup());
}else {
channels = deviceChannels.subList(index, deviceChannels.size());
}
try {
Integer finalIndex = index;
String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, deviceChannels.get(index ), deviceChannels.size(), type, subscribeInfo);
String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channels,
deviceChannels.size(), type, subscribeInfo);
sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
}, (eventResult -> {
sendNotifyForCatalogAddOrUpdate(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex + 1);
sendNotifyForCatalogAddOrUpdate(type, parentPlatform, deviceChannels, subscribeInfo,
finalIndex + parentPlatform.getCatalogGroup());
}));
} catch (SipException | ParseException e) {
e.printStackTrace();
@ -500,11 +520,9 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
}
private String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, DeviceChannel channel, int sumNum, String type, SubscribeInfo subscribeInfo) {
private String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, List<DeviceChannel> channels, int sumNum, String type, SubscribeInfo subscribeInfo) {
StringBuffer catalogXml = new StringBuffer(600);
if (parentPlatform.getServerGBId().equals(channel.getParentId())) {
channel.setParentId(parentPlatform.getDeviceGBId());
}
String characterSet = parentPlatform.getCharacterSet();
catalogXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet + "\"?>\r\n");
catalogXml.append("<Notify>\r\n");
@ -512,7 +530,12 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
catalogXml.append("<SN>" + (int) ((Math.random() * 9 + 1) * 100000) + "</SN>\r\n");
catalogXml.append("<DeviceID>" + parentPlatform.getDeviceGBId() + "</DeviceID>\r\n");
catalogXml.append("<SumNum>1</SumNum>\r\n");
catalogXml.append("<DeviceList Num=\"1\">\r\n");
catalogXml.append("<DeviceList Num=\"" + channels.size() + "\">\r\n");
if (channels.size() > 0) {
for (DeviceChannel channel : channels) {
if (parentPlatform.getServerGBId().equals(channel.getParentId())) {
channel.setParentId(parentPlatform.getDeviceGBId());
}
catalogXml.append("<Item>\r\n");
catalogXml.append("<DeviceID>" + channel.getChannelId() + "</DeviceID>\r\n");
catalogXml.append("<Name>" + channel.getName() + "</Name>\r\n");
@ -532,6 +555,8 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
catalogXml.append("<Event>" + type + "</Event>\r\n");
}
catalogXml.append("</Item>\r\n");
}
}
catalogXml.append("</DeviceList>\r\n");
catalogXml.append("</Notify>\r\n");
return catalogXml.toString();
@ -553,13 +578,20 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
if (index >= deviceChannels.size()) {
return true;
}
List<DeviceChannel> channels;
if (index + parentPlatform.getCatalogGroup() < deviceChannels.size() - 1) {
channels = deviceChannels.subList(index, index + parentPlatform.getCatalogGroup());
}else {
channels = deviceChannels.subList(index, deviceChannels.size());
}
try {
Integer finalIndex = index;
String catalogXmlContent = getCatalogXmlContentForCatalogOther(parentPlatform, deviceChannels.get(index), type);
String catalogXmlContent = getCatalogXmlContentForCatalogOther(parentPlatform, channels, type);
sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
}, (eventResult -> {
sendNotifyForCatalogOther(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex + 1);
sendNotifyForCatalogOther(type, parentPlatform, deviceChannels, subscribeInfo,
finalIndex + parentPlatform.getCatalogGroup());
}));
} catch (SipException e) {
e.printStackTrace();
@ -574,10 +606,8 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
return true;
}
private String getCatalogXmlContentForCatalogOther(ParentPlatform parentPlatform, DeviceChannel channel, String type) {
if (parentPlatform.getServerGBId().equals(channel.getParentId())) {
channel.setParentId(parentPlatform.getDeviceGBId());
}
private String getCatalogXmlContentForCatalogOther(ParentPlatform parentPlatform, List<DeviceChannel> channels, String type) {
String characterSet = parentPlatform.getCharacterSet();
StringBuffer catalogXml = new StringBuffer(600);
catalogXml.append("<?xml version=\"1.0\" encoding=\"" + characterSet + "\"?>\r\n");
@ -586,11 +616,18 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
catalogXml.append("<SN>" + (int) ((Math.random() * 9 + 1) * 100000) + "</SN>\r\n");
catalogXml.append("<DeviceID>" + parentPlatform.getDeviceGBId() + "</DeviceID>\r\n");
catalogXml.append("<SumNum>1</SumNum>\r\n");
catalogXml.append("<DeviceList Num=\"1\">\r\n");
catalogXml.append("<DeviceList Num=\" " + channels.size() + " \">\r\n");
if (channels.size() > 0) {
for (DeviceChannel channel : channels) {
if (parentPlatform.getServerGBId().equals(channel.getParentId())) {
channel.setParentId(parentPlatform.getDeviceGBId());
}
catalogXml.append("<Item>\r\n");
catalogXml.append("<DeviceID>" + channel.getChannelId() + "</DeviceID>\r\n");
catalogXml.append("<Event>" + type + "</Event>\r\n");
catalogXml.append("</Item>\r\n");
}
}
catalogXml.append("</DeviceList>\r\n");
catalogXml.append("</Notify>\r\n");
return catalogXml.toString();

10
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java

@ -41,10 +41,6 @@ public class DeviceServiceImpl implements IDeviceService {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
return false;
}
CatalogSubscribeTask task = (CatalogSubscribeTask)dynamicTask.get(device.getDeviceId() + "catalog");
if (task != null && task.getDialogState() != null && task.getDialogState().equals(DialogState.CONFIRMED)) { // 已存在不需要再次添加
return true;
}
logger.info("[添加目录订阅] 设备{}", device.getDeviceId());
// 添加目录订阅
CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander);
@ -71,10 +67,6 @@ public class DeviceServiceImpl implements IDeviceService {
return false;
}
logger.info("[添加移动位置订阅] 设备{}", device.getDeviceId());
MobilePositionSubscribeTask task = (MobilePositionSubscribeTask)dynamicTask.get(device.getDeviceId() + "mobile_position");
if (task != null && task.getDialogState() != null && task.getDialogState().equals(DialogState.CONFIRMED)) { // 已存在不需要再次添加
return true;
}
// 添加目录订阅
MobilePositionSubscribeTask mobilePositionSubscribeTask = new MobilePositionSubscribeTask(device, sipCommander);
// 提前开始刷新订阅
@ -106,7 +98,7 @@ public class DeviceServiceImpl implements IDeviceService {
@Override
public void sync(Device device) {
if (catalogResponseMessageHandler.getChannelSyncProgress(device.getDeviceId()) != null) {
if (catalogResponseMessageHandler.isSyncRunning(device.getDeviceId())) {
logger.info("开启同步时发现同步已经存在");
return;
}

6
src/main/java/com/genersoft/iot/vmp/storager/dao/ParentPlatformMapper.java

@ -16,10 +16,10 @@ public interface ParentPlatformMapper {
@Insert("INSERT INTO parent_platform (enable, name, serverGBId, serverGBDomain, serverIP, serverPort, deviceGBId, deviceIp, " +
" devicePort, username, password, expires, keepTimeout, transport, characterSet, ptz, rtcp, " +
" status, shareAllLiveStream, startOfflinePush, catalogId) " +
" status, shareAllLiveStream, startOfflinePush, catalogId, administrativeDivision, catalogGroup) " +
" VALUES (${enable}, '${name}', '${serverGBId}', '${serverGBDomain}', '${serverIP}', ${serverPort}, '${deviceGBId}', '${deviceIp}', " +
" '${devicePort}', '${username}', '${password}', '${expires}', '${keepTimeout}', '${transport}', '${characterSet}', ${ptz}, ${rtcp}, " +
" ${status}, ${shareAllLiveStream}, ${startOfflinePush}, #{catalogId})")
" ${status}, ${shareAllLiveStream}, ${startOfflinePush}, #{catalogId}, #{administrativeDivision}, #{catalogGroup})")
int addParentPlatform(ParentPlatform parentPlatform);
@Update("UPDATE parent_platform " +
@ -43,6 +43,8 @@ public interface ParentPlatformMapper {
"status=#{status}, " +
"shareAllLiveStream=#{shareAllLiveStream}, " +
"startOfflinePush=${startOfflinePush}, " +
"catalogGroup=#{catalogGroup}, " +
"administrativeDivision=#{administrativeDivision}, " +
"catalogId=#{catalogId} " +
"WHERE id=#{id}")
int updateParentPlatform(ParentPlatform parentPlatform);

7
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java

@ -520,6 +520,12 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
@Override
public boolean updateParentPlatform(ParentPlatform parentPlatform) {
int result = 0;
if (parentPlatform.getCatalogGroup() == 0) {
parentPlatform.setCatalogGroup(1);
}
if (parentPlatform.getAdministrativeDivision() == null) {
parentPlatform.setAdministrativeDivision(parentPlatform.getDeviceGBId().substring(0,6));
}
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); // .getDeviceGBId());
if (parentPlatform.getId() == null ) {
if (parentPlatform.getCatalogId() == null) {
@ -539,6 +545,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
parentPlatformCatch.setId(parentPlatform.getServerGBId());
redisCatchStorage.delPlatformCatchInfo(parentPlatById.getServerGBId());
}
result = platformMapper.updateParentPlatform(parentPlatform);
}
// 更新缓存

15
web_src/src/components/dialog/platformEdit.vue

@ -63,6 +63,18 @@
<el-option label="TCP" value="TCP"></el-option>
</el-select>
</el-form-item>
<el-form-item label="目录分组" prop="catalogGroup">
<el-select
v-model="platform.catalogGroup"
style="width: 100%"
placeholder="请选择目录分组"
>
<el-option label="1" value="1"></el-option>
<el-option label="2" value="2"></el-option>
<el-option label="4" value="4"></el-option>
<el-option label="8" value="8"></el-option>
</el-select>
</el-form-item>
<el-form-item label="字符集" prop="characterSet">
<el-select
v-model="platform.characterSet"
@ -140,6 +152,7 @@ export default {
characterSet: "GB2312",
shareAllLiveStream: false,
startOfflinePush: false,
catalogGroup: 1,
},
rules: {
name: [{ required: true, message: "请输入平台名称", trigger: "blur" }],
@ -202,6 +215,7 @@ export default {
this.platform.shareAllLiveStream = platform.shareAllLiveStream;
this.platform.catalogId = platform.catalogId;
this.platform.startOfflinePush = platform.startOfflinePush;
this.platform.catalogGroup = platform.catalogGroup;
this.onSubmit_text = "保存";
this.saveUrl = "/api/platform/save";
}
@ -270,6 +284,7 @@ export default {
characterSet: "GB2312",
shareAllLiveStream: false,
startOfflinePush: false,
catalogGroup: 1,
}
},
deviceGBIdExit: async function (deviceGbId) {

Loading…
Cancel
Save