浏览代码

增加websocket服务

xiewenjie 3 年之前
父节点
当前提交
82265612a0

+ 5 - 4
pom.xml

@@ -345,10 +345,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency><!-- Ehcache 3.x //-->
-            <groupId>org.ehcache</groupId>
-            <artifactId>ehcache</artifactId>
-            <version>3.4.0</version>
+
+        <dependency>
+            <groupId>org.yeauty</groupId>
+            <artifactId>netty-websocket-spring-boot-starter</artifactId>
+            <version>0.12.0</version>
         </dependency>
         <!-- 钉钉告警 -->
         <!--		<dependency>-->

+ 1 - 1
sxz-application/src/main/resources/config/j2cache.properties

@@ -52,7 +52,7 @@ j2cache.L2.provider_class = lettuce
 j2cache.sync_ttl_to_redis = true
 
 # Whether to cache null objects by default (default false)
-j2cache.default_cache_null_object = true
+j2cache.default_cache_null_object = false
 
 #########################################
 # Cache Serialization Provider

+ 5 - 0
sxz-base/pom.xml

@@ -250,6 +250,11 @@
             <groupId>net.oschina.j2cache</groupId>
             <artifactId>j2cache-core</artifactId>
         </dependency>
+        <!-- websocket -->
+        <dependency>
+            <groupId>org.yeauty</groupId>
+            <artifactId>netty-websocket-spring-boot-starter</artifactId>
+        </dependency>
     </dependencies>
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

+ 36 - 0
sxz-base/src/main/java/com/fdkk/sxz/util/OkHttpUtils.java

@@ -118,6 +118,42 @@ public class OkHttpUtils {
         return null;
     }
 
