TaskService.java 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. package com.fdkankan.mqcontroller.task;
  2. import cn.hutool.core.date.DateUnit;
  3. import cn.hutool.core.date.DateUtil;
  4. import cn.hutool.log.Log;
  5. import com.alibaba.fastjson.JSONObject;
  6. import com.fdkankan.mqcontroller.entity.*;
  7. import com.fdkankan.mqcontroller.service.*;
  8. import com.fdkankan.mqcontroller.utils.ECSUtils;
  9. import com.fdkankan.mqcontroller.utils.RabbitMqUtils;
  10. import com.fdkankan.mqcontroller.utils.RedisKey;
  11. import com.fdkankan.rabbitmq.util.RabbitMqProducer;
  12. import com.fdkankan.redis.util.RedisUtil;
  13. import lombok.extern.slf4j.Slf4j;
  14. import org.apache.commons.lang3.StringUtils;
  15. import org.springframework.beans.factory.FactoryBean;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import org.springframework.beans.factory.annotation.Value;
  18. import org.springframework.cloud.context.config.annotation.RefreshScope;
  19. import org.springframework.scheduling.annotation.Async;
  20. import org.springframework.stereotype.Service;
  21. import org.springframework.web.bind.annotation.PostMapping;
  22. import javax.annotation.PostConstruct;
  23. import java.sql.SQLOutput;
  24. import java.util.*;
  25. import java.util.concurrent.LinkedBlockingQueue;
  26. import java.util.stream.Collectors;
  27. @Service
  28. @RefreshScope
  29. @Slf4j
  30. public class TaskService {
  31. @Autowired
  32. RabbitMqProducer rabbitMqProducer;
  33. @Autowired
  34. IMqSendLogService mqSendLogService;
  35. @Autowired
  36. IMqQueueConfigService queueConfigService;
  37. @Autowired
  38. IMqScalingConfigService mqScalingConfigService;
  39. @Autowired
  40. IMqEcsService mqEcsService;
  41. @Autowired
  42. RedisUtil redisUtil;
  43. @Autowired
  44. IScenePlusService scenePlusService;
  45. @Autowired
  46. RabbitMqService rabbitMqService;
  47. public static Integer checkOpenCount = 0;
  48. private static final LinkedBlockingQueue<DelEcsVo> delList = new LinkedBlockingQueue<>();
  49. private static final HashMap<String,LinkedBlockingQueue<DelEcsVo>> openMap = new HashMap<>();
  50. public static HashMap<Integer,List<MqSendLog>> configLogsMap = new HashMap<>();
  51. public static HashMap<String,MqMsg> mqMsgMap = new HashMap<>();
  52. public static HashMap<String,Integer> countMap = new HashMap<>();
  53. /**
  54. * 将数据库中待计算的mq消息推送,rabbitmq
  55. */
  56. public void sendMq() {
  57. checkCount();
  58. List<MqSendLog> mqSendLogs = mqSendLogService.getNoSendMsg();
  59. if(mqSendLogs.isEmpty()){
  60. configLogsMap.clear();
  61. return;
  62. }
  63. List<MqSendLog> fdkkList = mqSendLogs.stream().filter(e -> "4dkk".equals(e.getType())).collect(Collectors.toList());
  64. List<MqSendLog> panoList = mqSendLogs.stream().filter(e -> "pano".equals(e.getType())).collect(Collectors.toList());
  65. List<MqQueueConfig> queueConfigList = queueConfigService.list();
  66. rabbitMqService.getMqMsgMap(queueConfigList);
  67. if(!fdkkList.isEmpty()){
  68. sendMqByType(fdkkList,"4dkk");
  69. }
  70. if(!panoList.isEmpty()){
  71. sendMqByType(panoList,"pano");
  72. }
  73. log.info("未分配的mq队列数4dkk:{},pano:{}",fdkkList.size(),panoList.size());
  74. }
  75. private void sendMqByType(List<MqSendLog> mqSendLogs,String type){
  76. //设置分配队列
  77. HashSet<String> numList = new HashSet<>();
  78. for (MqSendLog mqSendLog : mqSendLogs) {
  79. if(StringUtils.isBlank(mqSendLog.getContent())){
  80. mqSendLogService.updateStatus(mqSendLog.getId(),2,null);
  81. continue;
  82. }
  83. Integer configId = scenePlusService.getQueueConfig(mqSendLog);
  84. if(numList.contains(mqSendLog.getNum())){
  85. mqSendLogService.updateStatus(mqSendLog.getId(),2,null);
  86. continue;
  87. }
  88. if(configId == null ){
  89. mqSendLogService.updateStatus(mqSendLog.getId(),3,null);
  90. continue;
  91. }
  92. mqSendLog.setConfigId(configId);
  93. numList.add(mqSendLog.getNum());
  94. }
  95. List<MqQueueConfig> queueConfigList = queueConfigService.getByType(type);
  96. for (MqQueueConfig config : queueConfigList) {
  97. List<MqSendLog> msgList = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList());
  98. configLogsMap.put(config.getId(),msgList);
  99. }
  100. //是否需要重新组合队列
  101. allocateQueue2(queueConfigList,mqSendLogs,type);
  102. for (MqQueueConfig mqQueueConfig : queueConfigList) {
  103. List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
  104. if(msgList == null || msgList.isEmpty()){
  105. log.info("配置config消息为空:{}",mqQueueConfig.getId());
  106. continue;
  107. }
  108. MqMsg mqMsg = mqMsgMap.get(mqQueueConfig.getQueueName());
  109. if(mqMsg == null){
  110. log.info("获取mq队列数据失败:{}",mqQueueConfig);
  111. continue;
  112. }
  113. if(mqQueueConfig.getOpenScaling() == 0){ //不开启弹性伸缩
  114. if(mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready() <= 0 ){ //待计算队列中有任务
  115. continue;
  116. }
  117. }
  118. if(mqQueueConfig.getOpenScaling() == 1 ){ //开启弹性伸缩
  119. checkOpenEcs(mqQueueConfig,msgList);
  120. }
  121. sendRabbitMq(msgList,mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready(),mqQueueConfig.getQueueName());
  122. }
  123. }
  124. /**
  125. * 设置调配队列计算
  126. * 调配A,B,C
  127. */
  128. private void allocateQueue2(List<MqQueueConfig> queueConfigList,List<MqSendLog> mqSendLogs,String type){
  129. //获取常驻队列
  130. for (MqQueueConfig residentConfig : queueConfigList) { // A ,B ,C
  131. if(residentConfig.getIsSpecial() == 1){
  132. continue;
  133. }
  134. Integer residentCount = 0;
  135. MqMsg mqMsg = mqMsgMap.get(residentConfig.getQueueName());
  136. List<MqSendLog> residentLogList = configLogsMap.get(residentConfig.getId());
  137. Integer residentSize = residentLogList!=null ? residentLogList.size() :0;
  138. if(mqMsg!= null ){
  139. residentCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - residentSize ;
  140. log.info("allocateQueue2空闲服务器数量为:{},{}",residentConfig.getQueueName(),residentCount);
  141. }
  142. //A空闲,BC转A 。B空闲,C转B。C空闲,B转C
  143. for (MqQueueConfig mqQueueConfig : queueConfigList) { // B , C
  144. if(mqQueueConfig.getIsSpecial() == 1 || mqQueueConfig.getIsResident() == 1){
  145. continue;
  146. }
  147. if(mqQueueConfig.getQueueName().equals(residentConfig.getQueueName())){
  148. continue;
  149. }
  150. List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
  151. if(residentCount >0 && msgList!= null && !msgList.isEmpty()){
  152. Integer updateCount = updateMqSendLogConfig(residentCount, mqQueueConfig.getId(), residentConfig.getId(), mqSendLogs, queueConfigList);
  153. residentCount = residentCount - updateCount;
  154. log.info("allocateQueue2-调配成功:{},to:{},count:{}",mqQueueConfig.getQueueName(),residentConfig.getQueueName(),updateCount);
  155. }
  156. }
  157. }
  158. }
  159. private Integer updateMqSendLogConfig(Integer residentCount,Integer configId ,Integer residentConfigId,List<MqSendLog> mqSendLogs,List<MqQueueConfig> queueConfigList){
  160. Integer updateCount = 0;
  161. List<MqSendLog> msgList = configLogsMap.get(configId);
  162. for (int i = 0;i < residentCount ;i++){
  163. if(i > msgList.size() -1){
  164. continue;
  165. }
  166. MqSendLog mqSendLog = msgList.get(i);
  167. mqSendLog.setConfigId(residentConfigId);
  168. updateCount ++;
  169. }
  170. if(updateCount <=0){
  171. return updateCount;
  172. }
  173. for (MqQueueConfig config : queueConfigList) {
  174. List<MqSendLog> msgList2 = mqSendLogs.stream().filter(e -> e.getConfigId().equals(config.getId())).collect(Collectors.toList());
  175. configLogsMap.put(config.getId(),msgList2);
  176. }
  177. return updateCount;
  178. }
  179. private void checkCount() {
  180. if(checkOpenCount > 10000){ //一个W为一个循环
  181. checkOpenCount = 0;
  182. }
  183. checkOpenCount ++;
  184. }
  185. private synchronized void sendRabbitMq(List<MqSendLog> msgList, Integer msgCount,String queueName) {
  186. for (int i = 0;i < msgCount ;i++){
  187. if(i > msgList.size() -1){
  188. continue;
  189. }
  190. MqSendLog mqSendLog = msgList.get(i);
  191. MqSendLog dbMqSendLog = mqSendLogService.getById(mqSendLog.getId());
  192. if(dbMqSendLog.getStatus() == 0){
  193. mqSendLogService.updateStatus(mqSendLog.getId(),1,queueName);
  194. rabbitMqProducer.sendByWorkQueue(queueName,JSONObject.parseObject(mqSendLog.getContent()));
  195. }
  196. }
  197. }
  198. private void checkOpenEcs(MqQueueConfig mqQueueConfig,List<MqSendLog> msgList) {
  199. //获取未关闭的服务器
  200. List<MqEcs> list = mqEcsService.getNoStopByQueueName(mqQueueConfig.getQueueName());
  201. countMap.put(mqQueueConfig.getQueueName(),list.size());
  202. LinkedBlockingQueue<DelEcsVo> openList = openMap.computeIfAbsent(
  203. mqQueueConfig.getQueueName(),
  204. k -> new LinkedBlockingQueue<>()
  205. );
  206. Integer ecsCount = countMap.get(mqQueueConfig.getQueueName());
  207. MqMsg mqMsg = mqMsgMap.get(mqQueueConfig.getQueueName());
  208. //时间判断
  209. Boolean flag = mqQueueConfig.getOpenScalingTime() == 0 || checkOpenCount % (mqQueueConfig.getOpenScalingTime() * 6) == 0;
  210. //未在计算的服务器数量
  211. Integer noModelingCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged();
  212. //未启动的服务器数量
  213. Integer notStartCount = ecsCount - mqMsg.getConsumers();
  214. Boolean flag2 = msgList.size() > mqQueueConfig.getScalingThreshold() + noModelingCount + notStartCount || (!msgList.isEmpty() && ecsCount + mqMsg.getConsumers() == 0 );
  215. if(flag2 && flag){
  216. List<DelEcsVo> collect = openList.stream().filter(e -> e.getQueueName().equals(mqQueueConfig.getQueueName())).collect(Collectors.toList());
  217. if(collect.isEmpty()){
  218. log.info("将待开启弹性伸缩放入队列:{}",mqQueueConfig.getQueueName());
  219. MqScalingConfig mqScalingConfig = mqScalingConfigService.getById(mqQueueConfig.getScalingConfigId());
  220. DelEcsVo vo = new DelEcsVo(null,mqScalingConfig,mqQueueConfig.getQueueName());
  221. openList.offer(vo);
  222. }
  223. }
  224. }
  225. /**
  226. * 开启弹性伸缩
  227. */
  228. public void openEcsList() {
  229. try {
  230. HashMap<String, MqQueueConfig> queueMap = queueConfigService.getQueueMap();
  231. for (Map.Entry<String, LinkedBlockingQueue<DelEcsVo>> entry : openMap.entrySet()) {
  232. String key = entry.getKey();
  233. LinkedBlockingQueue<DelEcsVo> openList = entry.getValue();
  234. if(openList.isEmpty()){
  235. continue;
  236. }
  237. DelEcsVo take = openList.poll();
  238. MqQueueConfig mqQueueConfig = queueMap.get(key);
  239. List<MqSendLog> msgList = configLogsMap.get(mqQueueConfig.getId());
  240. if(msgList == null || msgList.isEmpty()){
  241. log.info("openEcsList--待推送消息数为0,无需开启:{},{}",key,take.getQueueName());
  242. continue;
  243. }
  244. countMap.merge(mqQueueConfig.getQueueName(), 1, Integer::sum);
  245. log.info("openEcsList--开启弹性伸缩数量:{},{}",key,1);
  246. List<MqEcs> mqEcsList = mqEcsService.getNotUpEcs();
  247. log.info("openEcsList--启动中的ecs数量为:{}",mqEcsList.size());
  248. if(!mqEcsList.isEmpty()){
  249. log.info("openEcsList--有启动中的ecs,等待启动完毕执行。。。");
  250. openList.offer(take);
  251. return;
  252. }
  253. Boolean flag = createEcs(take.getMqScalingConfig());
  254. if(flag){
  255. mqEcsService.add(take.getQueueName(),take.getMqScalingConfig().getType());
  256. sendRabbitMq(msgList,1,mqQueueConfig.getQueueName());
  257. }else {
  258. openList.offer(take);
  259. }
  260. }
  261. }catch (Exception e){
  262. log.info("openEcsList--开启弹性伸缩失败:",e);
  263. }
  264. }
  265. /**
  266. * 判断是否关闭弹性伸缩
  267. */
  268. public void checkDelEcs() {
  269. List<MqEcs> mqEcsList = mqEcsService.getScalingNotStopList();
  270. if(mqEcsList.isEmpty()){
  271. return;
  272. }
  273. log.info("启动中的弹性伸缩数量为:{}",mqEcsList.size());
  274. HashMap<String,MqQueueConfig> queueMap = queueConfigService.getQueueMap();
  275. HashMap<Integer,MqScalingConfig> scalingMap = mqScalingConfigService.getIdMap();
  276. for (MqEcs mqEcs : mqEcsList) {
  277. if(StringUtils.isBlank(mqEcs.getEcsName()) || StringUtils.isBlank(mqEcs.getQueueName())){
  278. continue;
  279. }
  280. MqQueueConfig mqQueueConfig = queueMap.get(mqEcs.getQueueName());
  281. if(mqQueueConfig == null || mqQueueConfig.getScalingConfigId() == null){
  282. continue;
  283. }
  284. long between = DateUtil.between(mqEcs.getCreateTime(), new Date(), DateUnit.MINUTE);
  285. //弹性伸缩按照一个小时计费
  286. long count = between/60;
  287. boolean flag = delList.stream().anyMatch(e -> e.getMqEcs().getEcsName().equals(mqEcs.getEcsName()));
  288. if(between >= mqQueueConfig.getStopScalingTime() + 60 * count && !flag){
  289. List<MqSendLog> mqSendLogs = configLogsMap.get(mqQueueConfig.getId());
  290. Integer mqSize = mqSendLogs != null ? mqSendLogs.size() :0;
  291. MqMsg mqMsg = mqMsgMap.get(mqEcs.getQueueName());
  292. if(mqMsg == null){
  293. continue;
  294. }
  295. log.info("mq消息信息:{},{}",mqEcs.getQueueName(),mqMsg);
  296. Integer notModelingCount = mqMsg.getConsumers() - mqMsg.getMessages_unacknowledged() - mqMsg.getMessages_ready();
  297. if(mqSize >0 && mqSize - (notModelingCount + mqQueueConfig.getScalingThreshold()) >0 ){
  298. log.info("待计算数量:{}大于空闲服务数量:{},阈值{},不关闭",mqSize,notModelingCount,mqQueueConfig.getScalingThreshold());
  299. continue;
  300. }
  301. log.info("checkDelEcs-实例开启{}时间大于{}分钟,开始关闭:{}",between,mqQueueConfig.getStopScalingTime(),mqEcs.getEcsName());
  302. DelEcsVo vo = new DelEcsVo(mqEcs,scalingMap.get(mqQueueConfig.getScalingConfigId()),null);
  303. delList.offer(vo);
  304. }
  305. }
  306. }
  307. /**
  308. * 关闭弹性伸缩
  309. */
  310. public void delEcsList() {
  311. try {
  312. if(delList.isEmpty()){
  313. return;
  314. }
  315. log.info("delEcsList--关闭弹性伸缩数量:{}",delList.size());
  316. DelEcsVo take = delList.poll();
  317. if(take == null){
  318. return;
  319. }
  320. String stopKey = String.format(RedisKey.ecsStopKey,take.getMqEcs().getEcsName());
  321. redisUtil.set(stopKey,take.getMqEcs().getEcsName(),60 * 60 * 24); //设置计算暂停锁
  322. String modelingKey = String.format(RedisKey.modelingKey,take.getMqEcs().getEcsName());
  323. if(redisUtil.hasKey(modelingKey)){
  324. log.info("delEcsList--有场景正在计算中:{},{}",modelingKey,redisUtil.get(modelingKey));
  325. redisUtil.del(stopKey);
  326. return;
  327. }
  328. Boolean delFlag = delEcs(take.getMqScalingConfig(), take.getMqEcs().getEcsName());
  329. if(delFlag){
  330. log.info("checkDelEcs--关闭弹性伸缩实例成功:{}", take.getMqEcs().getEcsName());
  331. mqEcsService.updateMqEcs(take.getMqEcs());
  332. }else {
  333. delList.offer(take);
  334. }
  335. }catch (Exception e){
  336. log.info("delEcsList--关闭弹性伸缩失败:",e);
  337. }
  338. }
  339. public synchronized Boolean createEcs(MqScalingConfig mqScaling){
  340. try {
  341. return ECSUtils.createEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingRuleAri());
  342. }catch (Exception e){
  343. log.info("触发弹性伸缩失败:",e);
  344. }
  345. return false;
  346. }
  347. public synchronized Boolean delEcs(MqScalingConfig mqScaling, String instanceId){
  348. try {
  349. return ECSUtils.delEcs(mqScaling.getAccessKey(),mqScaling.getSecret(),mqScaling.getEndpoint(),mqScaling.getScalingGroupId(),instanceId);
  350. }catch (Exception e){
  351. log.info("关闭弹性伸缩失败:",e);
  352. }
  353. return false;
  354. }
  355. }