xiewj 4 mesi fa
parent
commit
f49b69bfd8

+ 0 - 3
720yun_fd_consumer/gis_consumer/src/main/java/com/gis/listener/container/DoSliceQueueListener.java

@@ -39,14 +39,11 @@ public class DoSliceQueueListener implements ChannelAwareMessageListener {
             log.info("场景doSliceQueue开始,id:{},deliveryTag:{},消息体:{}", messageId,deliveryTag,msg);
             DoSliceDTO dto = JSONObject.parseObject(msg, DoSliceDTO.class);
             fodderService.doSlice(dto.getId(), traceId);
-            channel.basicAck(deliveryTag, false);
         }catch (Exception e){
             e.printStackTrace();
-            channel.basicAck(deliveryTag, false);
             log.error("场景doSliceQueue报错{}",e.getMessage());
         }finally {
             channel.basicAck(deliveryTag, false);
-
         }
     }
 }

+ 0 - 1
720yun_fd_consumer/gis_consumer/src/main/java/com/gis/listener/container/PanoMigrateSceneQueueListener.java

@@ -54,7 +54,6 @@ public class PanoMigrateSceneQueueListener implements ChannelAwareMessageListene
                 }
             }
         }catch (Exception e){
-            channel.basicAck(deliveryTag, false);
             log.error("场景migrateSceneQueue报错{}",e.getMessage());
             e.printStackTrace();
         }finally {

+ 1 - 1
720yun_fd_consumer/gis_consumer/src/main/java/com/gis/listener/container/PanoPayStatusSceneQueueListener.java

@@ -46,6 +46,7 @@ public class PanoPayStatusSceneQueueListener implements ChannelAwareMessageListe
             if (ObjectUtil.isEmpty(param.getSceneCodes())&& ArrayUtil.isNotEmpty(param.getSceneCodes())) {
                 log.error("参数列表错误(缺少),格式不匹配,{}",param);
                 channel.basicAck(deliveryTag, false);
+                return;
             }
             for (String sceneCode : param.getSceneCodes()) {
                 WorkEntity workEntity=workService.findByNum(sceneCode);
@@ -56,7 +57,6 @@ public class PanoPayStatusSceneQueueListener implements ChannelAwareMessageListe
                 }
             }
         }catch (Exception e){
-            channel.basicAck(deliveryTag, false);
             log.error("场景sceneQueue报错{}",e.getMessage());
             e.printStackTrace();
         }finally {

+ 5 - 0
720yun_fd_consumer/gis_consumer/src/main/java/com/gis/listener/container/RabbitMQConfig.java

@@ -82,6 +82,7 @@ public class RabbitMQConfig {
         container.setAutoStartup(!isDynamic); // 根据配置决定是否自动启动
         container.setPrefetchCount(2);
         container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
+        container.setErrorHandler(t -> log.error("doSliceQueue 监听器发生异常", t));
         return container;
     }
 
@@ -94,6 +95,7 @@ public class RabbitMQConfig {
         container.setAutoStartup(!isDynamic); // 根据配置决定是否自动启动
         container.setPrefetchCount(1);
         container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
+        container.setErrorHandler(t -> log.error("panoMigrateSceneQueue 监听器发生异常", t));
         return container;
     }
 
@@ -106,6 +108,7 @@ public class RabbitMQConfig {
         container.setAutoStartup(!isDynamic); // 根据配置决定是否自动启动
         container.setPrefetchCount(1);
         container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
+        container.setErrorHandler(t -> log.error("panoPayStatusSceneQueue 监听器发生异常", t));
         return container;
     }
 
@@ -118,6 +121,7 @@ public class RabbitMQConfig {
         container.setAutoStartup(!isDynamic); // 根据配置决定是否自动启动
         container.setPrefetchCount(1);
         container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
+        container.setErrorHandler(t -> log.error("sceneQueue 监听器发生异常", t));
         return container;
     }
 
@@ -131,6 +135,7 @@ public class RabbitMQConfig {
         container.setAutoStartup(true); // 根据配置决定是否自动启动
         container.setPrefetchCount(2);
         container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
+        container.setErrorHandler(t -> log.error("sceneWorkQueue 监听器发生异常", t));
         return container;
     }
 }

+ 1 - 3
720yun_fd_consumer/gis_consumer/src/main/java/com/gis/listener/container/SceneQueueListener.java

@@ -91,6 +91,7 @@ public class SceneQueueListener implements ChannelAwareMessageListener {
             if (ObjectUtil.isEmpty(param.getSceneCode())) {
                 log.error("参数列表错误(缺少),格式不匹配,{}",param);
                 channel.basicAck(deliveryTag, false);
+                return;
             }
             sceneCode=param.getSceneCode();
             WorkEntity workEntity=workService.findByNum(param.getSceneCode());
@@ -98,7 +99,6 @@ public class SceneQueueListener implements ChannelAwareMessageListener {
                 //初始化创建work表数据
                 workService.entityAdd2(param);
                 sendStartMq(param);
-                channel.basicAck(deliveryTag, false);
             }if (ObjectUtil.isNotEmpty(workEntity)&&param.getStatus()!=1){
                 //修改状态
                 workEntity.setCalcStatus(param.getStatus());
@@ -111,7 +111,6 @@ public class SceneQueueListener implements ChannelAwareMessageListener {
                 }
                 workService.updateById(workEntity);
                 sendStartMq(param);
-                channel.basicAck(deliveryTag, false);
             }else if (ObjectUtil.isNotEmpty(param.getSceneCode())&&param.getStatus()==1) {
                 //发送到处理队列
                 //rabbitMqProducerUtil.sendByWorkQueue(sceneQueueWork, msg);
@@ -135,7 +134,6 @@ public class SceneQueueListener implements ChannelAwareMessageListener {
 
         }finally {
             channel.basicAck(deliveryTag, false);
-
         }
     }
 

+ 2 - 3
720yun_fd_consumer/gis_consumer/src/main/java/com/gis/listener/container/SceneWorkQueueListener.java

@@ -141,7 +141,7 @@ public class SceneWorkQueueListener implements ChannelAwareMessageListener {
             SceneQueueDTO param = JSONObject.parseObject(JSON.parse(msg).toString(), SceneQueueDTO.class);
             if (ObjectUtil.isEmpty(param.getSceneCode())) {
                 log.error("参数列表错误(缺少),格式不匹配,{}",param);
-                channel.basicAck(deliveryTag, false);
+                return;
             }
             sceneCode=param.getSceneCode();
             WorkEntity workEntity=workService.findByNum(param.getSceneCode());
@@ -229,7 +229,6 @@ public class SceneWorkQueueListener implements ChannelAwareMessageListener {
                 workService.updateById(workEntity);
                 sendEndMq(param);
                 updateOssStatusJson(param.getSceneCode(),1);
-                channel.basicAck(deliveryTag, false);
             }
             log.info("场景计算结束,队列名:{},id:{}", queueNameService.getQueueName(), messageId);
 
@@ -238,12 +237,12 @@ public class SceneWorkQueueListener implements ChannelAwareMessageListener {
             if (StrUtil.isNotEmpty(sceneCode)){
                 updateOssStatusJson(sceneCode,0);
             }
-            channel.basicAck(deliveryTag, false);
             log.error("场景sceneQueue报错{}",e.getMessage());
             e.printStackTrace();
         }finally {
             //计算完毕,将当前系统构建状态改为false
             redisUtil.delete(modelingKey);
+            channel.basicAck(deliveryTag, false);
         }
     }
     private void sendEndMq(SceneQueueDTO param) {