|
@@ -0,0 +1,351 @@
|
|
|
+package com.fdkk.fdkkmeta.socketIO;
|
|
|
+
|
|
|
+import cn.hutool.core.bean.BeanUtil;
|
|
|
+import cn.hutool.core.io.FileUtil;
|
|
|
+import cn.hutool.core.io.IoUtil;
|
|
|
+import cn.hutool.core.io.file.FileWriter;
|
|
|
+import cn.hutool.core.thread.ThreadUtil;
|
|
|
+import cn.hutool.core.util.RuntimeUtil;
|
|
|
+import cn.hutool.core.util.StrUtil;
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
+import com.corundumstudio.socketio.AckRequest;
|
|
|
+import com.corundumstudio.socketio.SocketIOClient;
|
|
|
+import com.corundumstudio.socketio.SocketIOServer;
|
|
|
+import com.corundumstudio.socketio.annotation.OnConnect;
|
|
|
+import com.corundumstudio.socketio.annotation.OnDisconnect;
|
|
|
+import com.corundumstudio.socketio.annotation.OnEvent;
|
|
|
+import com.fdkk.fdkkmeta.dto.PushDto;
|
|
|
+import com.fdkk.fdkkmeta.dto.RotateDto;
|
|
|
+import com.fdkk.fdkkmeta.dto.RouteDto;
|
|
|
+import com.fdkk.fdkkmeta.po.RoutePO;
|
|
|
+import com.fdkk.fdkkmeta.redis.RedisCache;
|
|
|
+import com.fdkk.fdkkmeta.util.FpsUtil;
|
|
|
+import com.fdkk.fdkkmeta.util.GenerateUserSig;
|
|
|
+import com.fdkk.fdkkmeta.util.TrtcUtil;
|
|
|
+import com.fdkk.fdkkmeta.util.kesar.GetRoute;
|
|
|
+import com.fdkk.fdkkmeta.util.kesar.MapInfo;
|
|
|
+import javafx.geometry.Point3D;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.io.FileInputStream;
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.*;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author Xiewj
|
|
|
+ * @date 2022/3/16
|
|
|
+ */
|
|
|
+@Component
|
|
|
+@Slf4j
|
|
|
+public class SocketIoServer {
|
|
|
+
|
|
|
+ private static SocketIOServer socketIoServer;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ TrtcUtil trtcUtil;
|
|
|
+ @Autowired
|
|
|
+ RedisCache redisCache;
|
|
|
+ // 获取推送消息接收入口
|
|
|
+ @OnEvent(value = "pushVideo_test")
|
|
|
+ public void pushVideo_test (SocketIOClient client, AckRequest ackRequest, PushDto param) throws IOException, InterruptedException {
|
|
|
+ SocketIoServer.log.info("pushVideo-接收到客户端消息:{}", JSON.toJSONString(param));
|
|
|
+ byte[] bytes = IoUtil.readBytes(new FileInputStream("/home/webrtc/videofile/"+param.getVideoPath()+".mp4"));
|
|
|
+ client.sendEvent("pushVideo_test", bytes);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取推送消息接收入口
|
|
|
+ @OnEvent(value = "getRoute")
|
|
|
+ public void getRoute (SocketIOClient client, AckRequest ackRequest, RouteDto param) throws IOException, InterruptedException {
|
|
|
+ SocketIoServer.log.info("getRoute-接收到客户端消息:{}", JSON.toJSONString(param));
|
|
|
+ JSONArray maps = GetRoute.getRoute("/home/webrtc/scenefreespacefile/target_freespace.json", param);
|
|
|
+ pushMessageToUser("getRoute",param.getUserId(),maps);
|
|
|
+ client.sendEvent("getRoute",maps);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ // getVideo获取推送消息接收入口
|
|
|
+ @OnEvent(value = "getVideo")
|
|
|
+ public void getVideo (SocketIOClient client, AckRequest ackRequest, PushDto param) throws IOException, InterruptedException {
|
|
|
+ SocketIoServer.log.info("getVideo-接收到客户端消息:{}", JSON.toJSONString(param));
|
|
|
+ StringBuilder videoPath=new StringBuilder();
|
|
|
+ if (param.getVideoList().size()>0){
|
|
|
+ for (String s : param.getVideoList()) {
|
|
|
+ videoPath.append(s.replaceAll("/","_"));
|
|
|
+ }
|
|
|
+ }else {
|
|
|
+ log.info("getVideo-VideoList-0");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+// String key=param.getSceneCode()+":" + "r"+param.getRoomId()+"u"+ param.getUserId()+":"+videoPath;
|
|
|
+ String key=param.getSceneCode()+":"+videoPath;
|
|
|
+ String filePath="/home/webrtc/sceneUserFile/"+param.getSceneCode()+"/" +key+".txt";
|
|
|
+ String outfilePath="/home/webrtc/sceneUserFile/"+param.getSceneCode()+"/" +key+".mp4";
|
|
|
+
|
|
|
+ if (redisCache.hasKey(key)){
|
|
|
+ byte[] bytes = redisCache.getCacheObject(key);
|
|
|
+ pushMessageToUser("getSocketVideo",param.getUserId(),bytes);
|
|
|
+ log.info("getVideo-redis-pushMessageToUser");
|
|
|
+ }else {
|
|
|
+ if (FileUtil.exist(outfilePath)){
|
|
|
+ byte[] bytes = IoUtil.readBytes(new FileInputStream(outfilePath));
|
|
|
+ pushMessageToUser("getSocketVideo",param.getUserId(),bytes);
|
|
|
+ redisCache.setCacheObject(key,bytes);
|
|
|
+ }else {
|
|
|
+ String token = redisCache.tryLock("getVideo:" + key, 1000 * 60 * 2);
|
|
|
+ if (token != null){
|
|
|
+
|
|
|
+ FileWriter writer = new FileWriter(filePath);
|
|
|
+ Set<String> fileList=new HashSet<>();
|
|
|
+ for (String s : param.getVideoList()) {
|
|
|
+ fileList.add("file '/home/webrtc/videofile/"+param.getSceneCode()+"/"+s+".mp4'");
|
|
|
+ }
|
|
|
+ writer.writeLines(fileList,false);
|
|
|
+
|
|
|
+ Process exec = RuntimeUtil.exec("/home/webrtc/socket-pushvideos.sh",
|
|
|
+ filePath, outfilePath
|
|
|
+ );
|
|
|
+ exec.waitFor();
|
|
|
+ byte[] bytes = IoUtil.readBytes(new FileInputStream(outfilePath));
|
|
|
+ pushMessageToUser("getSocketVideo",param.getUserId(),bytes);
|
|
|
+ redisCache.setCacheObject(key,bytes);
|
|
|
+ log.info("getVideo-{} {} {}","/home/webrtc/socket-pushvideos.sh",
|
|
|
+ filePath,outfilePath);
|
|
|
+ }
|
|
|
+ redisCache.unlock("getVideo:"+key, token);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ client.sendEvent("getVideo",param);
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ }
|
|
|
+ // 获取推送消息接收入口
|
|
|
+ @OnEvent(value = "getRotateVideo")
|
|
|
+ public void getRotateVideo (SocketIOClient client, AckRequest ackRequest, RotateDto param) throws IOException, InterruptedException {
|
|
|
+ SocketIoServer.log.info("getRotateVideo-接收到客户端消息:{}", JSON.toJSONString(param));
|
|
|
+ String key=param.getSceneCode()+":" + param.getVideoPath().replaceAll("/","_")+":"+param.getSangle()+"-"+param.getEangle()+"-"+param.getReverses();
|
|
|
+ String outfilePath = "/home/webrtc/sceneUserFile/"+param.getSceneCode()+"/" + key + "rotate.mp4";
|
|
|
+
|
|
|
+ if (redisCache.hasKey(key)) {
|
|
|
+ byte[] bytes = redisCache.getCacheObject(key);
|
|
|
+ pushMessageToUser("getSocketVideo", param.getUserId(), bytes);
|
|
|
+ log.info("getRotateVideo-redis-pushMessageToUser");
|
|
|
+ }else {
|
|
|
+ if (FileUtil.exist(outfilePath)){
|
|
|
+ byte[] bytes = IoUtil.readBytes(new FileInputStream(outfilePath));
|
|
|
+ pushMessageToUser("getSocketVideo",param.getUserId(),bytes);
|
|
|
+ redisCache.setCacheObject(key,bytes);
|
|
|
+ }else {
|
|
|
+ String token = redisCache.tryLock("getRotateVideo:" + key, 1000 * 60 * 2);
|
|
|
+ if (token != null) {
|
|
|
+ String sangle = param.getSangle();
|
|
|
+ String eangle = param.getEangle();
|
|
|
+ String reverses = param.getReverses().toString();
|
|
|
+ String videoPath = param.getVideoPath();
|
|
|
+ log.info("{}{}{}[}", sangle, eangle, reverses, videoPath);
|
|
|
+ Process exec = RuntimeUtil.exec("/home/webrtc/socket-pushvideo-rotate.sh",
|
|
|
+ "/home/webrtc/videofile/" + videoPath,
|
|
|
+ FpsUtil.getDateFormat(sangle),
|
|
|
+ FpsUtil.getDateFormat(eangle),
|
|
|
+ reverses, outfilePath
|
|
|
+ );
|
|
|
+ exec.waitFor();
|
|
|
+ byte[] bytes = IoUtil.readBytes(new FileInputStream(outfilePath));
|
|
|
+ pushMessageToUser("getSocketVideo", param.getUserId(), bytes);
|
|
|
+ redisCache.setCacheObject(key,bytes);
|
|
|
+ log.info("getRotateVideo-{} {} {} {} {} {}", "/home/webrtc/socket-pushvideo-rotate.sh",
|
|
|
+ "/home/webrtc/videofile/"+param.getSceneCode()+"/" + videoPath,
|
|
|
+ FpsUtil.getDateFormat(sangle),
|
|
|
+ FpsUtil.getDateFormat(eangle),
|
|
|
+ reverses, outfilePath);
|
|
|
+ }
|
|
|
+ redisCache.unlock("getRotateVideo:" + key, token);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ client.sendEvent("getRotateVideo",param);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ // 获取推送消息接收入口
|
|
|
+ @OnEvent(value = "pushVideo")
|
|
|
+ public void pushVideo (SocketIOClient client, AckRequest ackRequest, PushDto param) throws IOException, InterruptedException {
|
|
|
+ SocketIoServer.log.info("pushVideo-接收到客户端消息:{}", JSON.toJSONString(param));
|
|
|
+ //TODO 调用命令去推送流
|
|
|
+ String key="r:"+param.getRoomId()+"u:"+param.getUserId();
|
|
|
+ log.info("user-key,{}",key);
|
|
|
+ String videoStreamUrl=param.getSceneCode()+"/" + "r"+param.getRoomId()+"u"+ param.getUserId()+"s"+param.getSessionId();
|
|
|
+ //先通知前端需要接收
|
|
|
+ client.sendEvent("getVideo",videoStreamUrl);
|
|
|
+
|
|
|
+ String filePath="/home/webrtc/sceneUserFile/"+videoStreamUrl+".txt";
|
|
|
+
|
|
|
+ FileWriter writer = new FileWriter(filePath);
|
|
|
+ List<String> fileList=new ArrayList<>();
|
|
|
+ for (String s : param.getVideoList()) {
|
|
|
+ fileList.add("file '/home/webrtc/videofile/"+s+".mp4'");
|
|
|
+ }
|
|
|
+ writer.writeLines(fileList,false);
|
|
|
+
|
|
|
+ ThreadUtil.execAsync(() -> RuntimeUtil.execForStr("/home/webrtc/pushvideos.sh",
|
|
|
+ "rtmp://127.0.0.1:1935/"+videoStreamUrl,
|
|
|
+ filePath));
|
|
|
+ log.info("pushVideo-{},{},{}","/home/webrtc/pushvideo.sh",
|
|
|
+ "rtmp://127.0.0.1:1935/"+videoStreamUrl,
|
|
|
+ filePath);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取推送消息接收入口
|
|
|
+ @OnEvent(value = "rotate")
|
|
|
+ public void rotate (SocketIOClient client, AckRequest ackRequest, RotateDto param) throws IOException, InterruptedException {
|
|
|
+ SocketIoServer.log.info("rotate-接收到客户端消息:{}", JSON.toJSONString(param));
|
|
|
+ //TODO 调用命令去推送流
|
|
|
+ String key="r:"+param.getRoomId()+"u:"+param.getUserId();
|
|
|
+ log.info("user-key,{}",key);
|
|
|
+ //先通知前端需要接收
|
|
|
+ String videoStreamUrl=param.getSceneCode()+"/" + "r"+param.getRoomId()+"u"+ param.getUserId()+"s"+param.getSessionId();
|
|
|
+ client.sendEvent("getVideo",videoStreamUrl);
|
|
|
+ String sangle = param.getSangle();
|
|
|
+ String eangle = param.getEangle();
|
|
|
+ String reverses = param.getReverses().toString();
|
|
|
+ String videoPath=param.getVideoPath();
|
|
|
+
|
|
|
+ log.info("{}{}{}[}",sangle,eangle,reverses,videoPath);
|
|
|
+
|
|
|
+ ThreadUtil.execAsync(() -> RuntimeUtil.execForStr("/home/webrtc/pushvideo-rotate.sh",
|
|
|
+ "rtmp://127.0.0.1:1935/" + videoStreamUrl,
|
|
|
+ "/home/webrtc/videofile/"+videoPath,
|
|
|
+ FpsUtil.getDateFormat(sangle),
|
|
|
+ FpsUtil.getDateFormat(eangle),
|
|
|
+ reverses
|
|
|
+ )
|
|
|
+
|
|
|
+ );
|
|
|
+ log.info("rotate-{} {} {} {} {} {}","/home/webrtc/pushvideo-rotate.sh",
|
|
|
+ "rtmp://127.0.0.1:1935/" + videoStreamUrl,
|
|
|
+ "/home/webrtc/videofile/"+videoPath,
|
|
|
+ FpsUtil.getDateFormat(sangle),
|
|
|
+ FpsUtil.getDateFormat(eangle),
|
|
|
+ reverses);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 获取推送消息接收入口
|
|
|
+ @OnEvent(value = "getPush")
|
|
|
+ public void getPush(SocketIOClient client, AckRequest ackRequest, PushDto param) throws IOException, InterruptedException {
|
|
|
+ SocketIoServer.log.info("接收到客户端消息:{}", JSON.toJSONString(param));
|
|
|
+ //TODO 调用命令去推送流
|
|
|
+ String pushURl = trtcUtil.getPushURl(param);
|
|
|
+ SocketIoServer.log.info("转换URL:{}", pushURl);
|
|
|
+ String key="r:"+param.getRoomId()+"u:"+param.getUserId();
|
|
|
+ log.info("user-key,{}",key);
|
|
|
+ //创建线程
|
|
|
+ //判断当前用户是否开启推流线程
|
|
|
+ if (!redisCache.hasKey(key)){
|
|
|
+ String userId = param.getUserId();
|
|
|
+ String pid = "123123";
|
|
|
+ log.info("pid{}",pid);
|
|
|
+ }
|
|
|
+// client.sendEvent("getPush", pushURl);
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 加入房间
|
|
|
+ * */
|
|
|
+ @OnEvent(value = "joinRoom")
|
|
|
+ public void onJoinRoomEvent(SocketIOClient client, AckRequest request, PushDto data) {
|
|
|
+ client.joinRoom(data.getSceneCode());
|
|
|
+ String key="point:"+data.getSceneCode()+":r:"+data.getRoomId()+"u:"+data.getUserId();
|
|
|
+ redisCache.setCacheObject(key, data);
|
|
|
+
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 离开房间
|
|
|
+ * */
|
|
|
+ @OnEvent(value = "leaveRoom")
|
|
|
+ public void onLeaveRoomEvent(SocketIOClient client, AckRequest request, PushDto data) {
|
|
|
+ client.leaveRoom(data.getSceneCode());
|
|
|
+ String key="point:"+data.getSceneCode()+":r:"+data.getRoomId()+"u:"+data.getUserId();
|
|
|
+ redisCache.delete(key);
|
|
|
+
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 获取当前房间里面所有人的点位
|
|
|
+ * */
|
|
|
+ @OnEvent(value = "roomPoint")
|
|
|
+ public void getRoomPointEvent(SocketIOClient client, AckRequest request, PushDto data) {
|
|
|
+ Set<Object> keys = redisCache.hKeys("point:" + data.getSceneCode() + "*");
|
|
|
+ List<PushDto> points=new ArrayList<>();
|
|
|
+ for (Object key : keys) {
|
|
|
+ String realKey=String.valueOf(key);
|
|
|
+ PushDto pushDto = redisCache.getCacheObject(realKey);
|
|
|
+ points.add(pushDto);
|
|
|
+ }
|
|
|
+ client.sendEvent("roomPoint",points);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ public SocketIoServer(SocketIOServer server) {
|
|
|
+ SocketIoServer.socketIoServer = server;
|
|
|
+ }
|
|
|
+
|
|
|
+ @OnConnect
|
|
|
+ public void onConnect(SocketIOClient client) {
|
|
|
+ // TODO Auto-generated method stub
|
|
|
+ String sa = client.getRemoteAddress().toString();
|
|
|
+ String clientIp = sa.substring(1, sa.indexOf(":"));// 获取设备ip
|
|
|
+ SocketIoServer.log.info(clientIp + "-------------------------" + "客户端已连接" + clientIp);
|
|
|
+ String userId = getParamsByClient(client);
|
|
|
+ if (userId != null) {
|
|
|
+ SocketIoServer.log.info("userId"+userId);
|
|
|
+ SocketIoServer.log.info("SessionId: " + client.getSessionId());
|
|
|
+ SocketIoServer.log.info("RemoteAddress: " + client.getRemoteAddress());
|
|
|
+ SocketIoServer.log.info("Transport: " + client.getTransport());
|
|
|
+ SocketIoServerMapUtil.put(userId, client);
|
|
|
+ client.sendEvent("getSign", GenerateUserSig.genTestUserSig(userId));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @OnDisconnect
|
|
|
+ public void onDisconnect(SocketIOClient client) {
|
|
|
+ // TODO Auto-generated method stub
|
|
|
+ String sa = client.getRemoteAddress().toString();
|
|
|
+ String clientIp = sa.substring(1, sa.indexOf(":"));// 获取设备ip
|
|
|
+ SocketIoServer.log.info(clientIp + "-------------------------" + "客户端已断开连接");
|
|
|
+ String userId = getParamsByClient(client);
|
|
|
+ if (userId != null) {
|
|
|
+ SocketIoServerMapUtil.remove(userId);
|
|
|
+ System.out.println("断开连接: " + userId);
|
|
|
+ System.out.println("断开连接: " + client.getSessionId());
|
|
|
+ client.disconnect();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 此方法为获取client连接中的参数,可根据需求更改
|
|
|
+ *
|
|
|
+ * @param client
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private String getParamsByClient(SocketIOClient client) {
|
|
|
+ // 从请求的连接中拿出参数(这里的loginUserNum必须是唯一标识)
|
|
|
+ Map<String, List<String>> params = client.getHandshakeData().getUrlParams();
|
|
|
+ List<String> list = params.get("userId");
|
|
|
+ if (list != null && list.size() > 0) {
|
|
|
+ return list.get(0);
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+//
|
|
|
+ public void pushMessageToUser(String pushEvent,String userId,Object o) {
|
|
|
+ if (StrUtil.isNotBlank(userId)) {
|
|
|
+ SocketIOClient client = SocketIoServerMapUtil.get(userId);
|
|
|
+ if (client != null) {
|
|
|
+ client.sendEvent(pushEvent,o);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|