xiewenjie преди 3 години
родител
ревизия
80cfb5883a

+ 5 - 0
pom.xml

@@ -365,6 +365,11 @@
                 <artifactId>sisyphus-springboot-starter</artifactId>
                 <version>0.0.9</version>
             </dependency>
+            <dependency>
+                <groupId>org.yeauty</groupId>
+                <artifactId>netty-websocket-spring-boot-starter</artifactId>
+                <version>0.12.0</version>
+            </dependency>
             <!-- 钉钉告警 -->
             <!--		<dependency>-->
             <!--			<groupId>com.aliyun</groupId>-->

+ 1 - 0
sxz-application/src/main/resources/application-dev.properties

@@ -90,3 +90,4 @@ logsTimeOut=8
 driveLetter=Z:
 #雪花ID生成终端ID
 snowflake.wordId=2
+ws.port=8123

+ 1 - 0
sxz-application/src/main/resources/application-devuat.properties

@@ -91,3 +91,4 @@ logsTimeOut=8
 driveLetter=Z:
 #雪花ID生成终端ID
 snowflake.wordId=3
+ws.port=8124

+ 1 - 0
sxz-application/src/main/resources/application-prod.properties

@@ -90,3 +90,4 @@ logsTimeOut=8
 driveLetter=Z:
 #雪花ID生成终端ID
 snowflake.wordId=1
+ws.port=8123

+ 1 - 0
sxz-application/src/main/resources/application-uat.properties

@@ -92,3 +92,4 @@ driveLetter=Z:
 server.tomcat.basedir=/mnt/tomcatTmp
 #雪花ID生成终端ID
 snowflake.wordId=1
+ws.port=8123

+ 4 - 0
sxz-base/pom.xml

@@ -263,6 +263,10 @@
             <groupId>com.github.houbb</groupId>
             <artifactId>sisyphus-springboot-starter</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.yeauty</groupId>
+            <artifactId>netty-websocket-spring-boot-starter</artifactId>
+        </dependency>
     </dependencies>
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

+ 5 - 5
sxz-core/src/main/java/com/fdkk/sxz/other/mq/TopicRabbitConfig.java

