dengsixing 3 months ago
parent
commit
49e17bc134

+ 23 - 0
pom.xml

@@ -194,7 +194,30 @@
       <groupId>com.github.brainlag</groupId>
       <artifactId>nsq-client</artifactId>
       <version>1.0.0.RC4</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>20.0</version>
+    </dependency>
+
+<!--    <dependency>-->
+<!--      <groupId>com.sproutsocial</groupId>-->
+<!--      <artifactId>nsq-j</artifactId>-->
+<!--      <version>1.0</version>-->
+<!--    </dependency>-->
+
+<!--    <dependency>-->
+<!--      <groupId>com.sproutsocial</groupId>-->
+<!--      <artifactId>nsq-j</artifactId>-->
+<!--      <version>1.6.0</version>-->
+<!--    </dependency>-->
 
 
   </dependencies>

+ 13 - 0
src/main/java/com/fdkankan/contro/ModelingControlApplication.java

@@ -26,6 +26,19 @@ import org.springframework.scheduling.annotation.EnableScheduling;
 public class ModelingControlApplication {
     public static void main(String[] args) {
         SpringApplication.run(ModelingControlApplication.class, args);
+
+//        NSQLookup lookup = new DefaultNSQLookup();
+//        lookup.addLookupAddress("192.168.0.127", 4161);
+//        NSQConsumer consumer = new NSQConsumer(lookup, "topic-bd", "channel-bd", (message) -> {
+//            System.out.println("received: " + message);
+//            //now mark the message as finished.
+//            message.finished();
+//
+//            //or you could requeue it, which indicates a failure and puts it back on the queue.
+////            message.requeue();
+//        });
+//
+//        consumer.start();
     }
 }
 

+ 28 - 2
src/main/java/com/fdkankan/contro/controller/SceneFilenameController.java

@@ -1,10 +1,22 @@
 package com.fdkankan.contro.controller;
 
 
+import cn.hutool.crypto.digest.MD5;
+import com.alibaba.fastjson.JSON;
+import com.fdkankan.contro.common.Result;
+import com.fdkankan.contro.entity.SceneOrigBd;
+import com.fdkankan.web.response.ResultData;
+import com.github.brainlag.nsq.NSQProducer;
+import com.github.brainlag.nsq.exceptions.NSQException;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
-
 import org.springframework.web.bind.annotation.RestController;
 
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeoutException;
+
 /**
  * <p>
  *  前端控制器
@@ -14,8 +26,22 @@ import org.springframework.web.bind.annotation.RestController;
  * @since 2025-05-07
  */
 @RestController
-@RequestMapping("/contro/sceneFilename")
+@RequestMapping("/api/scene/file/bd")
 public class SceneFilenameController {
 
+    @PostMapping
+    public ResultData testNsq(@RequestBody SceneOrigBd sceneOrigBd) throws NSQException, TimeoutException {
+        Result success = Result.success(sceneOrigBd);
+        NSQProducer producer = new NSQProducer().addAddress("192.168.0.127", 4150).start();
+        producer.produce("topic-bd", JSON.toJSONString(success).getBytes(StandardCharsets.UTF_8));
+        producer.shutdown();
+        return ResultData.ok();
+    }
+
+    public static void main(String[] args) {
+        System.out.println(MD5.create().digestHex(new File("D:\\Downloads\\bpvt00010_202504161042512540\\bpvt00010_202504161042512540.zip")));
+    }
+
+
 }
 

+ 3 - 3
src/main/java/com/fdkankan/contro/httpclient/address/MjAddressSource.java

