|
@@ -5,14 +5,11 @@ import cn.hutool.core.util.StrUtil;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.fdkankan.common.constant.ConstantFilePath;
|
|
|
import com.fdkankan.common.constant.ModelingBuildStatus;
|
|
|
-import com.fdkankan.common.constant.ServerCode;
|
|
|
import com.fdkankan.common.constant.UploadFilePath;
|
|
|
-import com.fdkankan.common.exception.BusinessException;
|
|
|
import com.fdkankan.common.util.ComputerUtil;
|
|
|
import com.fdkankan.common.util.CreateObjUtil;
|
|
|
import com.fdkankan.common.util.FileUtils;
|
|
|
import com.fdkankan.common.util.SceneUtil;
|
|
|
-import com.fdkankan.dingtalk.DingTalkSendUtils;
|
|
|
import com.fdkankan.fyun.oss.UploadToOssUtil;
|
|
|
import com.fdkankan.modeling.bean.BuildSceneResultBean;
|
|
|
import com.fdkankan.modeling.constants.SysConstants;
|
|
@@ -26,26 +23,28 @@ import com.fdkankan.rabbitmq.util.RabbitMqProducer;
|
|
|
import com.fdkankan.redis.constant.RedisKey;
|
|
|
import com.fdkankan.redis.util.RedisUtil;
|
|
|
import com.rabbitmq.client.Channel;
|
|
|
-import java.io.File;
|
|
|
-import java.nio.charset.StandardCharsets;
|
|
|
-import java.util.Calendar;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.Objects;
|
|
|
-import java.util.concurrent.Future;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
-import java.util.concurrent.TimeoutException;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.lang3.StringUtils;
|
|
|
import org.apache.commons.lang3.time.StopWatch;
|
|
|
import org.springframework.amqp.core.Message;
|
|
|
import org.springframework.amqp.rabbit.annotation.Queue;
|
|
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
|
+import org.springframework.amqp.support.AmqpHeaders;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.messaging.handler.annotation.Header;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import org.springframework.util.ObjectUtils;
|
|
|
import org.springframework.web.client.RestTemplate;
|
|
|
|
|
|
+import java.io.File;
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Objects;
|
|
|
+import java.util.concurrent.Future;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
+
|
|
|
/**
|
|
|
* <p>
|
|
|
* TODO
|
|
@@ -66,16 +65,13 @@ public class RabbitMqListener {
|
|
|
private String queueModelingPost;
|
|
|
|
|
|
@Autowired
|
|
|
- RedisUtil redisUtil;
|
|
|
+ private RedisUtil redisUtil;
|
|
|
|
|
|
private RestTemplate restTemplate = new RestTemplate();
|
|
|
|
|
|
@Value("${4dkk.laserService.host}")
|
|
|
private String laserHost;
|
|
|
|
|
|
- @Value("${upload.type}")
|
|
|
- private String ossType;
|
|
|
-
|
|
|
@Value("${oss.prefix.ali}")
|
|
|
private String prefixAli;
|
|
|
|
|
@@ -83,13 +79,7 @@ public class RabbitMqListener {
|
|
|
private UploadToOssUtil uploadToOssUtil;
|
|
|
|
|
|
@Autowired
|
|
|
- RabbitMqProducer rabbitMqProducer;
|
|
|
-
|
|
|
- @Value("${spring.profiles.active}")
|
|
|
- private String environment;
|
|
|
-
|
|
|
- @Autowired
|
|
|
- private DingTalkSendUtils dingTalkSendUtils;
|
|
|
+ private RabbitMqProducer rabbitMqProducer;
|
|
|
|
|
|
@Autowired
|
|
|
private IBuildLogService buildLogService;
|
|
@@ -102,47 +92,49 @@ public class RabbitMqListener {
|
|
|
*/
|
|
|
@RabbitListener(
|
|
|
queuesToDeclare = @Queue("${queue.modeling.modeling-call}"),
|
|
|
- concurrency = "${maxThread.modeling.modeling-call}"
|
|
|
+ concurrency = "${maxThread.modeling.modeling-call}",
|
|
|
+ priority = "${mq.consumerPriority}"
|
|
|
)
|
|
|
- public void buildSceneHandler(Channel channel, Message message) throws Exception {
|
|
|
- Object correlation = message.getMessageProperties().getHeader("spring_returned_message_correlation");
|
|
|
- String correlationId = (String) correlation;
|
|
|
+ public void buildSceneHandler(Channel channel, Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception {
|
|
|
+ SysConstants.SYSTEM_BUILDING = true;
|
|
|
+ if (SysConstants.SYSTEM_OFFING) {
|
|
|
+ SysConstants.SYSTEM_BUILDING = false;
|
|
|
+ channel.basicNack(deliveryTag, true, true);
|
|
|
+ log.error("服务实例:{} 正在关闭,退出构建!", SysConstants.hostName);
|
|
|
+ return;
|
|
|
+ }
|
|
|
if (ObjectUtils.isEmpty(message.getBody())) {
|
|
|
SysConstants.SYSTEM_BUILDING = false;
|
|
|
- log.error("消息体为空,退出构建,当前服务器:{}", SysConstants.hostName);
|
|
|
+ log.error("消息内容为空,退出构建,当前服务器id:{}", SysConstants.hostName);
|
|
|
return;
|
|
|
}
|
|
|
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
|
|
|
BuildSceneCallMessage buildSceneMessage = JSONObject.parseObject(msg, BuildSceneCallMessage.class);
|
|
|
- log.info("场景计算开始,队列名:{},id:{},消息体:{}", queueModelingCall, correlationId,msg);
|
|
|
+
|
|
|
+ Object correlation = message.getMessageProperties().getHeader("spring_returned_message_correlation");
|
|
|
+ String correlationId = (String) correlation;
|
|
|
if(Objects.isNull(buildSceneMessage) || StrUtil.isBlank(buildSceneMessage.getSceneNum())){
|
|
|
log.error("消息内容错误,id:{},消息体:{}", correlationId, msg);
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
- Thread.sleep(2000L);
|
|
|
+ log.info("场景计算开始,队列名:{},id:{},消息体:{}", queueModelingCall, correlationId,msg);
|
|
|
this.process(buildSceneMessage);
|
|
|
-
|
|
|
- log.info("场景计算开始,队列名:{},id:{}", queueModelingCall, correlationId);
|
|
|
+ log.info("场景计算结束,队列名:{},id:{}", queueModelingCall, correlationId);
|
|
|
|
|
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
|
|
+
|
|
|
+ //计算完毕,将当前系统构建状态改为false
|
|
|
+ SysConstants.SYSTEM_BUILDING = false;
|
|
|
}
|
|
|
|
|
|
public void process(BuildSceneCallMessage message) {
|
|
|
-
|
|
|
- SysConstants.SYSTEM_BUILDING = true;
|
|
|
- if (SysConstants.SYSTEM_OFFING) {
|
|
|
- SysConstants.SYSTEM_BUILDING = false;
|
|
|
- log.error("服务实例:{} 正在关闭,退出构建!", SysConstants.hostName);
|
|
|
- throw new BusinessException(ServerCode.SERVER_CLOSING);
|
|
|
- }
|
|
|
//开始计时
|
|
|
StopWatch watch = new StopWatch();
|
|
|
watch.start();
|
|
|
|
|
|
BuildLog buildLog = new BuildLog();
|
|
|
final BuildSceneResultBean buildSceneResult = new BuildSceneResultBean();
|
|
|
- Future<ModelingBuildStatus> future = null;
|
|
|
+ ModelingBuildStatus status = null;
|
|
|
try {
|
|
|
|
|
|
//休眠2秒等待准备数据
|
|
@@ -151,29 +143,30 @@ public class RabbitMqListener {
|
|
|
//计算前准备
|
|
|
preBuild(message, buildLog);
|
|
|
|
|
|
- future = SysConstants.executorService.submit(()->{
|
|
|
- return buildScene(message, buildSceneResult);
|
|
|
- });
|
|
|
- future.get(SysConstants.modelTimeOut, TimeUnit.HOURS);
|
|
|
+ Future<ModelingBuildStatus> future = SysConstants.executorService.submit(()-> buildScene(message, buildSceneResult));
|
|
|
+ status = future.get(SysConstants.modelTimeOut, TimeUnit.HOURS);
|
|
|
|
|
|
//结束计时
|
|
|
watch.stop();
|
|
|
buildLog.setDuration(watch.getTime(TimeUnit.SECONDS));
|
|
|
} catch (TimeoutException ex) {
|
|
|
log.error("服务实例:{} 构建异常:",SysConstants.hostName,ex);
|
|
|
- buildSceneResult.setBuildStatus(ModelingBuildStatus.OVERTIME);
|
|
|
+ status = ModelingBuildStatus.OVERTIME;
|
|
|
} catch (BuildException e){
|
|
|
- buildSceneResult.setBuildStatus(e.getBuildStatus());
|
|
|
+ status = e.getBuildStatus();
|
|
|
} catch(Exception e) {
|
|
|
log.error("服务实例:{} 构建异常:", SysConstants.hostName, e);
|
|
|
if (e.getCause() instanceof BuildException) {
|
|
|
- buildSceneResult.setBuildStatus(((BuildException) e.getCause()).getBuildStatus());
|
|
|
+ status = ((BuildException) e.getCause()).getBuildStatus();
|
|
|
+ buildLog.setBuildStatus(((BuildException) e.getCause()).getBuildStatus().code());
|
|
|
} else {
|
|
|
- buildSceneResult.setBuildStatus(ModelingBuildStatus.FAILED);
|
|
|
+ status = ModelingBuildStatus.FAILED;
|
|
|
//未知异常,记录异常信息
|
|
|
buildLog.setReason(ExceptionUtil.stacktraceToString(e, 3000));
|
|
|
}
|
|
|
}
|
|
|
+ buildLog.setBuildStatus(status.code());
|
|
|
+ buildSceneResult.setBuildStatus(status);
|
|
|
//计算后处理
|
|
|
afterBuild(message, buildSceneResult, buildLog);
|
|
|
|
|
@@ -299,7 +292,6 @@ public class RabbitMqListener {
|
|
|
this.jgSynch(message);
|
|
|
}
|
|
|
|
|
|
- buildSceneResult.setBuildStatus(ModelingBuildStatus.SUCCESS);
|
|
|
buildSceneResult.setFileId(fileId);
|
|
|
buildSceneResult.setPayStatus(payStatus);
|
|
|
buildSceneResult.setUploadMap(map);
|
|
@@ -315,11 +307,10 @@ public class RabbitMqListener {
|
|
|
}
|
|
|
|
|
|
private void afterBuild(BuildSceneCallMessage message, BuildSceneResultBean buildSceneResult, BuildLog buildLog){
|
|
|
+ ModelingBuildStatus buildStatus = null;
|
|
|
try {
|
|
|
- ModelingBuildStatus buildStatus = buildSceneResult.getBuildStatus();
|
|
|
- String buildLogPath = String.format(UploadFilePath.BUILD_LOG_PATH, message.getSceneNum());
|
|
|
+ buildStatus = buildSceneResult.getBuildStatus();
|
|
|
log.info("服务{} 计算结束:{},计算状态:{}", SysConstants.hostName, buildLog.getNum(),buildStatus.message());
|
|
|
-
|
|
|
if(!buildStatus.equals(ModelingBuildStatus.REPEAT)){
|
|
|
//释放锁
|
|
|
redisUtil.del(String.format(RedisKey.SCENE_BUILDING, message.getSceneNum()));
|
|
@@ -329,21 +320,22 @@ public class RabbitMqListener {
|
|
|
message.getSceneNum(),message.getThumb(),message.getPayStatus(), message.getDataSource());
|
|
|
|
|
|
//如果是重复计算,没有走到计算逻辑,不需要上传日志文件
|
|
|
+ String buildLogPath = String.format(UploadFilePath.BUILD_LOG_PATH, message.getSceneNum());
|
|
|
buildLog.setLogDetailPath(prefixAli+ buildLogPath + "console.log");
|
|
|
}
|
|
|
- buildLog.setBuildStatus(buildStatus.code());
|
|
|
buildLog.setCameraType(Integer.parseInt(buildSceneResult.getCameraType()));
|
|
|
buildLogService.updateById(buildLog);
|
|
|
-
|
|
|
} catch (Exception e) {
|
|
|
log.error("计算后业务处理出错!", e);
|
|
|
}finally {
|
|
|
//发送计算结果mq
|
|
|
buildSceneResult.setDuration(buildLog.getDuration());
|
|
|
this.sendCallResult(message, buildSceneResult);
|
|
|
-
|
|
|
- //计算完毕,将当前系统构建状态改为false
|
|
|
- SysConstants.SYSTEM_BUILDING = false;
|
|
|
+ //如果是计算失败,需要发送钉钉
|
|
|
+ if(!ModelingBuildStatus.SUCCESS.equals(buildStatus)){
|
|
|
+ // TODO: 2022/1/25 测试阶段,先注释掉推送
|
|
|
+ this.sendDingTalkMq(buildStatus.message(), message);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -354,7 +346,18 @@ public class RabbitMqListener {
|
|
|
buildLog.setQueueName(queueModelingCall);
|
|
|
buildLog.setNum(message.getSceneNum());
|
|
|
buildLog.setDataSource(message.getDataSource());
|
|
|
+
|
|
|
+ // 设置缓存锁
|
|
|
+ String key = SysConstants.SCENE_BUILDING + buildLog.getNum();
|
|
|
+ // 获取缓存锁,防止重复消费
|
|
|
+ Long building = redisUtil.incr(key, 1);
|
|
|
buildLogService.save(buildLog);
|
|
|
+ if (building.compareTo(1L) != 0) {
|
|
|
+ log.error("服务:{},重复构建:{}",SysConstants.hostName,buildLog.getNum());
|
|
|
+ throw new BuildException(ModelingBuildStatus.REPEAT);
|
|
|
+ }else{
|
|
|
+ redisUtil.expire(key,SysConstants.modelTimeOut);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -368,10 +371,9 @@ public class RabbitMqListener {
|
|
|
if(buildStatus.equals(ModelingBuildStatus.REPEAT)){
|
|
|
return;
|
|
|
}
|
|
|
- boolean buildSuccess = buildStatus.equals(ModelingBuildStatus.SUCCESS) ? true : false;
|
|
|
rabbitMqProducer.sendByWorkQueue(queueModelingPost,
|
|
|
BuildSceneResultMqMessage.builder()
|
|
|
- .buildSuccess(buildSuccess)
|
|
|
+ .buildSuccess(buildStatus.equals(ModelingBuildStatus.SUCCESS))
|
|
|
.cameraType(buildSceneResult.getCameraType())
|
|
|
.computeTime(buildSceneResult.getDuration())
|
|
|
.fileId(buildSceneResult.getFileId())
|
|
@@ -384,15 +386,9 @@ public class RabbitMqListener {
|
|
|
.prefix(buildSceneResult.getPrefix())
|
|
|
.videoVersion(buildSceneResult.getVideoVersion())
|
|
|
.build());
|
|
|
- //如果是计算失败,需要发送钉钉
|
|
|
- if(!buildSuccess){
|
|
|
- // TODO: 2022/1/25 测试阶段,先注释掉推送
|
|
|
-// this.sendDingTalkMq(buildStatus.message(), message);
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
- private void upoadLog(Integer sceneStatus, String webSite,
|
|
|
- String num, String thumb, Integer payStatus, String dataSource){
|
|
|
+ private void upoadLog(Integer sceneStatus, String webSite, String num, String thumb, Integer payStatus, String dataSource){
|
|
|
try{
|
|
|
JSONObject statusJson = new JSONObject();
|
|
|
//临时将-2改成1,app还没完全更新
|
|
@@ -413,16 +409,13 @@ public class RabbitMqListener {
|
|
|
String buildLogPath = String.format(UploadFilePath.BUILD_LOG_PATH, num);
|
|
|
|
|
|
//todo 待删除old_path
|
|
|
- uploadToOssUtil.upload(
|
|
|
- ConstantFilePath.SCENE_PATH + "data/data" + num + File.separator + "status.json",
|
|
|
+ uploadToOssUtil.upload(ConstantFilePath.SCENE_PATH + "data/data" + num + File.separator + "status.json",
|
|
|
"data/data" + num + File.separator + "status.json");
|
|
|
|
|
|
- uploadToOssUtil.upload(
|
|
|
- ConstantFilePath.SCENE_PATH + "data/data" + num + File.separator + "status.json",
|
|
|
+ uploadToOssUtil.upload(ConstantFilePath.SCENE_PATH + "data/data" + num + File.separator + "status.json",
|
|
|
dataPath + "status.json");
|
|
|
|
|
|
- uploadToOssUtil.upload(dataSource + File.separator + "console.log",
|
|
|
- buildLogPath + "console.log");
|
|
|
+ uploadToOssUtil.upload(dataSource + File.separator + "console.log", buildLogPath + "console.log");
|
|
|
|
|
|
}catch (Exception e){
|
|
|
log.error("上传日志失败!", e);
|