import open3d as o3d import numpy as np import os import shutil import zipfile import json from typing import Dict, Any import cv2 import argparse # from pointclound_downsampling import run # 定义屋顶截断的 Z 轴最大阈值。 # !!! 请根据您的点云数据特性调整这个值 !!! Z_MAX_THRESHOLD = 0.8 # 假设墙体最高点不超过 0.5 米 Z_MIN_THRESHOLD = -1.7 def generate_floor_json(width: int, height: int, floor_id: int = 0, name: str = "1楼") -> Dict[str, Any]: x_bound = width / 100.0 y_bound = height / 100.0 floor_data = { "id": floor_id, "subgroup": 0, "name": name, "resolution": { "width": width, "height": height }, "bound": { # 边界范围从负值到正值对称 "x_min": -x_bound, "x_max": x_bound, "y_min": -y_bound, "y_max": y_bound } } return { "floors": [floor_data] } def save_json_to_file(data: Dict[str, Any], filename: str = "output.json"): """ 将 Python 字典写入格式化的 JSON 文件。 """ try: with open(filename, 'w', encoding='utf-8') as f: # indent=4 用于美化输出,ensure_ascii=False 确保中文正常显示 json.dump(data, f, indent=4, ensure_ascii=False) print(f"✅ JSON 文件已成功保存为: {filename}") except IOError as e: print(f"❌ 写入文件时发生错误: {e}") def build_floor_transform_matrix(j_info: dict, floor_id: int): """ 遍历 JSON 数据中的楼层信息,查找匹配的 floor_id,并构建 3x3 仿射变换矩阵。 此版本返回逆矩阵,用于从世界坐标到归一化坐标的转换。 """ tab = [[0.0] * 3 for _ in range(3)] res_width = None res_height = None for in_json in j_info.get("floors", []): floor_id_in_json = in_json.get("id") if floor_id_in_json != floor_id: continue res_width = in_json.get("resolution", {}).get("width") res_height = in_json.get("resolution", {}).get("height") x_min = in_json.get("bound", {}).get("x_min") x_max = in_json.get("bound", {}).get("x_max") y_min = in_json.get("bound", {}).get("y_min") y_max = in_json.get("bound", {}).get("y_max") # 用于从归一化 [0,1] 空间转换到世界坐标空间 tab[0][0] = x_max - x_min tab[0][1] = 0.0 tab[0][2] = x_min tab[1][0] = 0.0 tab[1][1] = y_min - y_max tab[1][2] = y_max tab[2][0] = 0.0 tab[2][1] = 0.0 tab[2][2] = 1.0 break if res_width is None: # 未找到楼层,返回单位矩阵和None return np.identity(3).tolist(), None, None # 逆矩阵 tab_array = np.array(tab, dtype=np.float64) if np.linalg.det(tab_array) == 0: raise ValueError("矩阵是奇异的,无法求逆。") tab_inverse_array = np.linalg.inv(tab_array) tab_inverse = tab_inverse_array.tolist() return tab_inverse, res_width, res_height def zip_folder(folder_path, output_path): with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, dirs, files in os.walk(folder_path): for file in files: file_path = os.path.join(root, file) zipf.write(file_path, os.path.relpath(file_path, folder_path)) def extract_ply_data(ply_file_path, output_dir, scene_info_json_path, floor_id=0, voxel_size=0.03): # 1. 读取 PLY 文件 print(f"正在读取文件: {ply_file_path}...") pcd = o3d.io.read_point_cloud(ply_file_path) original_num_points = len(pcd.points) print(f" - 原始点数: {original_num_points}") max_z = pcd.get_max_bound()[2] min_z = pcd.get_min_bound()[2] print(f" - 原始 Z 轴范围: [{min_z:.4f}, {max_z:.4f}]") # 1.1. 获取变换矩阵 if not os.path.exists(scene_info_json_path): raise FileNotFoundError(f"找不到场景信息JSON文件: {scene_info_json_path}") with open(scene_info_json_path, 'r', encoding='utf-8') as f: j_info = json.load(f) matrix, res_w, res_h = build_floor_transform_matrix(j_info, floor_id) if res_w is None: raise ValueError(f"在 {scene_info_json_path} 中找不到 floor_id {floor_id}") M = np.array(matrix, dtype=np.float64) # 1.2. 屋顶和地板截断:基于 Z 轴和XY投影进行裁剪 print(f"正在进行截断 (Z 轴和XY平面)...") # 转换为 NumPy 数组以便创建布尔掩码 points_np = np.asarray(pcd.points) # Z 轴筛选的布尔掩码 z_mask = (points_np[:, 2] < Z_MAX_THRESHOLD) & (points_np[:, 2] > Z_MIN_THRESHOLD) # X/Y 轴筛选的布尔掩码 homogeneous_points = np.hstack((points_np[:, :2], np.ones((points_np.shape[0], 1)))) normalized_points = homogeneous_points @ M.T pixel_coords = normalized_points[:, :2] * np.array([res_w, res_h]) xy_mask = (pixel_coords[:, 0] >= 0) & (pixel_coords[:, 0] < res_w) & \ (pixel_coords[:, 1] >= 0) & (pixel_coords[:, 1] < res_h) # 合并掩码 combined_mask = z_mask & xy_mask inlier_indices = np.where(combined_mask)[0] # 使用 select_by_index 方法同步裁剪所有属性 pcd = pcd.select_by_index(inlier_indices) cut_num_points = len(pcd.points) print(f" - 截断后点数: {cut_num_points}") ###################################################### # # # 1.2. 下采样到截断后点数的 1/10 # sampling_ratio = 0.1 # pcd = pcd.random_down_sample(sampling_ratio=sampling_ratio) ###################################################### ###################################################### # voxel_size = 0.03 # 0.02 pcd = pcd.voxel_down_sample(voxel_size=voxel_size) print(f"已执行体素下采样,体素大小为: {voxel_size}") # MAX_POINTS = 550000 MAX_POINTS = 500000 num_points_after_voxel = len(pcd.points) print(f"体素下采样后点数: {num_points_after_voxel}") if num_points_after_voxel > MAX_POINTS: print(f" - 点数仍大于 {MAX_POINTS},正在进行第二阶段随机下采样...") indices_to_keep = np.random.choice(num_points_after_voxel, MAX_POINTS, replace=False) pcd = pcd.select_by_index(indices_to_keep) print(f" - 第二阶段下采样完成,最终点数: {len(pcd.points)}") ###################################################### # o3d.io.write_point_cloud(output_dir+'/1.ply', pcd, write_ascii=True) ################************########################## # pcd = run(ply_file_path, "random") ################************########################### downsampled_num_points = len(pcd.points) print(f" - 下采样后点数: {downsampled_num_points}") print(f" - 实际保留比例 (相对于原始点数): {downsampled_num_points / original_num_points:.4f}") # 2. 提取数据并转换为 NumPy 数组 # 提取坐标 (Points) points = np.asarray(pcd.points) points_npy_path = os.path.join(output_dir, 'coord.npy') np.save(points_npy_path, points) print(f" - 坐标信息 (形状: {points.shape}) 已保存到: {points_npy_path}") # 提取颜色 (Colors) if pcd.has_colors(): colors = np.asarray(pcd.colors) colors_npy_path = os.path.join(output_dir, 'color.npy') np.save(colors_npy_path, colors) print(f" - 颜色信息 (形状: {colors.shape}, 范围: [0, 1]) 已保存到: {colors_npy_path}") else: # 如果 PLY 文件不包含颜色信息,则会打印此警告,这是正常现象。 print(" - 警告: PLY 文件不包含颜色信息,跳过保存。") colors = None # 提取法线 (Normals) if pcd.has_normals(): normals = np.asarray(pcd.normals) normals_npy_path = os.path.join(output_dir, 'normal.npy') np.save(normals_npy_path, normals) print(f" - 法线信息 (形状: {normals.shape}) 已保存到: {normals_npy_path}") else: # 如果 PLY 文件不包含法线信息,则会打印此警告,这是正常现象。 print(" - 警告: PLY 文件不包含法线信息,跳过保存。") normals = None return points, colors, normals if __name__ == '__main__': parser = argparse.ArgumentParser( description="输入单个场景文件夹用于点云提取,分割和去噪", formatter_class=argparse.RawTextHelpFormatter ) parser.add_argument( '-i', '--input_folder', type=str, required=True, help='指定输入场景的文件夹路径。' ) args = parser.parse_args() scene_folder = args.input_folder scenece = os.path.basename(scene_folder) ply_file_path = os.path.join(scene_folder, "laser.ply") scene_info_json_path = os.path.join(scene_folder, f"{scenece}.json") floor_path = os.path.join(scene_folder, f"{scenece}.png") process_folder = os.path.join(scene_folder, 'scene/val/process_data') if not os.path.exists(process_folder): os.makedirs(process_folder) try: if not os.path.exists(ply_file_path): print(f"✅ 文件 '{ply_file_path}' 存在") except FileNotFoundError: raise FileNotFoundError(f"【文件缺失错误】无法找到点云文件:'{ply_file_path}'。请确保文件路径正确。") try: if not os.path.exists(floor_path): print(f"✅ 文件 '{floor_path}' 存在") except FileNotFoundError: raise FileNotFoundError(f"【文件缺失错误】无法找到平面图文件:'{floor_path}'。请确保文件路径正确。") if not os.path.exists(scene_info_json_path): img = cv2.imread(floor_path) height, width, _ = img.shape json_data = generate_floor_json(width, height) save_json_to_file(json_data, filename=scene_info_json_path) extract_ply_data( ply_file_path=ply_file_path, output_dir=process_folder, scene_info_json_path=scene_info_json_path, floor_id=0, voxel_size=0.03 ) data_file = os.path.join(scene_folder, "scene") zip_file = data_file + '.zip' zip_folder(data_file, zip_file)