@@ -9,13 +9,13 @@ import org.springframework.stereotype.Component;
 @Component
 public class MjAddressSource implements AddressSource {
 
-    @Value("bd.api.scheme:http")
+    @Value("${bd.api.scheme:http}")
     private String scheme;
 
-    @Value("bd.api.host")
+    @Value("${bd.api.scheme:http}")
     private String host;
 
-    @Value("bd.api.port:#{null}")
+    @Value("${bd.api.port}")
     private Integer port;
 
 

+ 24 - 0
src/main/java/com/fdkankan/contro/mq/nsq/NsqClient.java

@@ -0,0 +1,24 @@
+package com.fdkankan.contro.mq.nsq;
+
+import com.github.brainlag.nsq.NSQConsumer;
+import com.github.brainlag.nsq.lookup.DefaultNSQLookup;
+import com.github.brainlag.nsq.lookup.NSQLookup;
+
+public class NsqClient {
+
+    public static void main(String[] args) {
+        NSQLookup lookup = new DefaultNSQLookup();
+        lookup.addLookupAddress("192.168.0.127", 4161);
+        NSQConsumer consumer = new NSQConsumer(lookup, "topic-bd", "channel-bd", (message) -> {
+            System.out.println("received: " + message);
+            //now mark the message as finished.
+//            message.finished();
+
+            //or you could requeue it, which indicates a failure and puts it back on the queue.
+            message.requeue();
+        });
+
+        consumer.start();
+    }
+
+}

+ 17 - 0
src/main/java/com/fdkankan/contro/mq/nsq/NsqClientPruductor.java

@@ -0,0 +1,17 @@
+package com.fdkankan.contro.mq.nsq;
+
+import com.github.brainlag.nsq.NSQProducer;
+import com.github.brainlag.nsq.exceptions.NSQException;
+
+import java.util.concurrent.TimeoutException;
+
+public class NsqClientPruductor {
+
+    public static void main(String[] args) throws NSQException, TimeoutException {
+
+        NSQProducer producer = new NSQProducer().addAddress("192.168.0.127", 4150).start();
+        producer.produce("topic-bd", ("this is a message").getBytes());
+
+    }
+
+}

+ 9 - 8
src/main/java/com/fdkankan/contro/mq/nsq/NsqConsume.java

@@ -12,17 +12,17 @@ import org.springframework.stereotype.Component;
 @Component
 public class NsqConsume implements ApplicationRunner {
 
-    @Value("${nsq.lookup.host}")
+    @Value("${nsq.lookup.host:192.168.0.127}")
     private String nsqHost;
 
-    @Value("${nsq.lookup.port}")
-    private Integer nsqLookupPort;
+    @Value("${nsq.lookup.port:4161}")
+    private Integer port;
 
-    @Value("${nsq.topic}")
+    @Value("${nsq.topic:topic-bd}")
     private String nsqTopic;
 
-    @Value("${nsq.channel1}")
-    private String channel1;
+    @Value("${nsq.channel:channel-bd}")
+    private String channel;
 
     @Autowired
     private NsqMesgDeal nsqMesgDeal;
@@ -30,8 +30,9 @@ public class NsqConsume implements ApplicationRunner {
     @Override
     public void run(ApplicationArguments args) throws Exception {
         NSQLookup lookup = new DefaultNSQLookup();
-        lookup.addLookupAddress(nsqHost, nsqLookupPort);
-        NSQConsumer consumer = new NSQConsumer(lookup, nsqTopic, channel1, nsqMesgDeal);
+        lookup.addLookupAddress(nsqHost, 4161);
+        NSQConsumer consumer = new NSQConsumer(lookup, nsqTopic, channel, nsqMesgDeal);
         consumer.start();
+
     }
 }

+ 22 - 4
src/main/java/com/fdkankan/contro/mq/nsq/NsqMesgDeal.java

@@ -42,8 +42,11 @@ public class NsqMesgDeal implements NSQMessageCallback {
         String msg = new String(nsqMessage.getMessage());
         String id = new String(nsqMessage.getId());
         log.info("start deal call msg, id:{}, content:{}", id, msg);
-        Result<SceneOrigBd> content = JSON.parseObject(msg, Result.class);
-        SceneOrigBd data = content.getData();
+        nsqMessage.finished();
+        Result content = JSON.parseObject(msg, Result.class);
+//        SceneOrigBd data = content.getData();
+        JSONObject obj = (JSONObject) content.getData();
+        SceneOrigBd data = JSON.parseObject(obj.toJSONString(), SceneOrigBd.class);
         try {
             String path = data.getPath();
             String fileHash = data.getFileHash();
@@ -52,7 +55,7 @@ public class NsqMesgDeal implements NSQMessageCallback {
             String md5 = MD5.create().digestHex(new File(localPath));
             data.setLocalPath(localPath);
             //比对md5是否一致
-            if(md5.equals(fileHash)){
+            if(!md5.equals(fileHash)){
                 throw new RuntimeException("md5不匹配");
             }
             //data.fdage临时文件路径
@@ -75,7 +78,7 @@ public class NsqMesgDeal implements NSQMessageCallback {
             //通知平台文件已下载(删除文件)
             JSONObject param = new JSONObject();
             param.put("filename", data.getFilename());
-            mjHttpClient.notify(param);
+//            mjHttpClient.notify(param);
         }catch (Exception e){
             data.setReason(ExceptionUtil.stacktraceToString(e, 5000));
             data.setStatus(CommonSuccessStatus.FAIL.code());
@@ -85,4 +88,19 @@ public class NsqMesgDeal implements NSQMessageCallback {
 
         log.info("end deal call msg, id:{}, content:{}", id, msg);
     }
+
+    public static void main(String[] args) {
+        try (
+                ZipFile zipFile = new ZipFile("D:\\Downloads\\916ed6689_202501101516307030\\916ed6689_202501101516307030.zip");
+                InputStream ins = ZipUtil.get(zipFile, "data.fdage")
+        ){
+            String tempFdagePath = "D:\\Downloads\\aaa.fdage";
+            FileUtil.writeFromStream(ins, tempFdagePath);
+            JSONObject dataFdage = JSON.parseObject(FileUtil.readUtf8String(tempFdagePath));
+            String unicode = dataFdage.getString("creator") + "_" + dataFdage.getString("uuidtime");
+            System.out.println(unicode);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }

+ 18 - 0
src/main/java/com/fdkankan/contro/mq/nsq/PubExample.java

@@ -0,0 +1,18 @@
+//package com.fdkankan.contro.mq.nsq;
+//
+//import com.sproutsocial.nsq.Subscriber;
+//
+//public class PubExample {
+//
+//    public static void handleData(byte[] data) {
+//        System.out.println("Received:" + new String(data));
+//    }
+//
+//    public static void main(String[] args) {
+//        Subscriber subscriber = new Subscriber("192.168.0.127:4161");
+//        subscriber.subscribe("topic-bd", "channel-bd", PubExample::handleData);
+//
+//
+//
+//    }
+//}

+ 20 - 0
src/main/java/com/fdkankan/contro/mq/nsq/PubExample2.java

@@ -0,0 +1,20 @@
+//package com.fdkankan.contro.mq.nsq;
+//
+//import com.sproutsocial.nsq.Publisher;
+//
+//public class PubExample2 {
+//
+//    public static void handleData(byte[] data) {
+//        System.out.println("Received:" + new String(data));
+//    }
+//
+//    public static void main(String[] args) {
+//
+//        Publisher publisher = new Publisher("192.168.0.127:4150");
+//        byte[] data = "Hello nsq".getBytes();
+//        publisher.publishBuffered("topic-bd", data);
+//        System.out.println(123);
+//
+//
+//    }
+//}

+ 43 - 0
src/main/java/com/fdkankan/contro/mq/nsq/WatchManNsqConsume.java

@@ -0,0 +1,43 @@
+//package com.fdkankan.contro.mq.nsq;
+//
+//import com.sproutsocial.nsq.DirectSubscriber;
+//import com.sproutsocial.nsq.MessageDataHandler;
+//import com.sproutsocial.nsq.Subscriber;
+//import lombok.extern.slf4j.Slf4j;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.boot.ApplicationArguments;
+//import org.springframework.boot.ApplicationRunner;
+//import org.springframework.stereotype.Component;
+//
+//import java.util.Objects;
+//
+//@Component
+//@Slf4j
+//public class WatchManNsqConsume implements ApplicationRunner {
+//
+//    @Value("${nsq.lookup.host}")
+//    private String host;
+//
+//    @Value("${nsq.topic:topic-bd}")
+//    private String nsqTopic;
+//
+//    @Value("${nsq.channel:channel-bd}")
+//    private String channel;
+//
+//
+//    @Override
+//    public void run(ApplicationArguments applicationArguments) throws InterruptedException {
+//        Subscriber directSubscriber = null;
+//        while(Objects.isNull(directSubscriber) || directSubscriber.getConnectionCount() == 0){
+//            Thread.sleep(500);
+//                directSubscriber = new DirectSubscriber(2, host);
+//                directSubscriber.subscribe(nsqTopic, channel, new MessageDataHandler() {
+//                    @Override
+//                    public void accept(byte[] bytes) {
+//                    }
+//                });
+//        }
+//
+//
+//    }
+//}

+ 11 - 5
src/main/java/com/fdkankan/contro/service/impl/MjServiceImpl.java

@@ -3,13 +3,13 @@ package com.fdkankan.contro.service.impl;
 import cn.hutool.core.collection.CollUtil;
 import cn.hutool.core.io.FileUtil;
 import cn.hutool.core.util.StrUtil;
+import cn.hutool.core.util.ZipUtil;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.fdkankan.common.constant.CommonSuccessStatus;
 import com.fdkankan.common.constant.RecStatus;
 import com.fdkankan.common.constant.SceneStatus;
-import com.fdkankan.common.util.CmdUtils;
 import com.fdkankan.common.util.SnowflakeIdGenerator;
 import com.fdkankan.contro.entity.SceneFileBuild;
 import com.fdkankan.contro.entity.SceneFilename;
@@ -78,12 +78,13 @@ public class MjServiceImpl implements MjService {
                     continue;
                 }
 
-                String unzipCmd = "unzip -o -O GBK " + localPath + " -d " + zipDir;
+//                String unzipCmd = "unzip -o GBK " + localPath + " -d " + zipDir;
+//                CmdUtils.callLine(unzipCmd);
 
-                CmdUtils.callLine(unzipCmd);
+                ZipUtil.unzip(new File(localPath), new File(zipDir));
 
                 //解压后删除压缩包
-                FileUtil.del(localPath);
+//                FileUtil.del(localPath);
 
                 String dataFdagePath = zipDir + "data.fdage";
                 JSONObject dataFdageJson = JSON.parseObject(FileUtil.readUtf8String(dataFdagePath));
@@ -126,8 +127,13 @@ public class MjServiceImpl implements MjService {
         JSONObject param = new JSONObject();
         param.put("data", status);
         param.put("filename", filename);
-        mjHttpClient.done(param);
+//        mjHttpClient.done(param);
+    }
+
+    public static void main(String[] args) {
+        ZipUtil.unzip(new File("D:\\Downloads\\916ed6689_202501101516307030\\916ed6689_202501101516307030.zip"), new File("D:\\Downloads\\aaa\\bbb"));
     }
 
 }
 
+

+ 2 - 2
src/main/resources/bootstrap-test.yml

@@ -2,9 +2,9 @@ spring:
   cloud:
     nacos:
       config:
-        server-addr: 120.24.144.164:8848
+        server-addr: 192.168.0.127:8848
         file-extension: yaml
-        namespace: 4dkankan-v4-test
+        namespace: test
         extension-configs:
           - data-id: 4dkankan-center-modeling-control.yaml
             group: DEFAULT_GROUP