+
+    public static JSONObject httpPostFormReturnJson(String url, Map<String, String> paramsMap, Map<String, String> headers) {
+        if (url == null || "".equals(url)) {
+            log.error("url为null!");
+            return null;
+        }
+        // 添加请求类型
+        MultipartBody.Builder builder = new MultipartBody.Builder();
+        builder.setType(MediaType.parse("multipart/form-data"));
+
+        // 追加表单信息
+        paramsMap.forEach((String key, String value) -> builder.addFormDataPart(key, value));
+
+        RequestBody formBody = builder.build();
+
+        Request.Builder requestBuilder = new Request.Builder().url(url);
+        if (headers != null && headers.size() > 0) {
+            headers.forEach((k, v) -> requestBuilder.addHeader(k, v));
+        }
+
+        Request request = requestBuilder.post(formBody).build();
+        try {
+            Response response = okHttpClient.newCall(request).execute();
+            if (response.code() == 200) {
+                log.info("http PostForm 请求成功; [url={}, requestContent={}]", url, paramsMap.toString());
+                return JSON.parseObject(response.body().string());
+            } else {
+                log.warn("Http PostForm 请求失败; [ errorCode = {}, url={}, param={}]", response.code(), url, paramsMap.toString());
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("同步http请求失败,url:" + url, e);
+        }
+        return null;
+    }
+
+
     /**
      * get请求
      * 对于小文档,响应体上的string()方法非常方便和高效。

+ 8 - 0
sxz-core/src/main/java/com/fdkk/sxz/other/listener/RunBuild.java

@@ -21,6 +21,7 @@ import com.fdkk.sxz.webApi.service.*;
 import com.fdkk.sxz.util.*;
 import com.fdkk.sxz.webApi.service.custom.ICustomComponentService;
 import lombok.extern.slf4j.Slf4j;
+import net.oschina.j2cache.CacheChannel;
 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -83,6 +84,12 @@ public class RunBuild {
     @Autowired
     private RedisUtil redisUtil;
 
+
+    @Autowired
+    private CacheChannel cacheChannel;
+
+
+
     @RabbitHandler
     @RabbitListener(queues = TopicRabbitConfig.CHANGE)
     public void change(String str) {
@@ -1067,6 +1074,7 @@ public class RunBuild {
 //                    componentModelUploadEntity.setImgPath(objPath.replace(buildPath, "").replace(".obj", "") + ".jpg");
                         componentModelUploadEntity.setObjPath(objPath.replace(buildPath, ""));
                         componentModelUploadService.updateById(componentModelUploadEntity);
+                        redisUtil.setEx("componetUploadStatus::id::"+componentModelUploadEntity.getId(),"done",30,TimeUnit.MINUTES);
                         break;
 
                     default:

+ 6 - 4
sxz-core/src/main/java/com/fdkk/sxz/webApi/controller/custom/ComponetManagerController.java

@@ -13,10 +13,7 @@ import com.fdkk.sxz.entity.ModelUploadEntity;
 import com.fdkk.sxz.entity.custuom.CustomComponentClassifyEntity;
 import com.fdkk.sxz.entity.custuom.CustomComponentEntity;
 import com.fdkk.sxz.other.mq.TopicRabbitConfig;
-import com.fdkk.sxz.util.CreateObjUtil;
-import com.fdkk.sxz.util.FileUtils;
-import com.fdkk.sxz.util.RandomUtil;
-import com.fdkk.sxz.util.UploadToOssUtil;
+import com.fdkk.sxz.util.*;
 import com.fdkk.sxz.vo.request.*;
 import com.fdkk.sxz.vo.response.ResponseComponentManager;
 import com.fdkk.sxz.webApi.service.IComponentModelUploadService;
@@ -29,6 +26,7 @@ import io.swagger.annotations.ApiImplicitParam;
 import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
 import lombok.extern.slf4j.Slf4j;
+import net.oschina.j2cache.CacheChannel;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -59,6 +57,9 @@ public class ComponetManagerController extends BaseController {
     @Autowired
     private RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方
 
+    @Autowired
+    private CacheChannel cacheChannel;
+
 
 
     @Autowired
@@ -284,6 +285,7 @@ public class ComponetManagerController extends BaseController {
 
         rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPICE, TopicRabbitConfig.MODEL, componentModelUploadEntity.getId() + ":;" + fileId + ":;" + userId + ":;" + modelType + ":;" + "false"+":;" +"3");
 
+        cacheChannel.set("componetUploadStatus","id"+componentModelUploadEntity.getId(),"running");
         return Result.success(componentModelUploadEntity);
     }
 

+ 8 - 1
sxz-core/src/main/java/com/fdkk/sxz/webApi/controller/testController.java

@@ -7,18 +7,23 @@ import com.fdkk.sxz.util.OkHttpUtils;
 import com.fdkk.sxz.util.RedisUtil;
 import com.fdkk.sxz.base.Result;
 import com.fdkk.sxz.webApi.service.custom.ICustomProductFirstclassifyService;
+import com.fdkk.sxz.webApi.socketController.testJacache;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
 import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
 import lombok.extern.slf4j.Slf4j;
 import net.oschina.j2cache.CacheChannel;
+import net.oschina.j2cache.Command;
+import net.oschina.j2cache.cache.support.redis.SpringRedisPubSubPolicy;
+import net.oschina.j2cache.cluster.ClusterPolicy;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RestController;
 
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
 @Slf4j
 @RestController
@@ -36,6 +41,8 @@ public class testController extends BaseController {
     private CacheChannel cacheChannel;
 
 
+
+
     /**
      * 测试redis
      *
@@ -49,7 +56,7 @@ public class testController extends BaseController {
             @ApiImplicitParam(name = "val", value = "val", dataType = "String")})
     @RequestMapping(value = "/redis", method = RequestMethod.POST)
     public Result redis(String key, String val) {
-        redisUtil.set(key, val);
+        redisUtil.setEx("componetUploadStatus::id::"+key,val,30, TimeUnit.MINUTES);
         return success(redisUtil.get(key));
     }
     /**

+ 213 - 0
sxz-core/src/main/java/com/fdkk/sxz/webApi/socketController/ComponentSocket.java

@@ -0,0 +1,213 @@
+package com.fdkk.sxz.webApi.socketController;
+
+
+import cn.hutool.core.thread.ThreadUtil;
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.json.JSONArray;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import com.fdkk.sxz.entity.ComponentModelUploadEntity;
+import com.fdkk.sxz.util.OkHttpUtils;
+import com.fdkk.sxz.util.RedisUtil;
+import com.fdkk.sxz.webApi.service.IComponentModelUploadService;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.timeout.IdleStateEvent;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import net.oschina.j2cache.CacheChannel;
+import net.oschina.j2cache.CacheObject;
+import net.oschina.j2cache.CacheProviderHolder;
+import net.oschina.j2cache.Command;
+import net.oschina.j2cache.cluster.ClusterPolicy;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.springframework.util.MultiValueMap;
+import org.yeauty.annotation.*;
+import org.yeauty.pojo.Session;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/**
+ * @author XieWj
+ */
+@ServerEndpoint(path = "/componentWSS/", port = "8888")
+@Slf4j
+public class ComponentSocket {
+
+    @Value("${main.url}")
+    private String mainUrl;
+
+
+    private static Map<String, ComponentSocket> ComponentSocketMap = new ConcurrentHashMap<>();
+
+    @Autowired
+    private RedisUtil redisUtil;
+
+    @Autowired
+    private IComponentModelUploadService componentModelUploadService;
+
+    Thread thread = null;
+
+    static boolean isStop = true;
+
+
+    @BeforeHandshake
+    public void handshake(Session session, HttpHeaders headers, @PathVariable Map<String, String> pathMap) {
+        String num = pathMap.get("num");
+        String token = pathMap.get("token");
+        Map<String, String> paramsMap = new HashMap();
+        paramsMap.put("num", num);
+        Map<String, String> postHeaders = new HashMap();
+        postHeaders.put("token", token);
+        session.setSubprotocols("stomp");
+//        JSONObject req=OkHttpUtils.httpPostFormReturnJson(mainUrl + "api/scene/isLogin", paramsMap, postHeaders);
+        if (1 == 0) {
+            log.info("Authentication failed!");
+            session.close();
+        }
+    }
+
+    @OnOpen
+    public void onOpen(Session session, HttpHeaders headers, @PathVariable Map pathMap) {
+        log.info("new connection, sessionId-{},Host-{}", session.id(), headers.get("Host"));
+        isStop = true;
+    }
+
+    @OnClose
+    public void onClose(Session session) throws IOException, InterruptedException {
+        log.info("one connection closed");
+        if (thread != null) {
+            Thread.sleep(1000);
+            thread.interrupt();
+        }
+    }
+
+    @OnError
+    public void onError(Session session, Throwable throwable) {
+        throwable.printStackTrace();
+        thread.interrupt();
+    }
+
+    public void toDo(Session session, String message) throws InterruptedException {
+        while (true) {
+            JSONObject data = JSONUtil.parseObj(message);
+            Long id = data.getLong("id");
+            Thread.sleep(1000);
+            log.warn("查询" + session.id() + "|----------" + id);
+            if (session.isOpen()) {
+                String status = data.getStr("status");
+                if (StrUtil.equals(status, "query")) {
+                    if (redisUtil.hasKey("componetUploadStatus::id::" + id)) {
+                        String ObjStatus = redisUtil.get("componetUploadStatus::id::" + id);
+                        if (StrUtil.equals(ObjStatus, "done")) {
+                            extracted(session, data, id);
+                            return;
+                        }
+                    } else {
+                        ComponentModelUploadEntity entity = componentModelUploadService.findById(id);
+                        if (ObjectUtil.isNotNull(entity)) {
+                            if (entity.getStatus() == 1 && entity.getProgress() == 100) {
+                                extracted(session, data, id);
+                                return;
+                            }
+                        }else {
+                            return;
+                        }
+                    }
+
+                    if (Thread.currentThread().isInterrupted()) {
+                        log.info("i has interputed");
+                        return;
+                    }
+                }
+            } else {
+                //连接关闭直接return
+                isStop = false;
+                ComponentSocketMap.remove("id" + data.getInt("id") + "session" + session.id());
+                return;
+            }
+        }
+    }
+
+    private void extracted(Session session, JSONObject data, Long id) {
+        JSONObject res = JSONUtil.createObj();
+        JSONObject resData = JSONUtil.createObj();
+        resData.set("id", id);
+        resData.set("status", "done");
+        res.set("msg", "done");
+        res.set("data", resData);
+        res.set("code", 0);
+        session.sendText(res.toString());
+        //连接关闭直接return
+        ComponentSocketMap.remove("id" + data.getInt("id") + "session" + session.id());
+        Thread.yield();
+    }
+
+    @OnMessage
+    public void onMessage(Session session, String message) throws InterruptedException {
+        log.info(message);
+        JSONObject res = JSONUtil.createObj();
+        res.set("code", 0);
+        //业务处理 调用异步线程,处理成功返回消息,连接断开,销毁线程
+        if (JSONUtil.isJson(message)) {
+            JSONObject data = JSONUtil.parseObj(message);
+            res.set("msg", "is Json");
+            isStop = true;
+            if (!ComponentSocketMap.containsKey("id" + data.getInt("id") + "session" + session.id())) {
+                ComponentSocketMap.put("id" + data.getInt("id") + "session" + session.id(), this);
+                thread = new Thread(() -> {
+                    try {
+                        toDo(session, message);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+
+                });
+                thread.start();
+            }
+        } else {
+            res.set("code", 400);
+            res.set("msg", "Not for Json");
+        }
+        session.sendText(res.toString());
+        session.sendText(session.id().toString());
+    }
+
+
+    @OnBinary
+    public void onBinary(Session session, byte[] bytes) {
+        for (byte b : bytes) {
+            System.out.println(b);
+        }
+        session.sendBinary(bytes);
+    }
+
+    @OnEvent
+    public void onEvent(Session session, Object evt) {
+        if (evt instanceof IdleStateEvent) {
+            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
+            switch (idleStateEvent.state()) {
+                case READER_IDLE:
+                    log.info("read idle");
+                    break;
+                case WRITER_IDLE:
+                    log.info("write idle");
+                    break;
+                case ALL_IDLE:
+                    log.info("all idle");
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+}

+ 49 - 0
sxz-core/src/main/java/com/fdkk/sxz/webApi/socketController/testJacache.java

@@ -0,0 +1,49 @@
+package com.fdkk.sxz.webApi.socketController;
+
+import net.oschina.j2cache.CacheProviderHolder;
+import net.oschina.j2cache.Command;
+import net.oschina.j2cache.cluster.ClusterPolicy;
+import org.springframework.stereotype.Component;
+
+import java.util.Properties;
+
+@Component
+public class testJacache implements ClusterPolicy {
+
+    @Override
+    public void connect(Properties properties, CacheProviderHolder cacheProviderHolder) {
+        log.info(properties.toString());
+    }
+
+    @Override
+    public void publish(Command command) {
+        log.info(command.getRegion()+command.getKeys());
+    }
+
+
+    @Override
+    public void disconnect() {
+
+    }
+
+    @Override
+    public void evict(String s, String... strings) {
+        log.info(s);
+    }
+
+    @Override
+    public void clear(String s) {
+        log.info(s);
+
+    }
+
+    @Override
+    public boolean isLocalCommand(Command command) {
+        return false;
+    }
+
+    @Override
+    public void handleCommand(Command cmd) {
+        ClusterPolicy.super.handleCommand(cmd);
+    }
+}