| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- import open3d as o3d
- import numpy as np
- import os
- import shutil
- import zipfile
- import json
- from typing import Dict, Any
- import cv2
- import argparse
- # from pointclound_downsampling import run
- # 定义屋顶截断的 Z 轴最大阈值。
- # !!! 请根据您的点云数据特性调整这个值 !!!
- Z_MAX_THRESHOLD = 0.8 # 假设墙体最高点不超过 0.5 米
- Z_MIN_THRESHOLD = -1.7
- def generate_floor_json(width: int, height: int, floor_id: int = 0, name: str = "1楼") -> Dict[str, Any]:
- x_bound = width / 100.0
- y_bound = height / 100.0
- floor_data = {
- "id": floor_id,
- "subgroup": 0,
- "name": name,
- "resolution": {
- "width": width,
- "height": height
- },
- "bound": {
- # 边界范围从负值到正值对称
- "x_min": -x_bound,
- "x_max": x_bound,
- "y_min": -y_bound,
- "y_max": y_bound
- }
- }
- return {
- "floors": [floor_data]
- }
- def save_json_to_file(data: Dict[str, Any], filename: str = "output.json"):
- """
- 将 Python 字典写入格式化的 JSON 文件。
- """
- try:
- with open(filename, 'w', encoding='utf-8') as f:
- # indent=4 用于美化输出,ensure_ascii=False 确保中文正常显示
- json.dump(data, f, indent=4, ensure_ascii=False)
- print(f"✅ JSON 文件已成功保存为: {filename}")
- except IOError as e:
- print(f"❌ 写入文件时发生错误: {e}")
- def build_floor_transform_matrix(j_info: dict, floor_id: int):
- """
- 遍历 JSON 数据中的楼层信息,查找匹配的 floor_id,并构建 3x3 仿射变换矩阵。
- 此版本返回逆矩阵,用于从世界坐标到归一化坐标的转换。
- """
- tab = [[0.0] * 3 for _ in range(3)]
- res_width = None
- res_height = None
- for in_json in j_info.get("floors", []):
- floor_id_in_json = in_json.get("id")
- if floor_id_in_json != floor_id:
- continue
- res_width = in_json.get("resolution", {}).get("width")
- res_height = in_json.get("resolution", {}).get("height")
- x_min = in_json.get("bound", {}).get("x_min")
- x_max = in_json.get("bound", {}).get("x_max")
- y_min = in_json.get("bound", {}).get("y_min")
- y_max = in_json.get("bound", {}).get("y_max")
- # 用于从归一化 [0,1] 空间转换到世界坐标空间
- tab[0][0] = x_max - x_min
- tab[0][1] = 0.0
- tab[0][2] = x_min
- tab[1][0] = 0.0
- tab[1][1] = y_min - y_max
- tab[1][2] = y_max
- tab[2][0] = 0.0
- tab[2][1] = 0.0
- tab[2][2] = 1.0
- break
- if res_width is None:
- # 未找到楼层,返回单位矩阵和None
- return np.identity(3).tolist(), None, None
- # 逆矩阵
- tab_array = np.array(tab, dtype=np.float64)
- if np.linalg.det(tab_array) == 0:
- raise ValueError("矩阵是奇异的,无法求逆。")
- tab_inverse_array = np.linalg.inv(tab_array)
- tab_inverse = tab_inverse_array.tolist()
- return tab_inverse, res_width, res_height
- def zip_folder(folder_path, output_path):
- with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
- for root, dirs, files in os.walk(folder_path):
- for file in files:
- file_path = os.path.join(root, file)
- zipf.write(file_path, os.path.relpath(file_path, folder_path))
- def extract_ply_data(ply_file_path, output_dir, scene_info_json_path, floor_id=0, voxel_size=0.03):
- # 1. 读取 PLY 文件
- print(f"正在读取文件: {ply_file_path}...")
- pcd = o3d.io.read_point_cloud(ply_file_path)
- original_num_points = len(pcd.points)
- print(f" - 原始点数: {original_num_points}")
- max_z = pcd.get_max_bound()[2]
- min_z = pcd.get_min_bound()[2]
- print(f" - 原始 Z 轴范围: [{min_z:.4f}, {max_z:.4f}]")
- # 1.1. 获取变换矩阵
- if not os.path.exists(scene_info_json_path):
- raise FileNotFoundError(f"找不到场景信息JSON文件: {scene_info_json_path}")
- with open(scene_info_json_path, 'r', encoding='utf-8') as f:
- j_info = json.load(f)
- matrix, res_w, res_h = build_floor_transform_matrix(j_info, floor_id)
- if res_w is None:
- raise ValueError(f"在 {scene_info_json_path} 中找不到 floor_id {floor_id}")
- M = np.array(matrix, dtype=np.float64)
- # 1.2. 屋顶和地板截断:基于 Z 轴和XY投影进行裁剪
- print(f"正在进行截断 (Z 轴和XY平面)...")
- # 转换为 NumPy 数组以便创建布尔掩码
- points_np = np.asarray(pcd.points)
- # Z 轴筛选的布尔掩码
- z_mask = (points_np[:, 2] < Z_MAX_THRESHOLD) & (points_np[:, 2] > Z_MIN_THRESHOLD)
- # X/Y 轴筛选的布尔掩码
- homogeneous_points = np.hstack((points_np[:, :2], np.ones((points_np.shape[0], 1))))
- normalized_points = homogeneous_points @ M.T
- pixel_coords = normalized_points[:, :2] * np.array([res_w, res_h])
- xy_mask = (pixel_coords[:, 0] >= 0) & (pixel_coords[:, 0] < res_w) & \
- (pixel_coords[:, 1] >= 0) & (pixel_coords[:, 1] < res_h)
- # 合并掩码
- combined_mask = z_mask & xy_mask
- inlier_indices = np.where(combined_mask)[0]
- # 使用 select_by_index 方法同步裁剪所有属性
- pcd = pcd.select_by_index(inlier_indices)
- cut_num_points = len(pcd.points)
- print(f" - 截断后点数: {cut_num_points}")
- ######################################################
- # # # 1.2. 下采样到截断后点数的 1/10
- # sampling_ratio = 0.1
- # pcd = pcd.random_down_sample(sampling_ratio=sampling_ratio)
- ######################################################
- ######################################################
- # voxel_size = 0.03 # 0.02
- pcd = pcd.voxel_down_sample(voxel_size=voxel_size)
- print(f"已执行体素下采样,体素大小为: {voxel_size}")
- # MAX_POINTS = 550000
- MAX_POINTS = 500000
- num_points_after_voxel = len(pcd.points)
- print(f"体素下采样后点数: {num_points_after_voxel}")
- if num_points_after_voxel > MAX_POINTS:
- print(f" - 点数仍大于 {MAX_POINTS},正在进行第二阶段随机下采样...")
- indices_to_keep = np.random.choice(num_points_after_voxel, MAX_POINTS, replace=False)
- pcd = pcd.select_by_index(indices_to_keep)
- print(f" - 第二阶段下采样完成,最终点数: {len(pcd.points)}")
- ######################################################
- # o3d.io.write_point_cloud(output_dir+'/1.ply', pcd, write_ascii=True)
- ################************##########################
- # pcd = run(ply_file_path, "random")
- ################************###########################
- downsampled_num_points = len(pcd.points)
- print(f" - 下采样后点数: {downsampled_num_points}")
- print(f" - 实际保留比例 (相对于原始点数): {downsampled_num_points / original_num_points:.4f}")
- # 2. 提取数据并转换为 NumPy 数组
- # 提取坐标 (Points)
- points = np.asarray(pcd.points)
- points_npy_path = os.path.join(output_dir, 'coord.npy')
- np.save(points_npy_path, points)
- print(f" - 坐标信息 (形状: {points.shape}) 已保存到: {points_npy_path}")
- # 提取颜色 (Colors)
- if pcd.has_colors():
- colors = np.asarray(pcd.colors)
- colors_npy_path = os.path.join(output_dir, 'color.npy')
- np.save(colors_npy_path, colors)
- print(f" - 颜色信息 (形状: {colors.shape}, 范围: [0, 1]) 已保存到: {colors_npy_path}")
- else:
- # 如果 PLY 文件不包含颜色信息,则会打印此警告,这是正常现象。
- print(" - 警告: PLY 文件不包含颜色信息,跳过保存。")
- colors = None
- # 提取法线 (Normals)
- if pcd.has_normals():
- normals = np.asarray(pcd.normals)
- normals_npy_path = os.path.join(output_dir, 'normal.npy')
- np.save(normals_npy_path, normals)
- print(f" - 法线信息 (形状: {normals.shape}) 已保存到: {normals_npy_path}")
- else:
- # 如果 PLY 文件不包含法线信息,则会打印此警告,这是正常现象。
- print(" - 警告: PLY 文件不包含法线信息,跳过保存。")
- normals = None
- return points, colors, normals
- if __name__ == '__main__':
- parser = argparse.ArgumentParser(
- description="输入单个场景文件夹用于点云提取,分割和去噪",
- formatter_class=argparse.RawTextHelpFormatter
- )
- parser.add_argument(
- '-i',
- '--input_folder',
- type=str,
- required=True,
- help='指定输入场景的文件夹路径。'
- )
- args = parser.parse_args()
- scene_folder = args.input_folder
- scenece = os.path.basename(scene_folder)
- ply_file_path = os.path.join(scene_folder, "laser.ply")
- scene_info_json_path = os.path.join(scene_folder, f"{scenece}.json")
- floor_path = os.path.join(scene_folder, f"{scenece}.png")
- process_folder = os.path.join(scene_folder, 'scene/val/process_data')
- if not os.path.exists(process_folder):
- os.makedirs(process_folder)
- try:
- if not os.path.exists(ply_file_path):
- print(f"✅ 文件 '{ply_file_path}' 存在")
- except FileNotFoundError:
- raise FileNotFoundError(f"【文件缺失错误】无法找到点云文件:'{ply_file_path}'。请确保文件路径正确。")
- try:
- if not os.path.exists(floor_path):
- print(f"✅ 文件 '{floor_path}' 存在")
- except FileNotFoundError:
- raise FileNotFoundError(f"【文件缺失错误】无法找到平面图文件:'{floor_path}'。请确保文件路径正确。")
- if not os.path.exists(scene_info_json_path):
- img = cv2.imread(floor_path)
- height, width, _ = img.shape
- json_data = generate_floor_json(width, height)
- save_json_to_file(json_data, filename=scene_info_json_path)
- extract_ply_data(
- ply_file_path=ply_file_path,
- output_dir=process_folder,
- scene_info_json_path=scene_info_json_path,
- floor_id=0,
- voxel_size=0.03
- )
- data_file = os.path.join(scene_folder, "scene")
- zip_file = data_file + '.zip'
- zip_folder(data_file, zip_file)
|