import os import numpy as np import json import cv2 import open3d as o3d import argparse def build_floor_transform_matrix(j_info: dict, floor_id: int): tab = [[0.0] * 3 for _ in range(3)] res_width = None res_height = None for in_json in j_info.get("floors", []): floor_id_in_json = in_json.get("id") if floor_id_in_json != floor_id: continue res_width = in_json.get("resolution", {}).get("width") res_height = in_json.get("resolution", {}).get("height") x_min = in_json.get("bound", {}).get("x_min") x_max = in_json.get("bound", {}).get("x_max") y_min = in_json.get("bound", {}).get("y_min") y_max = in_json.get("bound", {}).get("y_max") # 这个矩阵用于从归一化 [0,1] 空间转换到世界坐标空间 tab[0][0] = x_max - x_min tab[0][1] = 0.0 tab[0][2] = x_min tab[1][0] = 0.0 tab[1][1] = y_min - y_max tab[1][2] = y_max tab[2][0] = 0.0 tab[2][1] = 0.0 tab[2][2] = 1.0 break if res_width is None: return np.identity(3).tolist(), None, None tab_array = np.array(tab, dtype=np.float64) if np.linalg.det(tab_array) == 0: raise ValueError("矩阵是奇异的,无法求逆。") tab_inverse_array = np.linalg.inv(tab_array) tab_inverse = tab_inverse_array.tolist() return tab_inverse, res_width, res_height def generate_maps(ply_path, floor_json_path, floor_id, output_mask_path, output_density_path, output_binary_path): """ Generates multiple maps from a .ply file: a raw projection, a density map, and a manually binarized map. Args: ply_path (str): Path to the input PLY file. floor_json_path (str): Path to the floor JSON file. floor_id (int): The ID of the floor to process. output_mask_path (str): Path to save the raw projection mask. output_density_path (str): Path to save the density map. output_binary_path (str): Path to save the manually binarized map. """ try: with open(floor_json_path, 'r') as f: j_info = json.load(f) except FileNotFoundError: print(f"JSON file not found at {floor_json_path}") return except json.JSONDecodeError: print(f"Error decoding JSON from {floor_json_path}") return matrix, res_w, res_h = build_floor_transform_matrix(j_info, floor_id) if res_w is None or res_h is None: print(f"Could not find floor_id {floor_id} in {os.path.basename(floor_json_path)}") return try: pcd = o3d.io.read_point_cloud(ply_path) if not pcd.has_points(): print(f"Warning: Point cloud is empty in {ply_path}") return cl, ind = pcd.remove_statistical_outlier(nb_neighbors=20, std_ratio=2.0) denoised_pcd = pcd.select_by_index(ind) points = np.asarray(denoised_pcd.points)[:, :2] except Exception as e: print(f"Failed to read or process point cloud file {ply_path}: {e}") return mask_image = np.zeros((res_h, res_w), dtype=np.uint8) density_accumulator = np.zeros((res_h, res_w), dtype=np.float32) matrix = np.array(matrix, dtype=np.float64) for world_point in points: homogeneous_point = np.array([world_point[0], world_point[1], 1.0], dtype=np.float64) normalized_point = matrix @ homogeneous_point pixel_x = int(normalized_point[0] * res_w) pixel_y = int(normalized_point[1] * res_h) if 0 <= pixel_x < res_w and 0 <= pixel_y < res_h: mask_image[pixel_y, pixel_x] = 255 density_accumulator[pixel_y, pixel_x] += 1 output_dir = os.path.dirname(output_mask_path) if output_dir and not os.path.exists(output_dir): os.makedirs(output_dir) # Save raw projection mask # cv2.imwrite(output_mask_path, mask_image) # print(f"Successfully generated and saved wall mask to {output_mask_path}") # Save grayscale density map density_map_grayscale = cv2.normalize(density_accumulator, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U) # cv2.imwrite(output_density_path, density_map_grayscale) # print(f"Successfully generated and saved density map to {output_density_path}") # Binarize the grayscale density map using a manual threshold. # This threshold value (0-255) can be tuned to change sensitivity. manual_threshold = 20 #10 _, binary_map = cv2.threshold(density_map_grayscale, manual_threshold, 255, cv2.THRESH_BINARY) print(f"Binarizing {os.path.basename(output_binary_path)} with manual threshold: {manual_threshold}") # Save the manually binarized map. cv2.imwrite(output_binary_path, binary_map) print(f"Successfully generated and saved binarized map to {output_binary_path}") if __name__ == '__main__': parser = argparse.ArgumentParser( description="输入单个场景文件夹用于墙壁点云概率密度图的生成", formatter_class=argparse.RawTextHelpFormatter ) parser.add_argument( '-i', '--input_folder', type=str, required=True, help='指定输入场景的文件夹路径。' ) args = parser.parse_args() scene_folder = args.input_folder scenece = os.path.basename(scene_folder) floor_id = 0 ply_path = os.path.join(scene_folder, "wall_ply/wall_instances.ply") output_dir = os.path.join(scene_folder, "output_maps_walldensity") if not os.path.exists(output_dir): os.makedirs(output_dir) floor_json_path = os.path.join(scene_folder, f"{scenece}.json") output_mask_path = os.path.join(output_dir, f"{scenece}_floor_{floor_id}_mask.png") output_density_path = os.path.join(output_dir, f"{scenece}_floor_{floor_id}_density.png") output_binary_path = os.path.join(output_dir, f"{scenece}_floor_{floor_id}_binary.png") generate_maps(ply_path, floor_json_path, floor_id, output_mask_path, output_density_path, output_binary_path)