Prechádzať zdrojové kódy

使用两个监听队列切图

wuweihao 3 rokov pred
rodič
commit
f8dad38dd1

+ 7 - 16
720yun_fd_consumer/src/main/java/com/gis/listener/Fd720Listener.java

@@ -37,32 +37,23 @@ public class Fd720Listener {
     // 链路id
     static String traceId;
 
-    // 多线程处理消息
-    static ExecutorService executor = Executors.newFixedThreadPool(2);
 
     @RabbitHandler
     public void getMessage(Long param)  {
         traceId = System.currentTimeMillis()+"";
         MDC.put("TRACE_ID", traceId);
 
-        log.info("监听消息开始: {}, uuid: {}", param, traceId);
+        log.info("监听消息start: {}, uuid: {}", param, traceId);
         try {
             Thread.sleep(300);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
-        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
-            MDC.put("TRACE_ID", traceId);
-            log.info("进入多线程,  参数id:{}, uuid: {}", param, traceId);
-            processPano(param, traceId);
-            return "多线程正确执行完成";
-        }, executor);
-
-        future2.thenAccept((e) -> {
-            log.info("执行完成结果:{}, 参数id: {}, id: {}", e, param, traceId);
-        });
-
-        log.info("监听消息完成: {}, uuid: {}", param, traceId);
+
+        processPano(param, traceId);
+
+
+        log.info("监听消息end: {}, uuid: {}", param, traceId);
     }
 
 
@@ -143,7 +134,7 @@ public class Fd720Listener {
     /**
      * 上传切图目录
      */
-    public void uploadOss(String sceneCode){
+    private void uploadOss(String sceneCode){
         String cmd = CmdConstant.OSSUTIL_UPLOAD_DIR;
         cmd = cmd.replaceAll("@sceneCode", sceneCode);
         log.info("ossCmd: " + cmd);

+ 160 - 0
720yun_fd_consumer/src/main/java/com/gis/listener/Fd720Listener_2.java

@@ -0,0 +1,160 @@
+package com.gis.listener;
+
+import com.gis.constant.CmdConstant;
+import com.gis.constant.ConfigConstant;
+import com.gis.constant.RabbitConfig;
+import com.gis.entity.FodderEntity;
+import com.gis.mapper.FodderMapper;
+import com.gis.util.CmdUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Test;
+import org.slf4j.MDC;
+import org.springframework.amqp.rabbit.annotation.RabbitHandler;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Date;
+import java.util.concurrent.*;
+
+
+/**
+ * Created by owen on 2021/12/24 0012 10:05
+ *
+ * 开启两个线程监听
+ */
+@Slf4j
+@Component
+@RabbitListener(queues = RabbitConfig.PANO_QUEUE) // 指定监听队列
+public class Fd720Listener_2 {
+
+    @Autowired
+    ConfigConstant configConstant;
+
+    @Autowired
+    FodderMapper fodderMapper;
+
+    // 链路id
+    static String traceId;
+
+
+    @RabbitHandler
+    public void getMessage(Long param)  {
+        traceId = System.currentTimeMillis()+"";
+        MDC.put("TRACE_ID", traceId);
+
+        log.info("监听消息_2_start: {}, uuid: {}", param, traceId);
+        try {
+            Thread.sleep(300);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        processPano(param, traceId);
+
+
+        log.info("监听消息_1_end: {}, uuid: {}", param, traceId);
+    }
+
+
+    /**
+     * 切图会超时控制
+     * @param id
+     */
+    public void processPano(Long id, String traceId)  {
+        MDC.put("TRACE_ID", traceId);
+
+        log.info("切图流程开始 : " + id);
+        long start = System.currentTimeMillis();
+        FodderEntity entity = fodderMapper.findById(id);
+        if (entity == null) {
+            log.error("场景不存在: " + id);
+            // 直接结束,抛异常的话,会造成死循环,产生大量日志,而且队列也跑不下去
+            return;
+        }
+        String panoPath = entity.getFilePath();
+        String sceneCode = entity.getSceneCode();
+        String cmd = CmdConstant.PANO_KRPANO + panoPath;
+
+
+        // 超时处理机制
+        final ExecutorService exec = Executors.newFixedThreadPool(1);
+        Callable<String> call = new Callable<String>() {
+            @Override
+            public String call() throws Exception {
+                MDC.put("TRACE_ID", traceId);
+                CmdUtils.callLine(cmd, 200);
+                long end = System.currentTimeMillis();
+                log.info("切图完成耗时: {} s" ,(end-start)/1000);
+                log.warn("end processListener : "+ id);
+                return "执行完成";
+            }
+        };
+        // 超时回调
+        Future<String> future = exec.submit(call);
+        try {
+            String obj = future.get(10, TimeUnit.MINUTES); //任务处理超时时间设为 3分钟
+            entity.setStatus(3);
+            // 上传切图
+            uploadOss(sceneCode);
+            log.info("任务成功返回: " + obj);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            entity.setStatus(2);
+            StringWriter trace=new StringWriter();
+            log.error(trace.toString());
+            log.error("异常了 InterruptedException");
+        } catch (ExecutionException e) {
+            e.printStackTrace();
+            StringWriter trace=new StringWriter();
+            e.printStackTrace(new PrintWriter(trace));
+            log.error("超时了 1");
+            // 异常日志要打印,不然不会出现在日志文件中,只会出现在控制台
+            log.error(trace.toString());
+            entity.setStatus(2);
+            future.cancel(true);
+        } catch (TimeoutException e) {
+            e.printStackTrace();
+            StringWriter trace=new StringWriter();
+            e.printStackTrace(new PrintWriter(trace));
+            log.error("超时了 2");
+            log.error(trace.toString());
+            entity.setStatus(2);
+            future.cancel(true);
+        } finally {
+            entity.setUpdateTime(new Date());
+            fodderMapper.updateByPrimaryKey(entity);
+            log.info("切图流程, 更新数据库完成 : " + id);
+        }
+        log.info("切图流程结束 : " + id);
+    }
+
+
+
+    /**
+     * 上传切图目录
+     */
+    private void uploadOss(String sceneCode){
+        String cmd = CmdConstant.OSSUTIL_UPLOAD_DIR;
+        cmd = cmd.replaceAll("@sceneCode", sceneCode);
+        log.info("ossCmd: " + cmd);
+        long start = System.currentTimeMillis();
+        CmdUtils.callshell(cmd);
+        long end = System.currentTimeMillis();
+        log.info("切图上传完成, 耗时:{} s" , (end-start)/1000 );
+    }
+
+    @Test
+    public void test(){
+        CmdUtils.callshell("ipconfig");
+    }
+
+
+
+
+
+
+
+}