|
@@ -0,0 +1,114 @@
|
|
|
+package com.fdkankan.manage.mq.consumer;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.fdkankan.fyun.face.FYunFileServiceInterface;
|
|
|
+import com.fdkankan.manage.common.RedisKeyUtil;
|
|
|
+import com.fdkankan.manage.common.ResultCode;
|
|
|
+import com.fdkankan.manage.exception.BusinessException;
|
|
|
+import com.fdkankan.manage.mq.param.SceneRestStoreVo;
|
|
|
+import com.fdkankan.manage.service.ISceneColdStorageLogService;
|
|
|
+import com.fdkankan.manage.service.ISceneColdStorageService;
|
|
|
+import com.fdkankan.manage.thread.ThreadService;
|
|
|
+import com.fdkankan.redis.util.RedisUtil;
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.amqp.core.Message;
|
|
|
+import org.springframework.amqp.rabbit.annotation.Queue;
|
|
|
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 场景解冻 mq
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class SceneRestStoreConsumer {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ FYunFileServiceInterface fYunFileServiceInterface;
|
|
|
+ @Autowired
|
|
|
+ RedisUtil redisUtil;
|
|
|
+ @Autowired
|
|
|
+ ISceneColdStorageService sceneColdStorageService;
|
|
|
+ @Autowired
|
|
|
+ ISceneColdStorageLogService sceneColdStorageLogService;
|
|
|
+
|
|
|
+ @RabbitListener(
|
|
|
+ queuesToDeclare = @Queue("${queue.scene.rest-store:scene-rest-store}"),
|
|
|
+ concurrency = "10"
|
|
|
+ )
|
|
|
+ public void consumerQueue(Channel channel, Message message) {
|
|
|
+ try {
|
|
|
+ String messageId = message.getMessageProperties().getMessageId();
|
|
|
+ String msg = new String(message.getBody(), StandardCharsets.UTF_8);
|
|
|
+ log.info("scene-restStore-mq--messageId:{},msg:{}",messageId,msg);
|
|
|
+
|
|
|
+ SceneRestStoreVo sceneRestStoreVo = JSONObject.parseObject(msg, SceneRestStoreVo.class);
|
|
|
+ String num = sceneRestStoreVo.getNum();
|
|
|
+ String cloudBucket= sceneRestStoreVo.getCloudBucket();
|
|
|
+ String bucket= sceneRestStoreVo.getBucket();
|
|
|
+ String folderName= sceneRestStoreVo.getFolderName();
|
|
|
+ String redisKey = String.format(RedisKeyUtil.restStoreKey, num);
|
|
|
+ if(redisUtil.hasKey(redisKey)){
|
|
|
+ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ redisUtil.set(redisKey,folderName);
|
|
|
+
|
|
|
+ try {
|
|
|
+ fYunFileServiceInterface.restoreFolder(cloudBucket,folderName,1);
|
|
|
+ }catch (Exception e){
|
|
|
+ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ AtomicReference<Integer> state = new AtomicReference<>(1);
|
|
|
+ AtomicReference<String> reason = new AtomicReference<>("");
|
|
|
+ ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
|
|
|
+ //等待任务执行结束,在间隔2秒执行。
|
|
|
+ scheduledThreadPoolExecutor.scheduleWithFixedDelay(()->{
|
|
|
+ try {
|
|
|
+ Integer process = fYunFileServiceInterface.getRestoreFolderProcess(cloudBucket, folderName);
|
|
|
+ System.out.println(process);
|
|
|
+ log.info("ThreadService-cloudBucket:{},bucket:{},folderName:{},process:{}",cloudBucket,bucket,folderName,process);
|
|
|
+ if(process == 100){
|
|
|
+ fYunFileServiceInterface.copyFileBetweenBucket(cloudBucket,folderName,bucket,folderName);
|
|
|
+ log.info("ThreadService-copy-bucket");
|
|
|
+ sceneColdStorageService.updateStatus(num,2);
|
|
|
+ scheduledThreadPoolExecutor.shutdown();
|
|
|
+ }
|
|
|
+ }catch (Exception e){
|
|
|
+ log.info("ThreadService-error:",e);
|
|
|
+ state.set(-1);
|
|
|
+ reason.set(e.getMessage());
|
|
|
+ scheduledThreadPoolExecutor.shutdown();
|
|
|
+ throw new BusinessException(ResultCode.SCENE_NOT_STORE);
|
|
|
+ }finally {
|
|
|
+ redisUtil.del(redisKey);
|
|
|
+ sceneColdStorageLogService.saveLog(num,folderName,state,reason);
|
|
|
+ try {
|
|
|
+ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
|
|
+ } catch (IOException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //任务启动多久之后 ,周期 每10s执行一次,时间单位
|
|
|
+ },1000,60*1000, TimeUnit.MILLISECONDS);
|
|
|
+
|
|
|
+ }catch (Exception e){
|
|
|
+ log.info("scene-restStore----消费失败",e);
|
|
|
+ }finally {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+}
|