Explorar el Código

D队列重复推送

lyhzzz hace 1 año
padre
commit
9bf46eb772
Se han modificado 1 ficheros con 14 adiciones y 8 borrados
  1. 14 8
      src/main/java/com/fdkankan/mqcontroller/task/TaskService.java

+ 14 - 8
src/main/java/com/fdkankan/mqcontroller/task/TaskService.java

@@ -217,14 +217,17 @@ public class TaskService {
         checkOpenCount ++;
         checkOpenCount ++;
     }
     }
 
 
-    private void sendRabbitMq(List<MqSendLog> msgList, Integer msgCount,String queueName) {
+    private synchronized void sendRabbitMq(List<MqSendLog> msgList, Integer msgCount,String queueName) {
         for (int i = 0;i < msgCount ;i++){
         for (int i = 0;i < msgCount ;i++){
             if(i > msgList.size() -1){
             if(i > msgList.size() -1){
                 continue;
                 continue;
             }
             }
             MqSendLog mqSendLog = msgList.get(i);
             MqSendLog mqSendLog = msgList.get(i);
-            mqSendLogService.updateStatus(mqSendLog.getId(),1,queueName);
-            rabbitMqProducer.sendByWorkQueue(queueName,JSONObject.parseObject(mqSendLog.getContent()));
+            MqSendLog dbMqSendLog = mqSendLogService.getById(mqSendLog.getId());
+            if(dbMqSendLog.getStatus() == 0){
+                mqSendLogService.updateStatus(mqSendLog.getId(),1,queueName);
+                rabbitMqProducer.sendByWorkQueue(queueName,JSONObject.parseObject(mqSendLog.getContent()));
+            }
         }
         }
     }
     }
 
 
@@ -249,10 +252,13 @@ public class TaskService {
         Boolean flag2 = msgList.size()  >  mqQueueConfig.getScalingThreshold() + noModelingCount + notStartCount || (!msgList.isEmpty() &&  ecsCount + mqMsg.getConsumers() == 0 );
         Boolean flag2 = msgList.size()  >  mqQueueConfig.getScalingThreshold() + noModelingCount + notStartCount || (!msgList.isEmpty() &&  ecsCount + mqMsg.getConsumers() == 0 );
 
 
         if(flag2 && flag){
         if(flag2 && flag){
-            log.info("将待开启弹性伸缩放入队列:{}",mqQueueConfig.getQueueName());
-            MqScalingConfig mqScalingConfig = mqScalingConfigService.getById(mqQueueConfig.getScalingConfigId());
-            DelEcsVo vo = new DelEcsVo(null,mqScalingConfig,mqQueueConfig.getQueueName());
-            openList.offer(vo);
+            List<DelEcsVo> collect = openList.stream().filter(e -> e.getQueueName().equals(mqQueueConfig.getQueueName())).collect(Collectors.toList());
+            if(collect.isEmpty()){
+                log.info("将待开启弹性伸缩放入队列:{}",mqQueueConfig.getQueueName());
+                MqScalingConfig mqScalingConfig = mqScalingConfigService.getById(mqQueueConfig.getScalingConfigId());
+                DelEcsVo vo = new DelEcsVo(null,mqScalingConfig,mqQueueConfig.getQueueName());
+                openList.offer(vo);
+            }
         }
         }
     }
     }
 
 
@@ -285,7 +291,7 @@ public class TaskService {
                     openList.offer(take);
                     openList.offer(take);
                     return;
                     return;
                 }
                 }
-                Boolean flag = createEcs( take.getMqScalingConfig());
+                Boolean flag = createEcs(take.getMqScalingConfig());
                 if(flag){
                 if(flag){
                     mqEcsService.add(take.getQueueName());
                     mqEcsService.add(take.getQueueName());
                     sendRabbitMq(msgList,1,mqQueueConfig.getQueueName());
                     sendRabbitMq(msgList,1,mqQueueConfig.getQueueName());