Jelajahi Sumber

场景关联

dengsixing 3 tahun lalu
induk
melakukan
bf9513090f

+ 126 - 126
4dkankan-center-scene/src/main/java/com/fdkankan/scene/listener/RabbitMqListener.java

@@ -1,131 +1,131 @@
-//package com.fdkankan.scene.listener;
-//
-//import com.alibaba.fastjson.JSONObject;
-//import com.fdkankan.rabbitmq.bean.BuildSceneCallMessage;
-//import com.fdkankan.rabbitmq.bean.BuildSceneFailDTMqMessage;
-//import com.fdkankan.rabbitmq.bean.BuildSceneResultMqMessage;
-//import com.fdkankan.scene.service.IBuildSceneDTService;
-//import com.fdkankan.scene.service.IBuildScenePostService;
-//import com.fdkankan.scene.service.IBuildScenePreService;
-//import com.fdkankan.scene.service.impl.BuildSceneDTServiceImpl;
-//import com.rabbitmq.client.Channel;
-//import java.nio.charset.StandardCharsets;
-//import lombok.extern.slf4j.Slf4j;
-//import org.springframework.amqp.core.Message;
-//import org.springframework.amqp.rabbit.annotation.Queue;
-//import org.springframework.amqp.rabbit.annotation.RabbitListener;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.beans.factory.annotation.Value;
-//import org.springframework.stereotype.Component;
-//
-///**
-// * <p>
-// * TODO
-// * </p>
-// *
-// * @author dengsixing
-// * @since 2022/4/19
-// **/
-//@Slf4j
-//@Component
-//public class RabbitMqListener {
-//    @Value("${queue.modeling.modeling-pre}")
-//    private String queueModelingPre;
-//    @Value("${queue.modeling.modeling-post}")
-//    private String queueModelingPost;
-//    @Value("${queue.modeling.modeling-dt}")
-//    private String queueModelingDt;
-//
-//    @Autowired
-//    IBuildScenePreService buildScenePreService;
-//    @Autowired
-//    IBuildScenePostService buildScenePostService;
-//    @Autowired
-//    IBuildSceneDTService buildSceneDTService;
-//
-//
-//
-//    /**
-//     * 开启了手动确认模式,如果没有手动确认,消费者不会重试,当服务重启时会再次消费,因为rabbitmq认为你还没有处理完你的业务
-//     * queuesToDeclare = @Queue("${queue.modeling.modeling-test}"),  如果队列不不存在会自动创建队列
-//     * concurrency = "3"    设置消费线程数,每个线程每次只拉取一条消息消费
-//     */
-////    @RabbitListener(
-////        queuesToDeclare = @Queue("${queue.modeling.modeling-test}"),
-////        concurrency = "1"
-////    )
-////    public void receiveMessageDsx(Channel channel, Message message) throws Exception {
-////        channel.queueDeclare();
-////        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
-////        log.info("开始消费消息-" + msg + "-" + Thread.currentThread().getId());
-////        Thread.sleep(5000L);
-////        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
-////        log.info("结束消息-" + Thread.currentThread().getId());
-////    }
-//
-//
-//    /**
-//     * 场景计算前置资源准备处理
-//     * @param channel
-//     * @param message
-//     * @throws Exception
-//     */
+package com.fdkankan.scene.listener;
+
+import com.alibaba.fastjson.JSONObject;
+import com.fdkankan.rabbitmq.bean.BuildSceneCallMessage;
+import com.fdkankan.rabbitmq.bean.BuildSceneFailDTMqMessage;
+import com.fdkankan.rabbitmq.bean.BuildSceneResultMqMessage;
+import com.fdkankan.scene.service.IBuildSceneDTService;
+import com.fdkankan.scene.service.IBuildScenePostService;
+import com.fdkankan.scene.service.IBuildScenePreService;
+import com.fdkankan.scene.service.impl.BuildSceneDTServiceImpl;
+import com.rabbitmq.client.Channel;
+import java.nio.charset.StandardCharsets;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.Queue;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * <p>
+ * TODO
+ * </p>
+ *
+ * @author dengsixing
+ * @since 2022/4/19
+ **/
+@Slf4j
+@Component
+public class RabbitMqListener {
+    @Value("${queue.modeling.modeling-pre}")
+    private String queueModelingPre;
+    @Value("${queue.modeling.modeling-post}")
+    private String queueModelingPost;
+    @Value("${queue.modeling.modeling-dt}")
+    private String queueModelingDt;
+
+    @Autowired
+    IBuildScenePreService buildScenePreService;
+    @Autowired
+    IBuildScenePostService buildScenePostService;
+    @Autowired
+    IBuildSceneDTService buildSceneDTService;
+
+
+
+    /**
+     * 开启了手动确认模式,如果没有手动确认,消费者不会重试,当服务重启时会再次消费,因为rabbitmq认为你还没有处理完你的业务
+     * queuesToDeclare = @Queue("${queue.modeling.modeling-test}"),  如果队列不不存在会自动创建队列
+     * concurrency = "3"    设置消费线程数,每个线程每次只拉取一条消息消费
+     */
 //    @RabbitListener(
-//        queuesToDeclare = @Queue("${queue.modeling.modeling-pre}"),
-//        concurrency = "${maxThread.modeling.modeling-pre}"
+//        queuesToDeclare = @Queue("${queue.modeling.modeling-test}"),
+//        concurrency = "1"
 //    )
-//    public void buildScenePreHandler(Channel channel, Message message) throws Exception {
-//        Object correlation = message.getMessageProperties().getHeader("spring_returned_message_correlation");
-//        String correlationId = (String) correlation;
+//    public void receiveMessageDsx(Channel channel, Message message) throws Exception {
+//        channel.queueDeclare();
 //        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
-//        log.info("场景计算资源准备开始,队列名:{},id:{},消息体:{}", queueModelingPre, correlationId, msg);
-//        BuildSceneCallMessage buildSceneMessage = JSONObject.parseObject(msg, BuildSceneCallMessage.class);
-//        Thread.sleep(2000L);
-//        buildScenePreService.buildScenePre(buildSceneMessage);
+//        log.info("开始消费消息-" + msg + "-" + Thread.currentThread().getId());
+//        Thread.sleep(5000L);
 //        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+//        log.info("结束消息-" + Thread.currentThread().getId());
 //    }
-//
-//    /**
-//     * 场景计算后置结果处理
-//     * @param channel
-//     * @param message
-//     * @throws Exception
-//     */
-//    @RabbitListener(
-//        queuesToDeclare = @Queue("${queue.modeling.modeling-post}"),
-//        concurrency = "${maxThread.modeling.modeling-post}"
-//    )
-//    public void buildScenePostHandler(Channel channel, Message message) throws Exception {
-//        Object correlation = message.getMessageProperties().getHeader("spring_returned_message_correlation");
-//        String correlationId = (String) correlation;
-//        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
-//        log.info("场景计算结果处理开始,队列名:{},id:{},消息体:{}", queueModelingPost, correlationId, msg);
-//        BuildSceneResultMqMessage resultMessage = JSONObject.parseObject(msg, BuildSceneResultMqMessage.class);
-//        Thread.sleep(2000L);
-//        buildScenePostService.buildScenePost(resultMessage);
-//        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
-//    }
-//
-//    /**
-//     * 场景计算发送钉钉消息
-//     * @param channel
-//     * @param message
-//     * @throws Exception
-//     */
-//    @RabbitListener(
-//        queuesToDeclare = @Queue("${queue.modeling.modeling-dt}"),
-//        concurrency = "${maxThread.modeling.modeling-dt}"
-//    )
-//    public void buildSceneDTHandler(Channel channel, Message message) throws Exception {
-//        Object correlation = message.getMessageProperties().getHeader("spring_returned_message_correlation");
-//        String correlationId = (String) correlation;
-//        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
-//        log.info("发送钉钉消息处理,队列名:{},id:{},消息体:{}", queueModelingDt, correlationId, msg);
-//        BuildSceneFailDTMqMessage dtMessage = JSONObject.parseObject(msg, BuildSceneFailDTMqMessage.class);
-//        buildSceneDTService.handFail(dtMessage.getReason(), dtMessage.getServerPath(),
-//            dtMessage.getNum(), dtMessage.getHostName(), BuildSceneDTServiceImpl.contentExt);
-//        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
-//    }
-//
-//
-//}
+
+
+    /**
+     * 场景计算前置资源准备处理
+     * @param channel
+     * @param message
+     * @throws Exception
+     */
+    @RabbitListener(
+        queuesToDeclare = @Queue("${queue.modeling.modeling-pre}"),
+        concurrency = "${maxThread.modeling.modeling-pre}"
+    )
+    public void buildScenePreHandler(Channel channel, Message message) throws Exception {
+        Object correlation = message.getMessageProperties().getHeader("spring_returned_message_correlation");
+        String correlationId = (String) correlation;
+        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
+        log.info("场景计算资源准备开始,队列名:{},id:{},消息体:{}", queueModelingPre, correlationId, msg);
+        BuildSceneCallMessage buildSceneMessage = JSONObject.parseObject(msg, BuildSceneCallMessage.class);
+        Thread.sleep(2000L);
+        buildScenePreService.buildScenePre(buildSceneMessage);
+        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+    }
+
+    /**
+     * 场景计算后置结果处理
+     * @param channel
+     * @param message
+     * @throws Exception
+     */
+    @RabbitListener(
+        queuesToDeclare = @Queue("${queue.modeling.modeling-post}"),
+        concurrency = "${maxThread.modeling.modeling-post}"
+    )
+    public void buildScenePostHandler(Channel channel, Message message) throws Exception {
+        Object correlation = message.getMessageProperties().getHeader("spring_returned_message_correlation");
+        String correlationId = (String) correlation;
+        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
+        log.info("场景计算结果处理开始,队列名:{},id:{},消息体:{}", queueModelingPost, correlationId, msg);
+        BuildSceneResultMqMessage resultMessage = JSONObject.parseObject(msg, BuildSceneResultMqMessage.class);
+        Thread.sleep(2000L);
+        buildScenePostService.buildScenePost(resultMessage);
+        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+    }
+
+    /**
+     * 场景计算发送钉钉消息
+     * @param channel
+     * @param message
+     * @throws Exception
+     */
+    @RabbitListener(
+        queuesToDeclare = @Queue("${queue.modeling.modeling-dt}"),
+        concurrency = "${maxThread.modeling.modeling-dt}"
+    )
+    public void buildSceneDTHandler(Channel channel, Message message) throws Exception {
+        Object correlation = message.getMessageProperties().getHeader("spring_returned_message_correlation");
+        String correlationId = (String) correlation;
+        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
+        log.info("发送钉钉消息处理,队列名:{},id:{},消息体:{}", queueModelingDt, correlationId, msg);
+        BuildSceneFailDTMqMessage dtMessage = JSONObject.parseObject(msg, BuildSceneFailDTMqMessage.class);
+        buildSceneDTService.handFail(dtMessage.getReason(), dtMessage.getServerPath(),
+            dtMessage.getNum(), dtMessage.getHostName(), BuildSceneDTServiceImpl.contentExt);
+        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+    }
+
+
+}

+ 1 - 1
4dkankan-center-scene/src/main/resources/bootstrap-test.yml

@@ -33,7 +33,7 @@ spring:
             refresh: true
       discovery:
         server-addr: 120.24.144.164:8848
-        namespace: 4dkankan-dev
+        namespace: 4dkankan-test
     sentinel:
       transport:
         dashboard: 120.24.144.164:8888