lyhzzz 2 vuotta sitten
vanhempi
commit
cff2d4fcf8

+ 1 - 0
src/main/java/com/fdkankan/fusion/common/util/RedisKeyUtil.java

@@ -4,5 +4,6 @@ public class RedisKeyUtil {
 
     public static String mergerVideoKey = "fusion:merger:video:";
     public static String modelUpload = "fusion:model:upload:";
+    public static String modelCancelUpload = "fusion:model:cancel:upload:";
     public static String loginToken = "fusion:token:";
 }

+ 6 - 5
src/main/java/com/fdkankan/fusion/common/util/ShellUtil.java

@@ -51,14 +51,15 @@ public class ShellUtil {
     }
 
     private static void threadRun(Process process) {
-        new Thread() {
+        ThreadPoolConfig threadPoolConfig = new ThreadPoolConfig();
+        threadPoolConfig.threadPoolExecutor().submit(new Runnable() {
             @Override
             public void run() {
                 BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream()));
                 String line = null;
                 try {
                     while((line = in.readLine()) != null) {
-                       log.debug("output: " + line);
+                        log.debug("output: " + line);
                     }
                 }
                 catch (IOException e) {
@@ -73,8 +74,8 @@ public class ShellUtil {
                     }
                 }
             }
-        }.start();
-        new Thread() {
+        });
+        threadPoolConfig.threadPoolExecutor().submit(new Runnable() {
             @Override
             public void run() {
                 BufferedReader err = new BufferedReader(new InputStreamReader(process.getErrorStream()));
@@ -96,7 +97,7 @@ public class ShellUtil {
                     }
                 }
             }
-        }.start();
+        });
     }
 
     private static void closeStream(Closeable stream) {

+ 79 - 0
src/main/java/com/fdkankan/fusion/common/util/ThreadPoolConfig.java

@@ -0,0 +1,79 @@
+package com.fdkankan.fusion.common.util;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.*;
+
+@Slf4j
+@Configuration
+public class ThreadPoolConfig {
+    @Bean
+    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        int i = Runtime.getRuntime().availableProcessors();
+        //核心线程数目
+        executor.setCorePoolSize(i * 2);
+        //指定最大线程数
+        executor.setMaxPoolSize(i * 2);
+        //队列中最大的数目
+        executor.setQueueCapacity(i * 2 * 10);
+        //线程名称前缀
+        executor.setThreadNamePrefix("ThreadPoolTaskExecutor-");
+        //rejection-policy:当pool已经达到max size的时候,如何处理新任务
+        //CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
+        //对拒绝task的处理策略
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        //当调度器shutdown被调用时等待当前被调度的任务完成
+        executor.setWaitForTasksToCompleteOnShutdown(true);
+        //线程空闲后的最大存活时间
+        executor.setKeepAliveSeconds(60);
+        //加载
+        executor.initialize();
+        log.info("初始化线程池成功");
+        return executor;
+    }
+
+    @Bean
+    public ThreadPoolExecutor threadPoolExecutor() {
+        //获取cpu核心数
+        int i = Runtime.getRuntime().availableProcessors();
+        //核心线程数
+        int corePoolSize = i * 2;
+        //最大线程数
+        int maximumPoolSize = i * 2;
+        //线程无引用存活时间
+        long keepAliveTime = 60;
+        //时间单位
+        TimeUnit unit = TimeUnit.SECONDS;
+        //任务队列,接收一个整型的参数,这个整型参数指的是队列的长度,
+        //ArrayBlockingQueue(int,boolean),boolean类型的参数是作为可重入锁的参数进行初始化,默认false,另外初始化了notEmpty、notFull两个信号量。
+        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue(i * 2 * 10);
+        //1. 同步阻塞队列 (put,take),直接提交。直接提交策略表示线程池不对任务进行缓存。新进任务直接提交给线程池,当线程池中没有空闲线程时,创建一个新的线程处理此任务。
+        // 这种策略需要线程池具有无限增长的可能性。实现为:SynchronousQueue
+        //2. 有界队列。当线程池中线程达到corePoolSize时,新进任务被放在队列里排队等待处理。有界队列(如ArrayBlockingQueue)有助于防止资源耗尽,
+        // 但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,
+        // 但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,
+        // CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
+        //3. 无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。
+        // 这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,
+        // 适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
+
+        //线程工厂
+        //defaultThreadFactory()
+        //返回用于创建新线程的默认线程工厂。
+        //privilegedThreadFactory()
+        //返回一个用于创建与当前线程具有相同权限的新线程的线程工厂。
+        ThreadFactory threadFactory =Executors.defaultThreadFactory();
+        //拒绝执行处理器
+        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
+        //创建线程池
+        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
+        return threadPoolExecutor;
+    }
+
+}
+
+

