Procházet zdrojové kódy

增加消费队列

xiewj před 1 rokem
rodič
revize
32768c9887

+ 64 - 4
720yun_fd_consumer/gis_consumer/src/main/java/com/gis/listener/SceneListener.java

@@ -73,6 +73,11 @@ public class SceneListener {
     RabbitMqProducerUtil rabbitMqProducerUtil;
     @Value("${queue.relics-init-queue}")
     String relicsInitQueue;
+
+    @Value("${queue.scene-queue-work}")
+    String sceneQueueWork;
+
+
     /**
      * 全景场景初始化方法
      *
@@ -81,7 +86,7 @@ public class SceneListener {
      * @throws Exception the io exception  这里异常需要处理
      */
     @RabbitListener(
-            queuesToDeclare = @Queue("${queue.scene-queue}"),concurrency = "1"
+            queuesToDeclare = @Queue("${queue.scene-queue}"),concurrency = "5"
     )
     public void sceneQueue(Channel channel, Message message) throws IOException {
             if (ObjectUtils.isEmpty(message.getBody())) {
@@ -115,16 +120,71 @@ public class SceneListener {
                     workEntity.setPassword(param.getPwd());
                     workEntity.setIsPassword(1);
                 }
+                if (ObjectUtil.isNotEmpty(param.getPayStatus())){
+                    workEntity.setPayStatus(param.getPayStatus());
+                }
                 workService.update(workEntity);
                 sendStartMq(param);
                 channel.basicAck(deliveryTag, false);
-            }else if (ObjectUtil.isNotEmpty(param.getSceneCode())&&param.getStatus()==1){
+            }else if (ObjectUtil.isNotEmpty(param.getSceneCode())&&param.getStatus()==1) {
+                //发送到处理队列
+                rabbitMqProducerUtil.sendByWorkQueue(relicsInitQueue, msg);
+
+            }
+
+
+        }catch (Exception e){
+            if (StrUtil.isNotEmpty(sceneCode)){
+                updateOssStatusJson(sceneCode,0);
+            }
+            channel.basicAck(deliveryTag, false);
+            log.error("场景sceneQueue报错{}",e.getMessage());
+            e.printStackTrace();
+
+        }
+    }
+
+
+    /**
+     * 全景场景初始化方法
+     *
+     * @param channel
+     * @param message
+     * @throws Exception the io exception  这里异常需要处理
+     */
+    @RabbitListener(
+            queuesToDeclare = @Queue("${queue.scene-queue-work}"),concurrency = "1"
+    )
+    public void sceneQueueWork(Channel channel, Message message) throws IOException {
+        if (ObjectUtils.isEmpty(message.getBody())) {
+            log.error("消息内容为空,退出构建,当前服务器id:{}" );
+            return;
+        }
+        String traceId = System.currentTimeMillis()+"";
+        MDC.put("TRACE_ID", traceId);
+        String sceneCode="";
+        long deliveryTag = message.getMessageProperties().getDeliveryTag();
+        try {
+            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
+            String messageId = message.getMessageProperties().getMessageId();
+            log.info("场景sceneQueueWork开始,id:{},deliveryTag:{},消息体:{}", messageId,deliveryTag,msg);
+            SceneQueueDTO param = JSONObject.parseObject(JSON.parse(msg).toString(), SceneQueueDTO.class);
+            if (ObjectUtil.isEmpty(param.getSceneCode())) {
+                log.error("参数列表错误(缺少),格式不匹配,{}",param);
+                channel.basicAck(deliveryTag, false);
+            }
+            sceneCode=param.getSceneCode();
+            WorkEntity workEntity=workService.findByNum(param.getSceneCode());
+            if (ObjectUtil.isNotEmpty(param.getSceneCode())&&param.getStatus()==1){
                 updateOssStatusJson(param.getSceneCode(),0);
 
                 //计算完成处理
                 //1,下载场景的全景图 默认 。scene_view_data/panorama/目录下的全景图图片
                 workEntity.setAlgorithmTime(param.getAlgorithmTime());
                 workEntity.setCalcStatus(0);
+                if (ObjectUtil.isNotEmpty(param.getPayStatus())){
+                    workEntity.setPayStatus(param.getPayStatus());
+                }
                 if (ObjectUtil.isNotEmpty(param.getSceneSource())){
                     workEntity.setSceneSource(param.getSceneSource());
                 }
@@ -148,7 +208,7 @@ public class SceneListener {
                     log.info("vision.txt存在: {}");
                     JSONObject visJson = JSONObject.parseObject(execute.body());
                     if(visJson.containsKey("sweepLocations")){
-                          sweepLocations = visJson.getJSONArray("sweepLocations");
+                        sweepLocations = visJson.getJSONArray("sweepLocations");
                     }
                 }
 
@@ -206,10 +266,10 @@ public class SceneListener {
             channel.basicAck(deliveryTag, false);
             log.error("场景sceneQueue报错{}",e.getMessage());
             e.printStackTrace();
-
         }
     }
 
+
     private void updateOssStatusJson(String sceneCode, int status) throws IOException {
         //修改OSS,status状态 scene_view_data/场景码/data/status.json
         String statusJson = "scene_view_data/"+sceneCode+"/data/status.json";

+ 1 - 0
720yun_fd_consumer/gis_consumer/src/main/resources/application-pro.yml

@@ -81,3 +81,4 @@ queue:
     relics-update-name-queue: relics-update-name-queue
     pano-migrate-scene-queue: pano-migrate-scene-queue
     pano-paystatus-scene-queue: pano-paystatus-scene-queue
+    scene-queue-work: queue-pano-scene-work

+ 2 - 0
720yun_fd_consumer/gis_consumer/src/main/resources/application-sit.yml

@@ -67,3 +67,5 @@ queue:
     relics-update-name-queue: relics-update-name-queue
     pano-migrate-scene-queue: pano-migrate-scene-queue
     pano-paystatus-scene-queue: pano-paystatus-scene-queue
+    scene-queue-work: queue-pano-scene-work
+