|
|
@@ -0,0 +1,180 @@
|
|
|
+package com.fdkankan.modeling.job;
|
|
|
+
|
|
|
+import cn.hutool.core.collection.CollUtil;
|
|
|
+import com.fdkankan.modeling.constants.TaskConstants;
|
|
|
+import com.fdkankan.modeling.constants.TaskStatus;
|
|
|
+import com.fdkankan.modeling.entity.ModelingSchedule;
|
|
|
+import com.fdkankan.modeling.exception.BuildException;
|
|
|
+import com.fdkankan.modeling.mapper.IModelingScheduleMapper;
|
|
|
+import com.fdkankan.modeling.service.IModelingScheduleService;
|
|
|
+import com.fdkankan.modeling.util.ServerUtils;
|
|
|
+import lombok.SneakyThrows;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.redisson.api.RLock;
|
|
|
+import org.redisson.api.RedissonClient;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.sql.Timestamp;
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.*;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 单服务器串行执行控制器:保证5秒触发、串行执行、48小时超时
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class SerialTaskExecutor {
|
|
|
+ // 定时任务线程池(单线程,保证定时触发串行)
|
|
|
+ private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
|
|
|
+ // 任务执行线程池(单线程,保证任务串行)
|
|
|
+ private final ExecutorService taskExecutor = Executors.newSingleThreadExecutor();
|
|
|
+ // 标记当前是否有任务在执行(volatile保证可见性)
|
|
|
+ private volatile boolean isTaskRunning = false;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IModelingScheduleService modelingScheduleService;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private RedissonClient redissonClient;
|
|
|
+ @Autowired
|
|
|
+ private ServerUtils serverUtils;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 启动定时任务(应用启动时调用)
|
|
|
+ */
|
|
|
+ public void startScheduledTask() {
|
|
|
+ // 延迟0秒启动,每隔5秒触发一次
|
|
|
+ scheduledExecutor.scheduleAtFixedRate(
|
|
|
+ this::executeSingleTask,
|
|
|
+ 0,
|
|
|
+ TaskConstants.TASK_TRIGGER_INTERVAL,
|
|
|
+ TaskConstants.TRIGGER_TIME_UNIT
|
|
|
+ );
|
|
|
+ log.info("串行定时任务已启动,每隔5秒触发一次");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 执行单个任务(核心逻辑)
|
|
|
+ */
|
|
|
+ private void executeSingleTask(){
|
|
|
+ // 检查:上一个任务未执行完成则跳过
|
|
|
+ if (isTaskRunning) {
|
|
|
+ log.info("上一个任务仍在执行,跳过本次触发");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ //有新的计算任务进来时,会先去查同一个unicode是否有待执行的任务,
|
|
|
+ // 如果有,会把待执行的标记为3(合并)并把parent_uuid赋值为当前进入任务列表的uuid,
|
|
|
+ // 所以这里只需要找出状态为待处理任务即可,再根据待处理任务找到子任务,把子任务文件依次下载替换,进行合并计算即可
|
|
|
+ ModelingSchedule modelingSchedule = modelingScheduleService.selectOneWaitTask();// 只查1条
|
|
|
+ if (modelingSchedule == null) {
|
|
|
+ System.out.println("获取不到等待执行的任务,跳过本次触发");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 步骤3:标记为执行中,开始执行(带超时控制)
|
|
|
+ isTaskRunning = true;
|
|
|
+ //开始执行计算业务
|
|
|
+ executeTaskWithTimeout(modelingSchedule);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 执行任务并添加48小时超时控制
|
|
|
+ */
|
|
|
+ private void executeTaskWithTimeout(ModelingSchedule modelingSchedule) {
|
|
|
+ // 提交任务到执行线程池,返回Future用于超时控制
|
|
|
+ Future<?> future = taskExecutor.submit(() -> {
|
|
|
+ try {
|
|
|
+ // 执行核心业务逻辑(替换为你的实际任务代码)
|
|
|
+ doBusinessTask(modelingSchedule);
|
|
|
+ // 执行成功:更新状态为「退出计算」(或自定义「完成」状态)
|
|
|
+ modelingScheduleService.updateTaskStatus(modelingSchedule.getId(), TaskStatus.SUCCESS.getCode(), "任务执行成功", serverUtils.getLocalServerId());
|
|
|
+ } catch (Exception e) {
|
|
|
+ // 执行失败:更新状态为「失败」
|
|
|
+ modelingScheduleService.updateTaskStatus(modelingSchedule.getId(), TaskStatus.FAILED.getCode(), "执行失败:" + e.getMessage(), serverUtils.getLocalServerId());
|
|
|
+ log.error("计算失败,modelingScheduleId:{}", modelingSchedule.getId(), e);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // 超时控制:等待任务执行,最多48小时
|
|
|
+ try {
|
|
|
+ future.get(TaskConstants.TASK_TIMEOUT, TaskConstants.TIMEOUT_TIME_UNIT);
|
|
|
+ } catch (TimeoutException e) {
|
|
|
+ // 任务超时:中断任务+更新状态
|
|
|
+ future.cancel(true); // 中断任务线程
|
|
|
+ modelingScheduleService.updateTaskStatus(modelingSchedule.getId(), TaskStatus.TIMEOUT.getCode(), "任务执行超时(48小时)", serverUtils.getLocalServerId());
|
|
|
+ System.err.println("任务[" + modelingSchedule.getUuid() + "]执行超时,已中断");
|
|
|
+ } catch (InterruptedException | ExecutionException | BuildException e) {
|
|
|
+ // 其他异常:标记为失败
|
|
|
+ String remark = "任务执行异常";
|
|
|
+ if (e.getCause() instanceof BuildException) {
|
|
|
+ remark = "算法异常";
|
|
|
+ }
|
|
|
+ modelingScheduleService.updateTaskStatus(modelingSchedule.getId(), TaskStatus.FAILED.getCode(), remark + e.getMessage(), serverUtils.getLocalServerId());
|
|
|
+ log.error("任务执行异常", e);
|
|
|
+ } finally {
|
|
|
+ // 释放分布式锁+重置执行状态
|
|
|
+ String lockKey = TaskConstants.TASK_LOCK_PREFIX + modelingSchedule.getUuid();
|
|
|
+ RLock lock = redissonClient.getLock(lockKey);
|
|
|
+ if (lock.isHeldByCurrentThread()) {
|
|
|
+ lock.unlock();
|
|
|
+ }
|
|
|
+ isTaskRunning = false;
|
|
|
+ log.info("任务[" + modelingSchedule.getUuid() + "]执行完成(/超时/失败),释放锁并重置状态");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 实际业务任务逻辑(替换为你的代码)
|
|
|
+ */
|
|
|
+ private void doBusinessTask(ModelingSchedule task) throws Exception {
|
|
|
+ log.info("开始执行任务[" + task.getUuid() + "],场景码:" + task.getNum());
|
|
|
+ //前置处理
|
|
|
+ //查询子任务
|
|
|
+ List<ModelingSchedule> subTasks = modelingScheduleService.selectSubTask(task.getUuid());
|
|
|
+ if(CollUtil.isNotEmpty(subTasks)){
|
|
|
+ for (ModelingSchedule subTask : subTasks) {
|
|
|
+ String source = subTask.getSource();
|
|
|
+ if(source.equals("hx")){//海鑫
|
|
|
+
|
|
|
+ }else if(source.equals("jm")){//江门
|
|
|
+
|
|
|
+ }else if(source.equals("manage")){//管理后台
|
|
|
+
|
|
|
+ }else if(source.equals("local")){//局域网版方式
|
|
|
+
|
|
|
+ }else if(source.equals("rebuild")){//重算
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ //调用算法
|
|
|
+
|
|
|
+ //后置处理
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 关闭线程池(应用停止时调用)
|
|
|
+ */
|
|
|
+ public void shutdown() {
|
|
|
+ scheduledExecutor.shutdown();
|
|
|
+ taskExecutor.shutdown();
|
|
|
+ try {
|
|
|
+ if (!scheduledExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
|
|
|
+ scheduledExecutor.shutdownNow();
|
|
|
+ }
|
|
|
+ if (!taskExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
|
|
|
+ taskExecutor.shutdownNow();
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ scheduledExecutor.shutdownNow();
|
|
|
+ taskExecutor.shutdownNow();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|