|
@@ -0,0 +1,97 @@
|
|
|
+package com.fdkankan.manage.service.impl;
|
|
|
+import java.util.Calendar;
|
|
|
+import java.util.Date;
|
|
|
+
|
|
|
+import cn.hutool.core.collection.CollUtil;
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
+import com.fdkankan.common.constant.CommonStatus;
|
|
|
+import com.fdkankan.common.constant.ErrorCode;
|
|
|
+import com.fdkankan.common.constant.RecStatus;
|
|
|
+import com.fdkankan.common.exception.BusinessException;
|
|
|
+import com.fdkankan.manage.common.ResultData;
|
|
|
+import com.fdkankan.manage.entity.MqBackup;
|
|
|
+import com.fdkankan.manage.mapper.IMqBackupMapper;
|
|
|
+import com.fdkankan.manage.service.IMqBackupService;
|
|
|
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
|
|
+import com.fdkankan.rabbitmq.util.RabbitMqProducer;
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
+import com.rabbitmq.client.GetResponse;
|
|
|
+import org.springframework.amqp.rabbit.core.ChannelCallback;
|
|
|
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.io.UnsupportedEncodingException;
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Objects;
|
|
|
+
|
|
|
+/**
|
|
|
+ * <p>
|
|
|
+ * 服务实现类
|
|
|
+ * </p>
|
|
|
+ *
|
|
|
+ * @author
|
|
|
+ * @since 2023-01-05
|
|
|
+ */
|
|
|
+@Service
|
|
|
+public class MqBackupServiceImpl extends ServiceImpl<IMqBackupMapper, MqBackup> implements IMqBackupService {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private RabbitTemplate rabbitTemplate;
|
|
|
+ @Autowired
|
|
|
+ private RabbitMqProducer rabbitMqProducer;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ResultData getRabbitMqMsg(String queue, int needRequeue) throws UnsupportedEncodingException {
|
|
|
+ if(needRequeue != CommonStatus.YES.code() && needRequeue != CommonStatus.NO.code()){
|
|
|
+ throw new BusinessException(ErrorCode.FAILURE_CODE_5012);
|
|
|
+ }
|
|
|
+ boolean exit = false;
|
|
|
+ do {
|
|
|
+ GetResponse getResponse = rabbitTemplate.execute(channel -> channel.basicGet(queue, true));
|
|
|
+ if(Objects.nonNull(getResponse)){
|
|
|
+ byte[] body = getResponse.getBody();
|
|
|
+ MqBackup mqBackup = new MqBackup();
|
|
|
+ mqBackup.setQueue(queue);
|
|
|
+ mqBackup.setMsg(new String(body, StandardCharsets.UTF_8.name()));
|
|
|
+ mqBackup.setNeedRequeue(needRequeue);
|
|
|
+ mqBackup.setCreateTime(Calendar.getInstance().getTime());
|
|
|
+ this.save(mqBackup);
|
|
|
+ }else{
|
|
|
+ exit = true;
|
|
|
+ }
|
|
|
+ }while (!exit);
|
|
|
+
|
|
|
+ return ResultData.ok();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ResultData rabbitmqRequeue(String queue) throws UnsupportedEncodingException {
|
|
|
+
|
|
|
+ int index = 0;
|
|
|
+ int size = 500;
|
|
|
+ boolean exit = false;
|
|
|
+ do {
|
|
|
+ List<MqBackup> list = this.list(new LambdaQueryWrapper<MqBackup>()
|
|
|
+ .eq(MqBackup::getQueue, queue)
|
|
|
+ .eq(MqBackup::getNeedRequeue, CommonStatus.YES.code().intValue())
|
|
|
+ .eq(MqBackup::getRequeue, CommonStatus.NO.code().intValue()).last("limit " + index + "," + size));
|
|
|
+ if(CollUtil.isNotEmpty(list)){
|
|
|
+ list.stream().forEach(mqBackup -> {
|
|
|
+ rabbitMqProducer.sendByWorkQueue(mqBackup.getQueue(), JSON.parseObject(mqBackup.getMsg()));
|
|
|
+ mqBackup.setRequeue(CommonStatus.YES.code().intValue());
|
|
|
+ mqBackup.setRecStatus(RecStatus.DISABLE.code());
|
|
|
+ this.updateById(mqBackup);
|
|
|
+ this.removeById(mqBackup);
|
|
|
+ });
|
|
|
+ index += size;
|
|
|
+ }else{
|
|
|
+ exit = true;
|
|
|
+ }
|
|
|
+ }while (!exit);
|
|
|
+ return ResultData.ok();
|
|
|
+ }
|
|
|
+
|
|
|
+}
|