Kaynağa Gözat

Merge remote-tracking branch 'origin/feature-去掉重复消费' into release

dengsixing 4 ay önce
ebeveyn
işleme
9257a17f14

+ 42 - 1
src/main/java/com/fdkankan/modeling/ModelingApplication.java

@@ -1,21 +1,62 @@
 package com.fdkankan.modeling;
 
+import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.io.FileUtil;
+import cn.hutool.core.util.StrUtil;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.fdkankan.model.utils.CreateObjUtil;
+import com.fdkankan.modeling.constants.SysConstants;
+import com.fdkankan.modeling.entity.BuildLog;
+import com.fdkankan.modeling.service.IBuildLogService;
+import lombok.extern.slf4j.Slf4j;
 import org.mybatis.spring.annotation.MapperScan;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.scheduling.annotation.EnableScheduling;
 
+import java.util.List;
+
 @SpringBootApplication
 @EnableScheduling
 @ComponentScan(basePackages = {"com.fdkankan.*"})
 @MapperScan("com.fdkankan.**.mapper")
 @EnableDiscoveryClient
-public class ModelingApplication {
+@Slf4j
+public class ModelingApplication implements CommandLineRunner {
+
+	@Autowired
+	private IBuildLogService buildLogService;
 
 	public static void main(String[] args) {
+
 		SpringApplication.run(ModelingApplication.class, args);
+
+		//进入计算之前,把上次未关闭的算法杀掉(主要是重启的问题)
+		CreateObjUtil.killMainLoader();
+
 	}
 
+	@Override
+	public void run(String... args) throws Exception {
+
+//		String lockPath = null;
+//		try {
+//			if(StrUtil.isNotEmpty(SysConstants.hostName)){
+//				List<BuildLog> list = buildLogService.list(new LambdaQueryWrapper<BuildLog>().eq(BuildLog::getHostName, SysConstants.hostName).eq(BuildLog::getStatus, 0).orderByDesc(BuildLog::getId));
+//				if(CollUtil.isNotEmpty(list)){
+//					BuildLog buildLog = list.get(0);
+//					lockPath = buildLog.getDataSource() + "/.lockdirectory";
+//					if(FileUtil.exist(lockPath)){
+//						FileUtil.del(lockPath);
+//					}
+//				}
+//			}
+//		}catch (Exception e){
+//			log.warn("删除上一计算中场景文件锁失败,logPath:{}", lockPath, e);
+//		}
+	}
 }

+ 37 - 25
src/main/java/com/fdkankan/modeling/receiver/RabbitMqListener.java

@@ -1,11 +1,13 @@
 package com.fdkankan.modeling.receiver;
 
 import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.io.FileUtil;
 import cn.hutool.core.util.StrUtil;
 import com.alibaba.fastjson.JSONObject;
 import com.fdkankan.common.constant.ModelingBuildStatus;
 import com.fdkankan.common.util.FileUtils;
 import com.fdkankan.model.utils.ComputerUtil;
+import com.fdkankan.model.utils.CreateObjUtil;
 import com.fdkankan.modeling.bean.BuildSceneResultBean;
 import com.fdkankan.modeling.constants.RedisKey;
 import com.fdkankan.modeling.constants.SysConstants;
@@ -134,6 +136,12 @@ public class RabbitMqListener {
             return;
         }
 
+        //进入计算之前,判断算法文件锁是否存在,如果存在,则删除
+        String lockPath = buildSceneMessage.getPath() + "/.lockdirectory";
+        if(FileUtil.exist(lockPath)){
+            FileUtil.del(lockPath);
+        }
+
         BuildLog buildLog = new BuildLog();
         log.info("场景计算开始,队列名:{},id:{},deliveryTag:{},消息体:{}", queueNameService.getQueueName(), messageId,deliveryTag,msg);
         buildLog.setMessageId(messageId);
@@ -207,6 +215,9 @@ public class RabbitMqListener {
             } else {
                 status = ModelingBuildStatus.FAILED;
             }
+        }finally {
+            //杀掉算法进程,否则有可能导致上一个超时的算法进程没终止,下一个场景算法进程启动,一台机器有两个算法进程在跑
+            CreateObjUtil.killMainLoader();
         }
         buildLog.setStatus(status.code());
         buildSceneResult.setCameraType(message.getCameraType());
@@ -367,32 +378,33 @@ public class RabbitMqListener {
         buildLog.setUpdateTime(new Date());
         buildLog.setBuildType(0);
         buildLog.setStatus(0);
-        // 设置缓存锁
-        String key = SysConstants.SCENE_BUILDING + buildLog.getSceneNum();
-        // 如果是强制重新计算,则删除key。
-        if (StringUtils.equals(message.getRebuild(), "1")) {
-            redisUtil.del(key);
-        }
-
-        // 获取缓存锁,防止重复消费
-        Long building = redisUtil.incr(key, 1);
         buildLogService.save(buildLog);
-
-        if(!ObjectUtils.isEmpty(buildLog.getMessageId())){
-            // 设置消息id幂等性,防止消息重复消费
-            boolean lock = redisLockUtil.lock(SysConstants.SCENE_MESSAGE_BUILDING + buildLog.getMessageId(), SysConstants.modelTimeOut * 3600);
-            if (!lock) {
-                log.error("服务:{},消息重复消费:{}", SysConstants.hostName, buildLog.getMessageId());
-                throw new BuildException(ModelingBuildStatus.REPEAT);
-            }
-        }
-
-        if (building.compareTo(1L) > 0) {
-            log.error("服务:{},重复构建:{},构建次数:{}", SysConstants.hostName, buildLog.getSceneNum(),building);
-            throw new BuildException(ModelingBuildStatus.REPEAT);
-        } else {
-            redisUtil.expire(key, Integer.toUnsignedLong(SysConstants.modelTimeOut * 3600));
-        }
+//        // 设置缓存锁
+//        String key = SysConstants.SCENE_BUILDING + buildLog.getSceneNum();
+//        // 如果是强制重新计算,则删除key。
+//        if (StringUtils.equals(message.getRebuild(), "1")) {
+//            redisUtil.del(key);
+//        }
+//
+//        // 获取缓存锁,防止重复消费
+//        Long building = redisUtil.incr(key, 1);
+//        buildLogService.save(buildLog);
+//
+//        if(!ObjectUtils.isEmpty(buildLog.getMessageId())){
+//            // 设置消息id幂等性,防止消息重复消费
+//            boolean lock = redisLockUtil.lock(SysConstants.SCENE_MESSAGE_BUILDING + buildLog.getMessageId(), SysConstants.modelTimeOut * 3600);
+//            if (!lock) {
+//                log.error("服务:{},消息重复消费:{}", SysConstants.hostName, buildLog.getMessageId());
+//                throw new BuildException(ModelingBuildStatus.REPEAT);
+//            }
+//        }
+//
+//        if (building.compareTo(1L) > 0) {
+//            log.error("服务:{},重复构建:{},构建次数:{}", SysConstants.hostName, buildLog.getSceneNum(),building);
+//            throw new BuildException(ModelingBuildStatus.REPEAT);
+//        } else {
+//            redisUtil.expire(key, Integer.toUnsignedLong(SysConstants.modelTimeOut * 3600));
+//        }
     }
 
     /**