Browse Source

增加redis队列,发送,消费

xiewenjie 3 năm trước cách đây
mục cha
commit
7e523b7940

+ 24 - 3
src/main/java/com/fdkk/fdkkmeta/config/RedisConfig.java

@@ -5,14 +5,17 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.annotation.PropertyAccessor;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
-import org.springframework.cache.annotation.CachingConfigurerSupport;
+import com.fdkk.fdkkmeta.redis.impl.RedisSubscribeServiceImpl;
+import constants.RedisQueueConstant;
 import org.springframework.cache.annotation.EnableCaching;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.data.redis.connection.RedisConnectionFactory;
 import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
 import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.core.script.DefaultRedisScript;
+import org.springframework.data.redis.listener.PatternTopic;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
 import org.springframework.data.redis.serializer.StringRedisSerializer;
 
 /**
@@ -50,4 +53,22 @@ public class RedisConfig
             return template;
         }
 
-    }}
+    }
+    /**
+     * 注入消息监听适配器
+     */
+    @Bean
+    public MessageListenerAdapter messageListenerAdapter( RedisSubscribeServiceImpl receiver){
+        return new MessageListenerAdapter(receiver, RedisQueueConstant.RESP_INFO.get(RedisQueueConstant.ROTATE_QUEUE));
+    }
+    @Bean
+    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,
+                                                                       MessageListenerAdapter messageListenerAdapter) {
+        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
+        redisMessageListenerContainer.setConnectionFactory(connectionFactory);
+        redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new PatternTopic(RedisQueueConstant.ROTATE_QUEUE));
+
+        return redisMessageListenerContainer;
+    }
+
+}

+ 15 - 16
src/main/java/com/fdkk/fdkkmeta/ctroller/DevController.java

@@ -15,12 +15,16 @@ import com.fdkk.fdkkmeta.domain.entity.mysql.RoomEntity;
 import com.fdkk.fdkkmeta.domain.entity.mysql.RotateFrameEntity;
 import com.fdkk.fdkkmeta.domain.po.AnglePO;
 import com.fdkk.fdkkmeta.domain.po.PointPO;
+import com.fdkk.fdkkmeta.grpc.RotateRequest;
 import com.fdkk.fdkkmeta.redis.RedisCache;
+import com.fdkk.fdkkmeta.redis.RedisPublishService;
 import com.fdkk.fdkkmeta.service.BreakpointsService;
 import com.fdkk.fdkkmeta.service.MoveFrameService;
 import com.fdkk.fdkkmeta.service.RoomService;
 import com.fdkk.fdkkmeta.service.RotateFrameService;
+import com.fdkk.fdkkmeta.util.ProtoJsonUtils;
 import com.google.gson.JsonObject;
+import constants.RedisQueueConstant;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.PostMapping;
@@ -30,6 +34,7 @@ import org.springframework.web.bind.annotation.RestController;
 
 import java.awt.geom.AffineTransform;
 import java.io.File;
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.text.DecimalFormat;
 import java.util.*;