@@ -23,17 +23,17 @@ public class TopicRabbitConfig {
 //
 //    public final static String TOPICE = "topicExchangeCopy";
 
-    public final static String CHANGE = "topic.chang.dev";
+    public final static String CHANGE = "topic.chang";
     //灯光预览
-    public final static String LIGHT = "topic.light.dev";
+    public final static String LIGHT = "topic.light";
 
-    public final static String MODEL = "topic.model.dev";
+    public final static String MODEL = "topic.model";
 
     //"更新模型库,syncmodel
-    public final static String SYNCMODEL = "topic.syncmodel.dev";
+    public final static String SYNCMODEL = "topic.syncmodel";
 
     //处理
-    public final static String MVIEW = "delay.mview.dev";
+    public final static String MVIEW = "delay.mview";
 
     public final static String TOPICE = "topicExchange";
     /**

+ 11 - 0
sxz-core/src/main/java/com/fdkk/sxz/webApi/controller/testController.java

@@ -27,9 +27,11 @@ import lombok.extern.slf4j.Slf4j;
 import net.oschina.j2cache.CacheChannel;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
 import javax.servlet.http.HttpServletRequest;
 import java.util.HashMap;
@@ -69,6 +71,15 @@ public class testController extends BaseController {
     @Autowired
     private IComponentModelUploadService componentModelUploadService;
 
+
+    @GetMapping("/subscribe")
+    public SseEmitter subscribe() {
+        // 设置超时时间为5分钟
+        SseEmitter sseEmitter = new SseEmitter(5 * 60 * 1000L);
+        // 直接返回 SseEmitter 对象就可以和客户端连接
+        return sseEmitter;
+    }
+
     /**
      * 替换所有为空的路径
      *

+ 198 - 0
sxz-core/src/main/java/com/fdkk/sxz/webApi/socket/ComponentSocket.java

@@ -0,0 +1,198 @@
+package com.fdkk.sxz.webApi.socket;
+
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import com.fdkk.sxz.entity.ComponentModelUploadEntity;
+import com.fdkk.sxz.util.RedisUtil;
+import com.fdkk.sxz.webApi.service.IComponentModelUploadService;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.timeout.IdleStateEvent;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.yeauty.annotation.*;
+import org.yeauty.pojo.Session;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author XieWj
+ */
+@ServerEndpoint(path = "/wss/componentWSS/", port = "${ws.port}")
+@Slf4j
+public class ComponentSocket {
+
+    @Value("${main.url}")
+    private String mainUrl;
+
+
+    private static Map<String, ComponentSocket> componentSocketMap = new ConcurrentHashMap<>();
+
+    @Autowired
+    private RedisUtil redisUtil;
+
+    @Autowired
+    private IComponentModelUploadService componentModelUploadService;
+
+    Thread thread = null;
+
+    static boolean isStop = true;
+
+
+    @BeforeHandshake
+    public void handshake(Session session, HttpHeaders headers, @PathVariable Map<String, String> pathMap) {
+        String num = pathMap.get("num");
+        String token = pathMap.get("token");
+        Map<String, String> paramsMap = new HashMap();
+        paramsMap.put("num", num);
+        Map<String, String> postHeaders = new HashMap();
+        postHeaders.put("token", token);
+        session.setSubprotocols("stomp");
+//        JSONObject req=OkHttpUtils.httpPostFormReturnJson(mainUrl + "api/scene/isLogin", paramsMap, postHeaders);
+        if (1 == 0) {
+            ComponentSocket.log.info("Authentication failed!");
+            session.close();
+        }
+    }
+
+    @OnOpen
+    public void onOpen(Session session, HttpHeaders headers, @PathVariable Map pathMap) {
+        ComponentSocket.log.info("new connection, sessionId-{},Host-{}", session.id(), headers.get("Host"));
+    }
+
+    @OnClose
+    public void onClose(Session session) throws IOException, InterruptedException {
+        ComponentSocket.log.info("one connection closed");
+        if (thread != null) {
+            Thread.sleep(1000);
+            thread.interrupt();
+        }
+    }
+
+    @OnError
+    public void onError(Session session, Throwable throwable) {
+        throwable.printStackTrace();
+        if (thread != null) {
+            thread.interrupt();
+        }
+    }
+
+    public void toDo(Session session, String message) throws InterruptedException {
+        while (true) {
+            JSONObject data = JSONUtil.parseObj(message);
+            Long id = data.getLong("id");
+            Thread.sleep(1000);
+            ComponentSocket.log.warn("查询" + session.id() + "|----------" + id);
+            if (session.isOpen()) {
+                String status = data.getStr("status");
+                if (StrUtil.equals(status, "query")) {
+                    if (redisUtil.hasKey("componentUploadStatus:id:" + id)) {
+                        String ObjStatus = redisUtil.get("componentUploadStatus:id:" + id);
+                        if (StrUtil.equals(ObjStatus, "done")) {
+                            extracted(session, data, id);
+                            return;
+                        }
+                    } else {
+                        ComponentModelUploadEntity entity = componentModelUploadService.findById(id);
+                        if (ObjectUtil.isNotNull(entity)) {
+                            if (entity.getStatus() == 1 && entity.getProgress() == 100) {
+                                extracted(session, data, id);
+                                return;
+                            }
+                        } else {
+                            return;
+                        }
+                    }
+
+                    if (Thread.currentThread().isInterrupted()) {
+                        ComponentSocket.log.info("i has isInterrupted");
+                        return;
+                    }
+                }
+            } else {
+                //连接关闭直接return
+                ComponentSocket.isStop = false;
+                ComponentSocket.componentSocketMap.remove("id" + data.getInt("id") + "session" + session.id());
+                return;
+            }
+        }
+    }
+
+    private void extracted(Session session, JSONObject data, Long id) {
+        JSONObject res = JSONUtil.createObj();
+        JSONObject resData = JSONUtil.createObj();
+        resData.set("id", id);
+        resData.set("status", "done");
+        res.set("msg", "done");
+        res.set("data", resData);
+        res.set("code", 0);
+        session.sendText(res.toString());
+        //连接关闭直接return
+        ComponentSocket.componentSocketMap.remove("id" + data.getInt("id") + "session" + session.id());
+        Thread.yield();
+    }
+
+    @OnMessage
+    public void onMessage(Session session, String message) throws InterruptedException {
+        ComponentSocket.log.info(message);
+        JSONObject res = JSONUtil.createObj();
+        res.set("code", 0);
+        //业务处理 调用异步线程,处理成功返回消息,连接断开,销毁线程
+        if (JSONUtil.isJson(message)) {
+            JSONObject data = JSONUtil.parseObj(message);
+            res.set("msg", "is Json");
+            ComponentSocket.isStop = true;
+            if (!ComponentSocket.componentSocketMap.containsKey("id" + data.getInt("id") + "session" + session.id())) {
+                ComponentSocket.componentSocketMap.put("id" + data.getInt("id") + "session" + session.id(), this);
+                thread = new Thread(() -> {
+                    try {
+                        toDo(session, message);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+
+                });
+                thread.start();
+            }
+        } else {
+            res.set("code", 400);
+            res.set("msg", "Not for Json");
+        }
+        session.sendText(res.toString());
+        session.sendText(session.id().toString());
+    }
+
+
+    @OnBinary
+    public void onBinary(Session session, byte[] bytes) {
+        for (byte b : bytes) {
+            System.out.println(b);
+        }
+        session.sendBinary(bytes);
+    }
+
+    @OnEvent
+    public void onEvent(Session session, Object evt) {
+        if (evt instanceof IdleStateEvent) {
+            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
+            switch (idleStateEvent.state()) {
+                case READER_IDLE:
+                    ComponentSocket.log.info("read idle");
+                    break;
+                case WRITER_IDLE:
+                    ComponentSocket.log.info("write idle");
+                    break;
+                case ALL_IDLE:
+                    ComponentSocket.log.info("all idle");
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+}