Sfoglia il codice sorgente

优化socket线程调用,处理关闭连接,连接错误终止线程

xiewenjie 3 anni fa
parent
commit
c26344c3fe

+ 66 - 35
sxz-core/src/main/java/com/fdkk/sxz/webApi/socket/actionSocket.java

@@ -32,7 +32,7 @@ public class actionSocket {
     private String mainUrl;
 
 
-    private static Map<String, actionSocket> componentSocketMap = new ConcurrentHashMap<>();
+    private static Map<String, Thread> socketSessionBizMap = new ConcurrentHashMap<>();
 
     @Autowired
     private RedisUtil redisUtil;
@@ -67,18 +67,29 @@ public class actionSocket {
     @OnClose
     public void onClose(Session session) throws IOException, InterruptedException {
         actionSocket.log.info("one connection closed");
-        if (thread != null) {
-            Thread.sleep(1000);
-            thread.interrupt();
-        }
+        actionSocket.socketSessionBizMap.keySet().stream().forEach(
+                key -> {
+                    if (key.contains(session.id().toString())) {
+                        actionSocket.socketSessionBizMap.get(key).interrupt();
+                        actionSocket.socketSessionBizMap.remove(key);
+                    }
+                }
+
+        );
     }
 
     @OnError
-    public void onError(Session session, Throwable throwable) {
+    public void onError(Session session, Throwable throwable) throws InterruptedException {
         throwable.printStackTrace();
-        if (thread != null) {
-            thread.interrupt();
-        }
+        actionSocket.socketSessionBizMap.keySet().stream().forEach(
+                key -> {
+                    if (key.contains(session.id().toString())) {
+                        actionSocket.socketSessionBizMap.get(key).interrupt();
+                        actionSocket.socketSessionBizMap.remove(key);
+                    }
+                }
+
+        );
     }
 
     @OnMessage
@@ -91,33 +102,53 @@ public class actionSocket {
         res.set("action", action);
         //业务处理 调用异步线程,处理成功返回消息,连接断开,销毁线程
         if (JSONUtil.isJson(message)) {
-            switch (action) {
-                case "componentBiz":
-                    ComponentBizImpl componentBiz = new ComponentBizImpl();
-                    String componentBizCallId = componentBiz.getCallId();
-                    componentBiz.todo(session, message, componentBizCallId);
-                    componentBiz.listenResult(this::sendMessage, componentBizCallId, new ParamContext("session", session));
-                    break;
-                case "modelUploadBiz":
-                    ModelUploadBizImpl modelUploadBiz = new ModelUploadBizImpl();
-                    String modelUploadBizCallId = modelUploadBiz.getCallId();
-                    modelUploadBiz.todo(session, message, modelUploadBizCallId);
-                    modelUploadBiz.listenResult(this::sendMessage, modelUploadBizCallId, new ParamContext("session", session));
-                    break;
-                case "createImgBiz":
-                    CreateImgBizImpl createImgBiz = new CreateImgBizImpl();
-                    String createImgBizCallId = createImgBiz.getCallId();
-                    createImgBiz.todo(session, message, createImgBizCallId);
-                    createImgBiz.listenResult(this::sendMessage, createImgBizCallId, new ParamContext("session", session));
-                    break;
-                case "componentCreateImgBiz":
-                    ComponentCreateImgBizImpl componentCreateImgBiz = new ComponentCreateImgBizImpl();
-                    String componentCreateImgBizCallId = componentCreateImgBiz.getCallId();
-                    componentCreateImgBiz.todo(session, message, componentCreateImgBizCallId);
-                    componentCreateImgBiz.listenResult(this::sendMessage, componentCreateImgBizCallId, new ParamContext("session", session));
-                    break;
+            try {
+                switch (action) {
+                    case "componentBiz":
+                        ComponentBizImpl componentBiz = new ComponentBizImpl();
+                        String componentBizCallId = componentBiz.getCallId();
+                        if (!actionSocket.socketSessionBizMap.containsKey("componentBiz-id" + data.getInt("id") + "session" + session.id())) {
+                            Thread componentBizTodo = componentBiz.todo(session, message, componentBizCallId);
+                            actionSocket.socketSessionBizMap.put("componentBiz-id" + data.getInt("id") + "session" + session.id(), componentBizTodo);
+                            componentBizTodo.start();
+                            componentBiz.listenResult(this::sendMessage, componentBizCallId, new ParamContext("session", session));
+                        }
+                        break;
+                    case "modelUploadBiz":
+                        ModelUploadBizImpl modelUploadBiz = new ModelUploadBizImpl();
+                        String modelUploadBizCallId = modelUploadBiz.getCallId();
+                        if (!actionSocket.socketSessionBizMap.containsKey("modelUploadBiz-id" + data.getInt("id") + "session" + session.id())) {
+                            Thread modelUploadBizTodo = modelUploadBiz.todo(session, message, modelUploadBizCallId);
+                            actionSocket.socketSessionBizMap.put("modelUploadBiz-id" + data.getInt("id") + "session" + session.id(), modelUploadBizTodo);
+                            modelUploadBizTodo.start();
+                            modelUploadBiz.listenResult(this::sendMessage, modelUploadBizCallId, new ParamContext("session", session));
+
+                        }
+                        break;
+                    case "createImgBiz":
+                        if (!actionSocket.socketSessionBizMap.containsKey("createImgBiz-id" + data.getInt("id") + "session" + session.id())) {
+                            CreateImgBizImpl createImgBiz = new CreateImgBizImpl();
+                            String createImgBizCallId = createImgBiz.getCallId();
+                            Thread createImgBizTodo = createImgBiz.todo(session, message, createImgBizCallId);
+                            actionSocket.socketSessionBizMap.put("createImgBiz-id" + data.getInt("id") + "session" + session.id(), createImgBizTodo);
+                            createImgBizTodo.start();
+                            createImgBiz.listenResult(this::sendMessage, createImgBizCallId, new ParamContext("session", session));
+                        }
+                        break;
+                    case "componentCreateImgBiz":
+                        if (!actionSocket.socketSessionBizMap.containsKey("componentCreateImgBiz-id" + data.getInt("id") + "session" + session.id())) {
+                            ComponentCreateImgBizImpl componentCreateImgBiz = new ComponentCreateImgBizImpl();
+                            String componentCreateImgBizCallId = componentCreateImgBiz.getCallId();
+                            Thread componentCreateImgBizTodo = componentCreateImgBiz.todo(session, message, componentCreateImgBizCallId);
+                            actionSocket.socketSessionBizMap.put("componentCreateImgBiz-id" + data.getInt("id") + "session" + session.id(), componentCreateImgBizTodo);
+                            componentCreateImgBizTodo.start();
+                            componentCreateImgBiz.listenResult(this::sendMessage, componentCreateImgBizCallId, new ParamContext("session", session));
+                        }
+                        break;
+                }
+            } catch (InterruptedException e) {
+                return;
             }
-
         } else {
             res.set("code", 400);
             res.set("msg", "Not for Json");

+ 34 - 30
sxz-core/src/main/java/com/fdkk/sxz/webApi/socket/service/ComponentBizImpl.java

@@ -33,8 +33,8 @@ public class ComponentBizImpl {
     private IComponentModelUploadService componentModelUploadService = SpringUtil.getBean(IComponentModelUploadService.class);
 
 
-    public void todo(Session session, String msgData, String callId) throws InterruptedException {
-        new Thread(new Runnable() {
+    public Thread todo(Session session, String msgData, String callId) throws InterruptedException {
+        return new Thread(new Runnable() {
             @SneakyThrows
             @Override
             public void run() {
@@ -43,44 +43,48 @@ public class ComponentBizImpl {
                 Long id = data.getLong("id");
                 String action = data.getStr("action");
                 String status = data.getStr("status");
-                while (flag) {
-                    Thread.sleep(3000);
-                    ComponentBizImpl.log.warn("查询" + session.id() + "|----------" + id);
-                    if (session.isOpen()) {
-                        if (StrUtil.equals(status, "query")) {
-                            if (redisUtil.hasKey("componentUploadStatus:id:" + id)) {
-                                String ObjStatus = redisUtil.get("componentUploadStatus:id:" + id);
-                                if (StrUtil.equals(ObjStatus, "done") || StrUtil.equals(ObjStatus, "error")) {
-                                    waitForResult(new ParamContext("msg", extracted(id, ObjStatus, action)), callId);
-                                    flag = false;
-                                }
-                            } else {
-                                ComponentModelUploadEntity entity = componentModelUploadService.findById(id);
-                                Thread.sleep(2000);
-                                if (ObjectUtil.isNotNull(entity)) {
-                                    if (entity.getStatus() == 1 && entity.getProgress() == 100) {
-                                        redisUtil.setEx("componentUploadStatus:id:" + id, "done", 5, TimeUnit.MINUTES);
-                                        waitForResult(new ParamContext("msg", extracted(id, "done", action)), callId);
+                try {
+                    while (flag) {
+                        Thread.sleep(3000);
+                        ComponentBizImpl.log.warn("查询" + session.id() + "|----------" + id);
+                        if (session.isOpen()) {
+                            if (StrUtil.equals(status, "query")) {
+                                if (redisUtil.hasKey("componentUploadStatus:id:" + id)) {
+                                    String ObjStatus = redisUtil.get("componentUploadStatus:id:" + id);
+                                    if (StrUtil.equals(ObjStatus, "done") || StrUtil.equals(ObjStatus, "error")) {
+                                        waitForResult(new ParamContext("msg", extracted(id, ObjStatus, action)), callId);
                                         flag = false;
-                                    } else if (entity.getStatus() == -1) {
-                                        redisUtil.delete("modelUploadStatus:id:" + id);
+                                    }
+                                } else {
+                                    ComponentModelUploadEntity entity = componentModelUploadService.findById(id);
+                                    Thread.sleep(2000);
+                                    if (ObjectUtil.isNotNull(entity)) {
+                                        if (entity.getStatus() == 1 && entity.getProgress() == 100) {
+                                            redisUtil.setEx("componentUploadStatus:id:" + id, "done", 5, TimeUnit.MINUTES);
+                                            waitForResult(new ParamContext("msg", extracted(id, "done", action)), callId);
+                                            flag = false;
+                                        } else if (entity.getStatus() == -1) {
+                                            redisUtil.delete("modelUploadStatus:id:" + id);
+                                            waitForResult(new ParamContext("msg", extracted(id, "error", action)), callId);
+                                            flag = false;
+                                        }
+                                    } else {
+                                        //查不到直接返回错误
                                         waitForResult(new ParamContext("msg", extracted(id, "error", action)), callId);
                                         flag = false;
                                     }
-                                } else {
-                                    //查不到直接返回错误
-                                    waitForResult(new ParamContext("msg", extracted(id, "error", action)), callId);
-                                    flag = false;
                                 }
                             }
+                        } else {
+                            ComponentBizImpl.log.info("连接关闭" + session.id());
+                            flag = false;
                         }
-                    } else {
-                        ComponentBizImpl.log.info("连接关闭" + session.id());
-                        flag = false;
                     }
+                } catch (InterruptedException e) {
+                    return;
                 }
             }
-        }).start();
+        });
     }
 
     private String extracted(Long id, String status, String action) {

+ 34 - 30
sxz-core/src/main/java/com/fdkk/sxz/webApi/socket/service/ComponentCreateImgBizImpl.java

@@ -33,8 +33,8 @@ public class ComponentCreateImgBizImpl {
     private IComponentModelUploadService componentModelUploadService = SpringUtil.getBean(IComponentModelUploadService.class);
 
 
-    public void todo(Session session, String msgData, String callId) throws InterruptedException {
-        new Thread(new Runnable() {
+    public Thread todo(Session session, String msgData, String callId) throws InterruptedException {
+        return new Thread(new Runnable() {
             @SneakyThrows
             @Override
             public void run() {
@@ -43,44 +43,48 @@ public class ComponentCreateImgBizImpl {
                 Long id = data.getLong("id");
                 String action = data.getStr("action");
                 String status = data.getStr("status");
-                while (flag) {
-                    Thread.sleep(3000);
-                    ComponentCreateImgBizImpl.log.warn("查询" + session.id() + "|----------" + id);
-                    if (session.isOpen()) {
-                        if (StrUtil.equals(status, "query")) {
-                            if (redisUtil.hasKey("componentCreateImgStatus:id:" + id)) {
-                                String ObjStatus = redisUtil.get("componentCreateImgStatus:id:" + id);
-                                if (StrUtil.equals(ObjStatus, "done") || StrUtil.equals(ObjStatus, "error")) {
-                                    waitForResult(new ParamContext("msg", extracted(id, ObjStatus, action)), callId);
-                                    flag = false;
-                                }
-                            } else {
-                                ComponentModelUploadEntity entity = componentModelUploadService.findById(id);
-                                Thread.sleep(2000);
-                                if (ObjectUtil.isNotNull(entity)) {
-                                    if (entity.getStatus() == 1 && entity.getProgress() == 100) {
-                                        redisUtil.setEx("componentCreateImgStatus:id:" + id, "done", 5, TimeUnit.MINUTES);
-                                        waitForResult(new ParamContext("msg", extracted(id, "done", action)), callId);
+                try {
+                    while (flag) {
+                        Thread.sleep(3000);
+                        ComponentCreateImgBizImpl.log.warn("查询" + session.id() + "|----------" + id);
+                        if (session.isOpen()) {
+                            if (StrUtil.equals(status, "query")) {
+                                if (redisUtil.hasKey("componentCreateImgStatus:id:" + id)) {
+                                    String ObjStatus = redisUtil.get("componentCreateImgStatus:id:" + id);
+                                    if (StrUtil.equals(ObjStatus, "done") || StrUtil.equals(ObjStatus, "error")) {
+                                        waitForResult(new ParamContext("msg", extracted(id, ObjStatus, action)), callId);
                                         flag = false;
-                                    } else if (entity.getStatus() == -1) {
-                                        redisUtil.delete("modelUploadStatus:id:" + id);
+                                    }
+                                } else {
+                                    ComponentModelUploadEntity entity = componentModelUploadService.findById(id);
+                                    Thread.sleep(2000);
+                                    if (ObjectUtil.isNotNull(entity)) {
+                                        if (entity.getStatus() == 1 && entity.getProgress() == 100) {
+                                            redisUtil.setEx("componentCreateImgStatus:id:" + id, "done", 5, TimeUnit.MINUTES);
+                                            waitForResult(new ParamContext("msg", extracted(id, "done", action)), callId);
+                                            flag = false;
+                                        } else if (entity.getStatus() == -1) {
+                                            redisUtil.delete("modelUploadStatus:id:" + id);
+                                            waitForResult(new ParamContext("msg", extracted(id, "error", action)), callId);
+                                            flag = false;
+                                        }
+                                    } else {
+                                        //查不到直接返回错误
                                         waitForResult(new ParamContext("msg", extracted(id, "error", action)), callId);
                                         flag = false;
                                     }
-                                } else {
-                                    //查不到直接返回错误
-                                    waitForResult(new ParamContext("msg", extracted(id, "error", action)), callId);
-                                    flag = false;
                                 }
                             }
+                        } else {
+                            ComponentCreateImgBizImpl.log.info("连接关闭" + session.id());
+                            flag = false;
                         }
-                    } else {
-                        ComponentCreateImgBizImpl.log.info("连接关闭" + session.id());
-                        flag = false;
                     }
+                } catch (InterruptedException e) {
+                    return;
                 }
             }
-        }).start();
+        });
     }
 
     private String extracted(Long id, String status, String action) {

+ 34 - 30
sxz-core/src/main/java/com/fdkk/sxz/webApi/socket/service/CreateImgBizImpl.java

@@ -33,8 +33,8 @@ public class CreateImgBizImpl {
     private IModelUploadService modelUploadService = SpringUtil.getBean(IModelUploadService.class);
 
 
-    public void todo(Session session, String msgData, String callId) throws InterruptedException {
-        new Thread(new Runnable() {
+    public Thread todo(Session session, String msgData, String callId) throws InterruptedException {
+        return new Thread(new Runnable() {
             @SneakyThrows
             @Override
             public void run() {
@@ -43,44 +43,48 @@ public class CreateImgBizImpl {
                 Long id = data.getLong("id");
                 String action = data.getStr("action");
                 String status = data.getStr("status");
-                while (flag) {
-                    Thread.sleep(3000);
-                    CreateImgBizImpl.log.warn("查询" + session.id() + "|----------" + id);
-                    if (session.isOpen()) {
-                        if (StrUtil.equals(status, "query")) {
-                            if (redisUtil.hasKey("createImgStatus:id:" + id)) {
-                                String ObjStatus = redisUtil.get("createImgStatus:id:" + id);
-                                if (StrUtil.equals(ObjStatus, "done") || StrUtil.equals(ObjStatus, "error")) {
-                                    waitForResult(new ParamContext("msg", extracted(id, ObjStatus, action)), callId);
-                                    flag = false;
-                                }
-                            } else {
-                                ModelUploadEntity entity = modelUploadService.findById(id);
-                                Thread.sleep(2000);
-                                if (ObjectUtil.isNotNull(entity)) {
-                                    if (entity.getStatus() == 1 && entity.getProgress() == 100) {
-                                        redisUtil.setEx("createImgStatus:id:" + id, "done", 5, TimeUnit.MINUTES);
-                                        waitForResult(new ParamContext("msg", extracted(id, "done", action)), callId);
+                try {
+                    while (flag) {
+                        Thread.sleep(3000);
+                        CreateImgBizImpl.log.warn("查询" + session.id() + "|----------" + id);
+                        if (session.isOpen()) {
+                            if (StrUtil.equals(status, "query")) {
+                                if (redisUtil.hasKey("createImgStatus:id:" + id)) {
+                                    String ObjStatus = redisUtil.get("createImgStatus:id:" + id);
+                                    if (StrUtil.equals(ObjStatus, "done") || StrUtil.equals(ObjStatus, "error")) {
+                                        waitForResult(new ParamContext("msg", extracted(id, ObjStatus, action)), callId);
                                         flag = false;
-                                    } else if (entity.getStatus() == -1) {
-                                        redisUtil.delete("createImgStatus:id:" + id);
+                                    }
+                                } else {
+                                    ModelUploadEntity entity = modelUploadService.findById(id);
+                                    Thread.sleep(2000);
+                                    if (ObjectUtil.isNotNull(entity)) {
+                                        if (entity.getStatus() == 1 && entity.getProgress() == 100) {
+                                            redisUtil.setEx("createImgStatus:id:" + id, "done", 5, TimeUnit.MINUTES);
+                                            waitForResult(new ParamContext("msg", extracted(id, "done", action)), callId);
+                                            flag = false;
+                                        } else if (entity.getStatus() == -1) {
+                                            redisUtil.delete("createImgStatus:id:" + id);
+                                            waitForResult(new ParamContext("msg", extracted(id, "error", action)), callId);
+                                            flag = false;
+                                        }
+                                    } else {
+                                        //查不到直接返回错误
                                         waitForResult(new ParamContext("msg", extracted(id, "error", action)), callId);
                                         flag = false;
                                     }
-                                } else {
-                                    //查不到直接返回错误
-                                    waitForResult(new ParamContext("msg", extracted(id, "error", action)), callId);
-                                    flag = false;
                                 }
                             }
+                        } else {
+                            CreateImgBizImpl.log.info("连接关闭" + session.id());
+                            flag = false;
                         }
-                    } else {
-                        CreateImgBizImpl.log.info("连接关闭" + session.id());
-                        flag = false;
                     }
+                } catch (InterruptedException e) {
+                    return;
                 }
             }
-        }).start();
+        });
     }
 
     private String extracted(Long id, String status, String action) {

+ 35 - 30
sxz-core/src/main/java/com/fdkk/sxz/webApi/socket/service/ModelUploadBizImpl.java

@@ -33,8 +33,8 @@ public class ModelUploadBizImpl {
     private IModelUploadService modelUploadService = SpringUtil.getBean(IModelUploadService.class);
 
 
-    public void todo(Session session, String msgData, String callId) throws InterruptedException {
-        new Thread(new Runnable() {
+    public Thread todo(Session session, String msgData, String callId) throws InterruptedException {
+        return new Thread(new Runnable() {
             @SneakyThrows
             @Override
             public void run() {
@@ -43,44 +43,49 @@ public class ModelUploadBizImpl {
                 Long id = data.getLong("id");
                 String action = data.getStr("action");
                 String status = data.getStr("status");
-                while (flag) {
-                    Thread.sleep(3000);
-                    ModelUploadBizImpl.log.warn("查询" + session.id() + "|----------" + id);
-                    if (session.isOpen()) {
-                        if (StrUtil.equals(status, "query")) {
-                            if (redisUtil.hasKey("modelUploadStatus:id:" + id)) {
-                                String ObjStatus = redisUtil.get("modelUploadStatus:id:" + id);
-                                if (StrUtil.equals(ObjStatus, "done") || StrUtil.equals(ObjStatus, "error")) {
-                                    waitForResult(new ParamContext("msg", extracted(id, ObjStatus, action)), callId);
-                                    flag = false;
-                                }
-                            } else {
-                                ModelUploadEntity entity = modelUploadService.findById(id);
-                                Thread.sleep(2000);
-                                if (ObjectUtil.isNotNull(entity)) {
-                                    if (entity.getStatus() == 1 && entity.getProgress() == 100) {
-                                        redisUtil.setEx("modelUploadStatus:id:" + id, "done", 5, TimeUnit.MINUTES);
-                                        waitForResult(new ParamContext("msg", extracted(id, "done", action)), callId);
+                try {
+                    while (flag) {
+                        Thread.sleep(3000);
+                        ModelUploadBizImpl.log.warn("查询" + session.id() + "|----------" + id);
+                        if (session.isOpen()) {
+                            if (StrUtil.equals(status, "query")) {
+                                if (redisUtil.hasKey("modelUploadStatus:id:" + id)) {
+                                    String ObjStatus = redisUtil.get("modelUploadStatus:id:" + id);
+                                    if (StrUtil.equals(ObjStatus, "done") || StrUtil.equals(ObjStatus, "error")) {
+                                        waitForResult(new ParamContext("msg", extracted(id, ObjStatus, action)), callId);
                                         flag = false;
-                                    } else if (entity.getStatus() == -1) {
-                                        redisUtil.delete("modelUploadStatus:id:" + id);
+                                    }
+                                } else {
+                                    ModelUploadEntity entity = modelUploadService.findById(id);
+                                    Thread.sleep(2000);
+                                    if (ObjectUtil.isNotNull(entity)) {
+                                        if (entity.getStatus() == 1 && entity.getProgress() == 100) {
+                                            redisUtil.setEx("modelUploadStatus:id:" + id, "done", 5, TimeUnit.MINUTES);
+                                            waitForResult(new ParamContext("msg", extracted(id, "done", action)), callId);
+                                            flag = false;
+                                        } else if (entity.getStatus() == -1) {
+                                            redisUtil.delete("modelUploadStatus:id:" + id);
+                                            waitForResult(new ParamContext("msg", extracted(id, "error", action)), callId);
+                                            flag = false;
+                                        }
+                                    } else {
+                                        //查不到直接返回错误
                                         waitForResult(new ParamContext("msg", extracted(id, "error", action)), callId);
                                         flag = false;
                                     }
-                                } else {
-                                    //查不到直接返回错误
-                                    waitForResult(new ParamContext("msg", extracted(id, "error", action)), callId);
-                                    flag = false;
                                 }
                             }
+                        } else {
+                            ModelUploadBizImpl.log.info("连接关闭" + session.id());
+                            flag = false;
                         }
-                    } else {
-                        ModelUploadBizImpl.log.info("连接关闭" + session.id());
-                        flag = false;
                     }
+                } catch (InterruptedException e) {
+                    return;
                 }
+
             }
-        }).start();
+        });
     }
 
     private String extracted(Long id, String status, String action) {