@@ -48,6 +53,10 @@ public class DevController   {
     MoveFrameService moveFrameService;
     @Autowired
     RedisCache redisCache;
+
+    @Autowired
+    private RedisPublishService redisPublishService;
+
     /**
      * test
      *
@@ -156,25 +165,15 @@ public class DevController   {
     }
 
     @PostMapping("redis")
-    public void testredis(String userId,Object value){
-        redisCache.pfremove("setCacheRequest:rotate:"+1);
-        redisCache.setCacheList("setCacheRequest:rotate:"+1, Arrays.asList(1));
-        redisCache.setCacheList("setCacheRequest:rotate:"+1, Arrays.asList(2));
-        redisCache.setCacheList("setCacheRequest:rotate:"+1, Arrays.asList(3));
-        redisCache.setCacheList("setCacheRequest:rotate:"+1, Arrays.asList(4));
-        redisCache.setCacheList("setCacheRequest:rotate:"+1, Arrays.asList(5));
-        redisCache.setCacheList("setCacheRequest:rotate:"+1, Arrays.asList(6));
-        redisCache.setCacheList("setCacheRequest:rotate:"+1, Arrays.asList(7));
-        redisCache.setCacheList("setCacheRequest:rotate:"+1, Arrays.asList(8));
+    public void testredis(String userId,Object value) throws IOException, InterruptedException {
 
-        List<Object> cacheList = redisCache.getCacheList("setCacheRequest:rotate:" + 1);
 
-        List<Object> cacheList1 = redisCache.getCacheList("setCacheRequest:rotate:" + 1);
+        for (int i = 0; i < 10000; i++) {
+            RotateRequest.Builder builder = RotateRequest.newBuilder();
+            builder.setUserId("u_"+i);
+            redisPublishService.sendProtocolMessage(RedisQueueConstant.ROTATE_QUEUE,builder);
 
-        redisCache.lTrim("setCacheRequest:rotate:"+1, 1,-1);
-        List<Object> cacheList2 = redisCache.getCacheList("setCacheRequest:rotate:" + 1);
-
-        System.out.println(cacheList);
+        }
     }
 
 }

+ 12 - 8
src/main/java/com/fdkk/fdkkmeta/ctroller/TestRTCController.java

@@ -5,6 +5,8 @@ import java.util.List;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import cn.hutool.cache.CacheUtil;
+import cn.hutool.cache.impl.NoCache;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.PostMapping;
 
@@ -14,10 +16,10 @@ import com.fdkk.fdkkmeta.domain.entity.mysql.RoomEntity;
 import com.fdkk.fdkkmeta.service.RotateFrameService;;
 
 public class TestRTCController {
-	
+
 	private static final int Echo = 1009;
 	private static final int GetNewUserState = 1009;
-	
+
     @Autowired
     RotateFrameService rotateFrameService;
 
@@ -25,13 +27,15 @@ public class TestRTCController {
     public int Init(HttpServletRequest request,HttpServletResponse response) {
     	//创建用户
     	try {
-    		String user_id = request.getParameter("user_id");
+
+
+			String user_id = request.getParameter("user_id");
     		String nick_name = request.getParameter("nick_name");
     		String skin_id = request.getParameter("skin_id");
     		String avatar_id = request.getParameter("avatar_id");
     		String room_id = request.getParameter("room_id");
     		String app_id = request.getParameter("app_id");
-    		
+
     		JSONObject initFrameInfo = rotateFrameService.getInitFrameInfo(app_id);
     		String frameId = initFrameInfo.getString("frameId");
     		String breakPointId = initFrameInfo.getString("breakPointId");
@@ -43,7 +47,7 @@ public class TestRTCController {
     		return -1;
     	}
     }
-    
+
     @PostMapping("/rotate")
     public int rotate(HttpServletRequest request,HttpServletResponse response) {
     	try {
@@ -54,13 +58,13 @@ public class TestRTCController {
     		double horizontal_move = rotation_action.getDoubleValue("horizontal_move");
 
     		double hAngle = horizontal_move * 90;
-    		
+
     		int offFrameIndex = (int)Math.ceil(hAngle);
     		int currentFrame = 0;      //从数据库里读取
-    		
+
     		currentFrame +=offFrameIndex;
     		currentFrame = currentFrame % 359;
-    		
+
     		return 1;
     	}
     	catch(Exception e) {

+ 9 - 0
src/main/java/com/fdkk/fdkkmeta/grpcService/sceneGrpcServer.java

@@ -14,12 +14,14 @@ import com.fdkk.fdkkmeta.domain.entity.mysql.UserEntity;
 import com.fdkk.fdkkmeta.domain.po.*;
 import com.fdkk.fdkkmeta.grpc.*;
 import com.fdkk.fdkkmeta.redis.RedisCache;
+import com.fdkk.fdkkmeta.redis.RedisPublishService;
 import com.fdkk.fdkkmeta.service.BreakpointsService;
 import com.fdkk.fdkkmeta.service.MoveFrameService;
 import com.fdkk.fdkkmeta.service.RotateFrameService;
 import com.fdkk.fdkkmeta.service.UserService;
 import com.fdkk.fdkkmeta.util.ProtoJsonUtils;
 import com.fdkk.fdkkmeta.util.kesar.GetRoute;
+import constants.RedisQueueConstant;
 import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
 import net.devh.boot.grpc.server.service.GrpcService;
@@ -55,6 +57,9 @@ public class sceneGrpcServer extends SceneGrpcServiceGrpc.SceneGrpcServiceImplBa
 
 	@Autowired
 	SceneGrpcService sceneGrpcService;
+
+	@Autowired
+	private RedisPublishService redisPublishService;
     /***
      *     回复 案例
      *    SceneReply res = SceneReply.newBuilder().setRes(id+","+name).build();
@@ -196,6 +201,10 @@ public class sceneGrpcServer extends SceneGrpcServiceGrpc.SceneGrpcServiceImplBa
     @Override
     public void rotate(RotateRequest request, StreamObserver<NormalReply> responseObserver)   {
     	try {
+
+			redisPublishService.sendProtocolMessage(RedisQueueConstant.ROTATE_QUEUE,request.toBuilder());
+
+
 			ActionPO rotateRequestPO = new ActionPO();
 
     		int action_type = request.getActionType();

+ 1 - 0
src/main/java/com/fdkk/fdkkmeta/redis/RedisCache.java

@@ -1226,4 +1226,5 @@ public class RedisCache
         redisTemplate.opsForHyperLogLog().union(key1, key2);
     }
 
+
 }

+ 23 - 0
src/main/java/com/fdkk/fdkkmeta/redis/RedisPublishService.java

@@ -0,0 +1,23 @@
+package com.fdkk.fdkkmeta.redis;
+
+import com.google.protobuf.Message;
+
+import java.io.IOException;
+
+/**
+ * @author Xiewj
+ * @date 2022/5/19
+ */
+
+public interface RedisPublishService {
+    /**
+     * 发布消息
+     *
+     * @param channel 频道
+     * @param message 消息
+     */
+    void sendMessage(String channel, String message);
+
+    void sendProtocolMessage(String channel, Message.Builder builder) throws IOException;
+
+}

+ 19 - 0
src/main/java/com/fdkk/fdkkmeta/redis/RedisSubscribeService.java

@@ -0,0 +1,19 @@
+package com.fdkk.fdkkmeta.redis;
+
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.lang.Nullable;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author Xiewj
+ * @date 2022/5/19
+ */
+
+@Service
+public interface RedisSubscribeService {
+
+    void rotateMessage(String message);
+
+
+}

+ 47 - 0
src/main/java/com/fdkk/fdkkmeta/redis/impl/RedisPublishServiceImpl.java

@@ -0,0 +1,47 @@
+package com.fdkk.fdkkmeta.redis.impl;
+
+import cn.hutool.core.util.ObjectUtil;
+import cn.hutool.core.util.StrUtil;
+import com.fdkk.fdkkmeta.redis.RedisPublishService;
+import com.fdkk.fdkkmeta.util.ProtoJsonUtils;
+import com.google.protobuf.Message;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+
+/**
+ * @author Xiewj
+ * @date 2022/5/19
+ */
+@Component
+@Slf4j
+public class RedisPublishServiceImpl implements RedisPublishService {
+   private StringRedisTemplate stringRedisTemplate;
+
+   @Autowired
+   public void setStringRedisTemplate(StringRedisTemplate stringRedisTemplate) {
+      this.stringRedisTemplate = stringRedisTemplate;
+   }
+
+   @Override
+   public void sendMessage(String channel, String message) {
+      if (StrUtil.isBlank(message)) {
+         return;
+      }
+      stringRedisTemplate.convertAndSend(channel, message);
+      log.info("频道【{},{}】消息已发布", channel, message);
+   }
+
+   @Override
+   public void sendProtocolMessage(String channel, Message.Builder message) throws IOException {
+      if (ObjectUtil.isNull(message)) {
+         return;
+      }
+      stringRedisTemplate.convertAndSend(channel, ProtoJsonUtils.toJson(message));
+      log.info("频道【{},{}】消息已发布", channel, message);
+   }
+
+}

+ 35 - 0
src/main/java/com/fdkk/fdkkmeta/redis/impl/RedisSubscribeServiceImpl.java

@@ -0,0 +1,35 @@
+package com.fdkk.fdkkmeta.redis.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.fdkk.fdkkmeta.grpc.RotateRequest;
+import com.fdkk.fdkkmeta.redis.RedisSubscribeService;
+import com.fdkk.fdkkmeta.util.ProtoJsonUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * @author Xiewj
+ * @date 2022/5/19
+ */
+@Slf4j
+@Component
+public class RedisSubscribeServiceImpl  implements RedisSubscribeService {
+    @Override
+    public void rotateMessage(String message) {
+        if (Objects.isNull(message)) {
+            log.error("消息为空");
+            return;
+        }
+        log.info("订阅频道:rotateMessage消息:{}", message);
+        RotateRequest.Builder builder = RotateRequest.newBuilder();
+        ProtoJsonUtils.toProtoBean(builder,message);
+        log.info("builder-userid,{}",builder.getUserId());
+
+    }
+}

+ 5 - 2
src/main/java/com/fdkk/fdkkmeta/test/test.java

@@ -1,10 +1,14 @@
 package com.fdkk.fdkkmeta.test;
 
+import cn.hutool.cache.CacheUtil;
+import cn.hutool.cache.impl.FIFOCache;
+import cn.hutool.cache.impl.NoCache;
 import cn.hutool.core.io.file.FileWriter;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.fdkk.fdkkmeta.domain.po.RoutePO;
+import com.fdkk.fdkkmeta.redis.RedisCache;
 import com.fdkk.fdkkmeta.util.FileUtils;
 
 import java.util.*;
@@ -100,8 +104,7 @@ public class test {
 //
 //
         public static void main(String[] args) {
-            createdPointsJson("H:\\workfile\\final_freespace(3).csv");
-            creatTargetJson("H:\\workfile\\final_freespace(3).csv");
+            RedisCache cache=new RedisCache();
 
         }
     private static void createdPointsJson(String path) {

+ 13 - 7
src/main/java/com/fdkk/fdkkmeta/util/ProtoJsonUtils.java

@@ -1,5 +1,6 @@
 package com.fdkk.fdkkmeta.util;
 import com.google.gson.Gson;
+import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Message;
 import com.google.protobuf.MessageOrBuilder;
 import com.google.protobuf.util.JsonFormat;
@@ -20,13 +21,18 @@ import java.io.IOException;
 public class ProtoJsonUtils {
 
     public static String toJson(MessageOrBuilder sourceMessage)
-            throws IOException {
-        String json = JsonFormat.printer()
-                .includingDefaultValueFields()
-                .printingEnumsAsInts()
-                .preservingProtoFieldNames()
-                .print(sourceMessage);
-        return json.replaceAll("\\n","");
+         {
+             String json = null;
+             try {
+                 json = JsonFormat.printer()
+                         .includingDefaultValueFields()
+                         .printingEnumsAsInts()
+                         .preservingProtoFieldNames()
+                         .print(sourceMessage);
+             } catch (InvalidProtocolBufferException e) {
+                 throw new RuntimeException(e);
+             }
+             return json.replaceAll("\\n","");
     }
 
     public static Message toProtoBean(Message.Builder targetBuilder, String json){

+ 21 - 0
src/main/java/constants/RedisQueueConstant.java

@@ -0,0 +1,21 @@
+package constants;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author Xiewj
+ * @date 2022/5/19
+ */
+public class RedisQueueConstant {
+    /** 旋转队列 */
+    public static final String ROTATE_QUEUE = "rotate";
+
+    public static final Map<String, String> RESP_INFO = new HashMap();
+
+    static {
+        // 队列方法名称 相关
+        RESP_INFO.put(ROTATE_QUEUE, "rotateMessage");
+
+    }
+}