Forráskód Böngészése

增加finally ack消息,确保不会重复消费

dengsixing 1 napja
szülő
commit
37b67e00b1

+ 50 - 46
src/main/java/com/fdkankan/modeling/receiver/RabbitMqListener.java

@@ -117,57 +117,61 @@ public class RabbitMqListener {
         }
         }
         long deliveryTag = message.getMessageProperties().getDeliveryTag();
         long deliveryTag = message.getMessageProperties().getDeliveryTag();
         channel.basicAck(deliveryTag, false);
         channel.basicAck(deliveryTag, false);
-        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
-        BuildSceneCallMessage buildSceneMessage = JSONObject.parseObject(msg, BuildSceneCallMessage.class);
-        String messageId = message.getMessageProperties().getMessageId();
-        if(Objects.isNull(buildSceneMessage) || StrUtil.isBlank(buildSceneMessage.getSceneNum())){
-            log.error("消息内容错误,id:{},消息体:{}", messageId, msg);
-            return;
-        }
-        String bizType = buildSceneMessage.getBizType();
-        if(StrUtil.isEmpty(bizType)){
-            bizType = "standard";
-            buildSceneMessage.setBizType(bizType);
-        }
-
-        //计算暂停锁,计算中锁。弹性伸缩控制服务使用
-        String modelingKey = String.format(RedisKey.modelingKey,SysConstants.hostName);
-        String stopKey = String.format(RedisKey.ecsStopKey,SysConstants.hostName);
-        //设置为计算超时时间过期
-        redisUtil.set(modelingKey,msg,SysConstants.modelTimeOut * 3600L);
+        try {
+            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
+            BuildSceneCallMessage buildSceneMessage = JSONObject.parseObject(msg, BuildSceneCallMessage.class);
+            String messageId = message.getMessageProperties().getMessageId();
+            if(Objects.isNull(buildSceneMessage) || StrUtil.isBlank(buildSceneMessage.getSceneNum())){
+                log.error("消息内容错误,id:{},消息体:{}", messageId, msg);
+                return;
+            }
+            String bizType = buildSceneMessage.getBizType();
+            if(StrUtil.isEmpty(bizType)){
+                bizType = "standard";
+                buildSceneMessage.setBizType(bizType);
+            }
 
 
-        if(redisUtil.hasKey(stopKey)){
-            log.error("服务实例:{} 正在关闭,退出构建!", SysConstants.hostName);
-            redisUtil.del(modelingKey);
-            channel.basicNack(deliveryTag, true, true);
-            return;
-        }
+            //计算暂停锁,计算中锁。弹性伸缩控制服务使用
+            String modelingKey = String.format(RedisKey.modelingKey,SysConstants.hostName);
+            String stopKey = String.format(RedisKey.ecsStopKey,SysConstants.hostName);
+            //设置为计算超时时间过期
+            redisUtil.set(modelingKey,msg,SysConstants.modelTimeOut * 3600L);
+
+            if(redisUtil.hasKey(stopKey)){
+                log.error("服务实例:{} 正在关闭,退出构建!", SysConstants.hostName);
+                redisUtil.del(modelingKey);
+                channel.basicNack(deliveryTag, true, true);
+                return;
+            }
 
 
-        //进入计算之前,判断算法文件锁是否存在,如果存在,则删除
-        String lockPath = buildSceneMessage.getPath() + "/.lockdirectory";
-        if(FileUtil.exist(lockPath)){
-            FileUtil.del(lockPath);
-        }
+            //进入计算之前,判断算法文件锁是否存在,如果存在,则删除
+            String lockPath = buildSceneMessage.getPath() + "/.lockdirectory";
+            if(FileUtil.exist(lockPath)){
+                FileUtil.del(lockPath);
+            }
 
 
-        BuildLog buildLog = new BuildLog();
-        log.info("场景计算开始,队列名:{},id:{},deliveryTag:{},消息体:{}", queueNameService.getQueueName(), messageId,deliveryTag,msg);
-        buildLog.setMessageId(messageId);
-        buildLog.setContent(msg);
-
-        //这里写得烂,代码冗余,没时间设计,后面的人接手,要骂就骂领导和产品经理
-        log.info("----------------bizType:{}", bizType);
-        if("e57".equals(bizType)){
-            this.process4E57(buildSceneMessage,buildLog);
-        }else if ("reverseE57".equals(bizType)){
-            reverseE57Service.build(buildSceneMessage,buildLog);
-        } else{
-            this.process(buildSceneMessage,buildLog);
-        }
+            BuildLog buildLog = new BuildLog();
+            log.info("场景计算开始,队列名:{},id:{},deliveryTag:{},消息体:{}", queueNameService.getQueueName(), messageId,deliveryTag,msg);
+            buildLog.setMessageId(messageId);
+            buildLog.setContent(msg);
+
+            //这里写得烂,代码冗余,没时间设计,后面的人接手,要骂就骂领导和产品经理
+            log.info("----------------bizType:{}", bizType);
+            if("e57".equals(bizType)){
+                this.process4E57(buildSceneMessage,buildLog);
+            }else if ("reverseE57".equals(bizType)){
+                reverseE57Service.build(buildSceneMessage,buildLog);
+            } else{
+                this.process(buildSceneMessage,buildLog);
+            }
 
 
-        log.info("场景计算结束,队列名:{},id:{}", queueNameService.getQueueName(), messageId);
+            log.info("场景计算结束,队列名:{},id:{}", queueNameService.getQueueName(), messageId);
 
 
-        //计算完毕,将当前系统构建状态改为false
-        redisUtil.del(modelingKey);
+            //计算完毕,将当前系统构建状态改为false
+            redisUtil.del(modelingKey);
+        }finally {
+            channel.basicAck(deliveryTag, false);
+        }
     }
     }
 
 
     public void process(BuildSceneCallMessage message,BuildLog buildLog) {
     public void process(BuildSceneCallMessage message,BuildLog buildLog) {