+ 6 - 3
src/main/java/com/fdkankan/fusion/controller/TestController.java

@@ -2,6 +2,7 @@ package com.fdkankan.fusion.controller;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.fdkankan.fusion.common.ResultData;
+import com.fdkankan.fusion.common.util.ThreadPoolConfig;
 import com.fdkankan.fusion.entity.FusionMeter;
 import com.fdkankan.fusion.service.IFusionMeterService;
 import com.fdkankan.redis.util.RedisUtil;
@@ -9,6 +10,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.concurrent.ThreadPoolExecutor;
+
 @RestController
 public class TestController {
 
@@ -16,12 +19,12 @@ public class TestController {
     RedisUtil redisUtil;
     @Autowired
     IFusionMeterService meterService;
+    @Autowired
+    ThreadPoolConfig threadPoolConfig;
 
     @RequestMapping("/test")
     public ResultData test(){
-        LambdaQueryWrapper<FusionMeter> wrapper = new LambdaQueryWrapper<>();
-        wrapper.like(FusionMeter::getPosition,"%\"modelId\":\""+35+"\"%");
-        return ResultData.ok(meterService.list(wrapper));
+        return ResultData.ok();
     }
 
 

+ 73 - 66
src/main/java/com/fdkankan/fusion/service/impl/CaseVideoServiceImpl.java

@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
 
 /**
  * <p>
@@ -55,6 +56,8 @@ public class CaseVideoServiceImpl extends ServiceImpl<ICaseVideoMapper, CaseVide
     private String queryPath;
     @Autowired
     RedisUtil redisUtil;
+    @Autowired
+    ThreadPoolExecutor threadPoolExecutor;
 
     @Override
     public List<CaseVideo> getAllList(Integer folderId) {
@@ -87,74 +90,78 @@ public class CaseVideoServiceImpl extends ServiceImpl<ICaseVideoMapper, CaseVide
     }
 
     private void runThread(MultipartFile[] files,CaseVideoFolder videoFolder,ICaseVideoService caseVideoService){
-        ((Runnable) () -> {
-            String mergeVideoUrl = null;
-            if (StringUtils.isNotBlank(videoFolder.getVideoMergeUrl())) {
-                mergeVideoUrl = videoFolder.getVideoMergeUrl().replace(queryPath, "/home/");
-                uploadToOssUtil.downFormAli(videoFolder.getVideoMergeUrl().replace(queryPath, ""), mergeVideoUrl);
-                uploadService.deleteOssUrl(videoFolder.getVideoMergeUrl());
-            }
-            if (StringUtils.isNotBlank(videoFolder.getVideoMergeUrl())) {
-                uploadService.deleteOssUrl(videoFolder.getVideoFolderCover());
-            }
-            caseVideoService.deleteByFolderId(videoFolder.getVideoFolderId());
-            redisUtil.set(RedisKeyUtil.mergerVideoKey + videoFolder.getVideoFolderId(), "10");
-            //替换文件夹中文件
-            List<CaseVideo> videoList = new ArrayList<>();
-            Integer sort = 1;
-            LinkedHashSet<String> filePathSet = new LinkedHashSet<>();
-            LinkedHashSet<String> filePathSetAll = new LinkedHashSet<>();
-            for (MultipartFile file : files) {
-                String ossPath = uploadService.uploadFile(file, true, String.format(FilePath.VIDEO_OSS_PATH, videoFolder.getVideoFolderId()) + "/", filePathSet);
-                String suffixName = ossPath.substring(ossPath.lastIndexOf("."));
-
-                CaseVideo caseVideo = new CaseVideo();
-                caseVideo.setFolderId(videoFolder.getVideoFolderId());
-                caseVideo.setVideoPath(ossPath);
-                caseVideo.setVideoName(file.getName());
-                caseVideo.setVideoCover(ossPath.replace(suffixName, ".jpg"));
-                caseVideo.setSort(sort);
-                videoList.add(caseVideo);
-                sort++;
-            }
-            caseVideoService.saveBatch(videoList);
-            if (videoList.size() <= 0) {
-                return;
+        threadPoolExecutor.submit(new Runnable() {
+            @Override
+            public void run() {
+                String mergeVideoUrl = null;
+                if (StringUtils.isNotBlank(videoFolder.getVideoMergeUrl())) {
+                    mergeVideoUrl = videoFolder.getVideoMergeUrl().replace(queryPath, "/home/");
+                    uploadToOssUtil.downFormAli(videoFolder.getVideoMergeUrl().replace(queryPath, ""), mergeVideoUrl);
+                    uploadService.deleteOssUrl(videoFolder.getVideoMergeUrl());
+                }
+                if (StringUtils.isNotBlank(videoFolder.getVideoMergeUrl())) {
+                    uploadService.deleteOssUrl(videoFolder.getVideoFolderCover());
+                }
+                caseVideoService.deleteByFolderId(videoFolder.getVideoFolderId());
+                redisUtil.set(RedisKeyUtil.mergerVideoKey + videoFolder.getVideoFolderId(), "10");
+                //替换文件夹中文件
+                List<CaseVideo> videoList = new ArrayList<>();
+                Integer sort = 1;
+                LinkedHashSet<String> filePathSet = new LinkedHashSet<>();
+                LinkedHashSet<String> filePathSetAll = new LinkedHashSet<>();
+                for (MultipartFile file : files) {
+                    String ossPath = uploadService.uploadFile(file, true, String.format(FilePath.VIDEO_OSS_PATH, videoFolder.getVideoFolderId()) + "/", filePathSet);
+                    String suffixName = ossPath.substring(ossPath.lastIndexOf("."));
+
+                    CaseVideo caseVideo = new CaseVideo();
+                    caseVideo.setFolderId(videoFolder.getVideoFolderId());
+                    caseVideo.setVideoPath(ossPath);
+                    caseVideo.setVideoName(file.getName());
+                    caseVideo.setVideoCover(ossPath.replace(suffixName, ".jpg"));
+                    caseVideo.setSort(sort);
+                    videoList.add(caseVideo);
+                    sort++;
+                }
+                caseVideoService.saveBatch(videoList);
+                if (videoList.size() <= 0) {
+                    return;
+                }
+
+                redisUtil.set(RedisKeyUtil.mergerVideoKey + videoFolder.getVideoFolderId(), "20");
+                if (StringUtils.isNotBlank(mergeVideoUrl)) {
+                    filePathSetAll.add(mergeVideoUrl);
+                }
+                filePathSetAll.addAll(filePathSet);
+                String mergeLocalName = VideoUtil.mergeVideo(filePathSetAll, FilePath.VIDEO_LOCAL_PATH);
+                redisUtil.set(RedisKeyUtil.mergerVideoKey + videoFolder.getVideoFolderId(), "30");
+                if (mergeLocalName == null) {
+                    updateCaseVideoStatus(videoFolder.getVideoFolderId(), -1);
+                    redisUtil.set(RedisKeyUtil.mergerVideoKey + videoFolder.getVideoFolderId(), "-1");
+                    return;
+                }
+                String mergeLocalPath = FilePath.VIDEO_LOCAL_PATH + "/" + mergeLocalName;
+                String ossKey = String.format(FilePath.VIDEO_OSS_PATH, videoFolder.getVideoFolderId()) + "/" + mergeLocalName;
+
+                String mergeLocalPathImg = FilePath.VIDEO_LOCAL_PATH + "/" + mergeLocalName.replace(".mp4", ".jpg");
+                String ossKeyImg = String.format(FilePath.VIDEO_OSS_PATH, videoFolder.getVideoFolderId()) + "/" + mergeLocalName.replace(".mp4", ".jpg");
+                uploadToOssUtil.uploadOss(mergeLocalPath, ossKey);
+                uploadToOssUtil.uploadOss(mergeLocalPathImg, ossKeyImg);
+                if (!uploadToOssUtil.existKey(ossKey) && !uploadToOssUtil.existKey(ossKeyImg)) {
+                    updateCaseVideoStatus(videoFolder.getVideoFolderId(), -1);
+                    redisUtil.set(RedisKeyUtil.mergerVideoKey + videoFolder.getVideoFolderId(), "-1");
+                    return;
+                }
+                redisUtil.set(RedisKeyUtil.mergerVideoKey + videoFolder.getVideoFolderId(), "70");
+
+                videoFolder.setVideoMergeUrl(queryPath + ossKey);
+                videoFolder.setVideoFolderCover(queryPath + ossKeyImg);
+                videoFolderService.updateById(videoFolder);
+                FileUtil.del(FilePath.VIDEO_LOCAL_PATH);
+                redisUtil.set(RedisKeyUtil.mergerVideoKey + videoFolder.getVideoFolderId(), "100");
+                updateCaseVideoStatus(videoFolder.getVideoFolderId(), 1);
             }
+        });
 
-            redisUtil.set(RedisKeyUtil.mergerVideoKey + videoFolder.getVideoFolderId(), "20");
-            if (StringUtils.isNotBlank(mergeVideoUrl)) {
-                filePathSetAll.add(mergeVideoUrl);
-            }
-            filePathSetAll.addAll(filePathSet);
-            String mergeLocalName = VideoUtil.mergeVideo(filePathSetAll, FilePath.VIDEO_LOCAL_PATH);
-            redisUtil.set(RedisKeyUtil.mergerVideoKey + videoFolder.getVideoFolderId(), "30");
-            if (mergeLocalName == null) {
-                updateCaseVideoStatus(videoFolder.getVideoFolderId(), -1);
-                redisUtil.set(RedisKeyUtil.mergerVideoKey + videoFolder.getVideoFolderId(), "-1");
-                return;
-            }
-            String mergeLocalPath = FilePath.VIDEO_LOCAL_PATH + "/" + mergeLocalName;
-            String ossKey = String.format(FilePath.VIDEO_OSS_PATH, videoFolder.getVideoFolderId()) + "/" + mergeLocalName;
-
-            String mergeLocalPathImg = FilePath.VIDEO_LOCAL_PATH + "/" + mergeLocalName.replace(".mp4", ".jpg");
-            String ossKeyImg = String.format(FilePath.VIDEO_OSS_PATH, videoFolder.getVideoFolderId()) + "/" + mergeLocalName.replace(".mp4", ".jpg");
-            uploadToOssUtil.uploadOss(mergeLocalPath, ossKey);
-            uploadToOssUtil.uploadOss(mergeLocalPathImg, ossKeyImg);
-            if (!uploadToOssUtil.existKey(ossKey) && !uploadToOssUtil.existKey(ossKeyImg)) {
-                updateCaseVideoStatus(videoFolder.getVideoFolderId(), -1);
-                redisUtil.set(RedisKeyUtil.mergerVideoKey + videoFolder.getVideoFolderId(), "-1");
-                return;
-            }
-            redisUtil.set(RedisKeyUtil.mergerVideoKey + videoFolder.getVideoFolderId(), "70");
-
-            videoFolder.setVideoMergeUrl(queryPath + ossKey);
-            videoFolder.setVideoFolderCover(queryPath + ossKeyImg);
-            videoFolderService.updateById(videoFolder);
-            FileUtil.del(FilePath.VIDEO_LOCAL_PATH);
-            redisUtil.set(RedisKeyUtil.mergerVideoKey + videoFolder.getVideoFolderId(), "100");
-            updateCaseVideoStatus(videoFolder.getVideoFolderId(), 1);
-        }).run();
     }
 
     private void updateCaseVideoStatus(Integer folderId, Integer status) {

+ 28 - 10
src/main/java/com/fdkankan/fusion/service/impl/ModelServiceImpl.java

@@ -54,6 +54,8 @@ public class ModelServiceImpl extends ServiceImpl<IModelMapper, Model> implement
     IFusionNumService fusionNumService;
     @Autowired
     RedisUtil redisUtil;
+    @Autowired
+    ThreadPoolExecutor threadPoolExecutor;
 
     @Value("${local.obj_path}")
     private String OBJ_PATH;
@@ -133,13 +135,21 @@ public class ModelServiceImpl extends ServiceImpl<IModelMapper, Model> implement
     }
 
     private void setCreateStatus(Model model,Integer status){
+        String redisKey = RedisKeyUtil.modelCancelUpload+model.getModelId();
+        if(redisUtil.hasKey(redisKey)){
+            if(redisUtil.get(redisKey).equals("-2")){
+                return;
+            }
+        }
         model.setCreateStatus(status);
         this.saveOrUpdate(model);
-        redisUtil.set(RedisKeyUtil.modelUpload+model.getModelId(),status.toString());
+        if(status != 1){
+            redisUtil.set(RedisKeyUtil.modelUpload+model.getModelId(),status.toString());
+        }
     }
 
     private void runThread(File file1,Model model,File newObjFile,File objPathFile,IModelService modelService){
-        new Thread() {
+        threadPoolExecutor.submit(new Runnable() {
             @Override
             public void run() {
                 File mntFile = null;
@@ -171,12 +181,10 @@ public class ModelServiceImpl extends ServiceImpl<IModelMapper, Model> implement
                     }
 
                     model.setModelGlbUrl(queryPath + glbOssPath);
-                    model.setCreateStatus(1);  //上传成功
+                    setCreateStatus(model,1);
                     modelService.saveOrUpdate(model);
                 }catch (Exception e){
-                    model.setCreateStatus(-1);
-                    modelService.saveOrUpdate(model);
-                    redisUtil.set(RedisKeyUtil.modelUpload+model.getModelId(),"-1");
+                    setCreateStatus(model,-1);
                 }finally {
                     if(newObjFile!=null){
                         FileUtil.del(newObjFile);
@@ -190,7 +198,7 @@ public class ModelServiceImpl extends ServiceImpl<IModelMapper, Model> implement
                     redisUtil.set(RedisKeyUtil.modelUpload+model.getModelId(),"100");
                 }
             }
-        }.start();
+        });
     }
 
     @Override
@@ -307,6 +315,10 @@ public class ModelServiceImpl extends ServiceImpl<IModelMapper, Model> implement
     @Override
     public String uploadObjProgress(Integer modelId) {
         String redisKey = RedisKeyUtil.modelUpload+modelId;
+        String cancel = RedisKeyUtil.modelCancelUpload+modelId;
+        if(redisUtil.hasKey(cancel)){
+            return redisUtil.get(cancel);
+        }
         if(redisUtil.hasKey(redisKey)){
             return redisUtil.get(redisKey);
         }
@@ -315,10 +327,16 @@ public class ModelServiceImpl extends ServiceImpl<IModelMapper, Model> implement
 
     @Override
     public void cancelUpload(Integer modelId) {
-        String redisKey = RedisKeyUtil.modelUpload+modelId;
-        if(!redisUtil.hasKey(redisKey)){
+        String redisKey = RedisKeyUtil.modelCancelUpload+modelId;
+        if(redisUtil.hasKey(redisKey)){
             return;
         }
-
+        Model model = this.getById(modelId);
+        if(model == null){
+            return;
+        }
+        model.setCreateStatus(-2);
+        this.updateById(model);
+        redisUtil.set(redisKey,"-2");
     }
 }