import numpy as np
import open3d as o3d
import os
import time
import json
import cv2
from itertools import groupby
import argparse

# Chinese display names are kept as-is: downstream consumers expect them in the output JSON.
CLASS_MAPPING = {
    'refrigerator': {'id': '0', 'name': '冰箱'},
    'desk': {'id': '1', 'name': '书桌'},
    'curtain': {'id': '2', 'name': '窗帘'},
    'sofa': {'id': '3', 'name': '沙发'},
    'bookshelf': {'id': '4', 'name': '书架'},
    'bed': {'id': '5', 'name': '床'},
    'table': {'id': '6', 'name': '桌子'},
    'window': {'id': '7', 'name': '窗户'},
    'cabinet': {'id': '8', 'name': '橱柜'},
    'door': {'id': '9', 'name': '门'},
    'chair': {'id': '10', 'name': '椅子'},
    'floor': {'id': '11', 'name': '地板'},
    'wall': {'id': '12', 'name': '墙'},
    'sink': {'id': '13', 'name': '水槽'},
    'toilet': {'id': '14', 'name': '马桶'},
    'bathtub': {'id': '15', 'name': '浴缸'},
    'shower curtain': {'id': '16', 'name': '浴帘'},
    'picture': {'id': '17', 'name': '画'},
    'counter': {'id': '18', 'name': '柜台'},
}


# ==============================================================================
# Helper functions for the optimized algorithms
# ==============================================================================
def get_class_specific_dbscan_params(class_name):
    """Return class-specific DBSCAN hyperparameters (eps in meters, min_points)."""
    default_eps = 0.25
    default_min_points = 150
    params = {
        'bed': {'eps': 0.23, 'min_points': 100},
        'sofa': {'eps': 0.3, 'min_points': 300},
        'table': {'eps': 0.3, 'min_points': 300},
        'desk': {'eps': 0.3, 'min_points': 300},
        'bookshelf': {'eps': 0.3, 'min_points': 300},
        'chair': {'eps': 0.2, 'min_points': 100},
        'refrigerator': {'eps': 0.25, 'min_points': 200},
        'cabinet': {'eps': 0.3, 'min_points': 200},
        'door': {'eps': 0.2, 'min_points': 100}
    }
    config = params.get(class_name, {'eps': default_eps, 'min_points': default_min_points})
    return config['eps'], config['min_points']


# ==============================================================================
# 2D projection and drawing functions
# ==============================================================================
def is_box_dimension_plausible_2d(box_extent_2d, class_name):
    """Check whether a 2D bounding box's dimensions fall within a plausible range (in meters)."""
    plausible_ranges_2d = {
        'bed': ([1.2, 0.7], [3.0, 2.8]),  # Significantly relaxed range for beds
        'sofa': ([1.0, 0.7], [4.0, 1.8]),
        'table': ([0.5, 0.5], [3.0, 1.5]),
        'desk': ([0.8, 0.5], [2.5, 1.2]),
        'bookshelf': ([0.5, 0.2], [2.5, 0.8]),
        'chair': ([0.3, 0.3], [1.2, 1.2]),
        'refrigerator': ([0.5, 0.5], [1.2, 1.2]),
        'cabinet': ([0.4, 0.3], [3.0, 1.0]),
        'door': ([0.6, 0.05], [1.2, 0.3]),
        'window': ([0.4, 0.05], [3.0, 0.4])
    }
    if class_name not in plausible_ranges_2d:
        return True
    min_dims, max_dims = plausible_ranges_2d[class_name]
    sorted_extent = sorted(box_extent_2d)
    sorted_min = sorted(min_dims)
    sorted_max = sorted(max_dims)
    for i in range(2):
        if not (sorted_min[i] <= sorted_extent[i] <= sorted_max[i]):
            return False
    return True


def calculate_2d_iou(box1, box2):
    """Compute the IoU of two 2D bounding boxes stored as [min_x, min_y, max_x, max_y] under 'bbox_2d_pixels'."""
    b1 = box1['bbox_2d_pixels']
    b2 = box2['bbox_2d_pixels']
    xA = max(b1[0], b2[0])
    yA = max(b1[1], b2[1])
    xB = min(b1[2], b2[2])
    yB = min(b1[3], b2[3])
    interArea = max(0, xB - xA) * max(0, yB - yA)
    if interArea == 0:
        return 0.0
    box1Area = (b1[2] - b1[0]) * (b1[3] - b1[1])
    box2Area = (b2[2] - b2[0]) * (b2[3] - b2[1])
    unionArea = float(box1Area + box2Area - interArea)
    if unionArea == 0:
        return 0.0
    return interArea / unionArea
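

# ------------------------------------------------------------------
# Minimal sanity-check sketch (not part of the original pipeline; the
# helper name `_demo_iou_sanity_check` is hypothetical). It shows the
# expected behaviour of calculate_2d_iou on two boxes that share half
# of their area, wrapped in a function so importing this module has
# no side effects.
# ------------------------------------------------------------------
def _demo_iou_sanity_check():
    box_a = {'bbox_2d_pixels': [0, 0, 100, 100]}
    box_b = {'bbox_2d_pixels': [50, 0, 150, 100]}
    # intersection = 50 * 100 = 5000, union = 10000 + 10000 - 5000 = 15000
    assert abs(calculate_2d_iou(box_a, box_b) - 1.0 / 3.0) < 1e-6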


def post_process_in_2d(instances_with_pixel_boxes, x_m_per_px, y_m_per_px, iou_threshold=0.5):
    """Apply size filtering and non-maximum suppression (NMS) to instances in 2D pixel space."""
    # 1. Size filtering
    plausible_instances = []
    for inst in instances_with_pixel_boxes:
        px_box = inst['bbox_2d_pixels']  # [min_x, min_y, max_x, max_y]
        px_width = px_box[2] - px_box[0]
        px_height = px_box[3] - px_box[1]
        metric_width = px_width * x_m_per_px
        metric_height = px_height * y_m_per_px
        extent_2d = [metric_width, metric_height]
        if is_box_dimension_plausible_2d(extent_2d, inst['label']):
            plausible_instances.append(inst)
        else:
            print(f"  - Filtered out a '{inst['label']}' instance with implausible 2D size: {[f'{x:.2f}' for x in extent_2d]}")
    if not plausible_instances:
        return []

    # 2. Group by class for per-class post-processing
    final_instances = []
    plausible_instances.sort(key=lambda x: x['label'])
    for class_name, group in groupby(plausible_instances, key=lambda x: x['label']):
        class_instances = list(group)

        # --- SPECIAL MERGING LOGIC FOR BEDS ---
        if class_name == 'bed':
            if not class_instances:
                continue
            # Build adjacency matrix for overlapping beds
            num_instances = len(class_instances)
            adj_matrix = np.zeros((num_instances, num_instances))
            for i in range(num_instances):
                for j in range(i, num_instances):
                    # Use a low threshold to merge any overlap
                    if calculate_2d_iou(class_instances[i], class_instances[j]) > 0.05:
                        adj_matrix[i, j] = 1
                        adj_matrix[j, i] = 1
            # Find connected components (groups of overlapping boxes)
            visited = [False] * num_instances
            groups = []
            for i in range(num_instances):
                if not visited[i]:
                    component = []
                    q = [i]
                    visited[i] = True
                    while q:
                        u = q.pop(0)
                        component.append(u)
                        for v in range(num_instances):
                            if adj_matrix[u, v] == 1 and not visited[v]:
                                visited[v] = True
                                q.append(v)
                    groups.append(component)
            # Merge each group into a single instance
            merged_instances = []
            for group_indices in groups:
                instances_in_group = [class_instances[i] for i in group_indices]
                # Create the merged bounding box
                min_x = min(inst['bbox_2d_pixels'][0] for inst in instances_in_group)
                min_y = min(inst['bbox_2d_pixels'][1] for inst in instances_in_group)
                max_x = max(inst['bbox_2d_pixels'][2] for inst in instances_in_group)
                max_y = max(inst['bbox_2d_pixels'][3] for inst in instances_in_group)
                # Aggregate score and find a representative instance for metadata
                total_score = sum(inst['score'] for inst in instances_in_group)
                representative_instance = max(instances_in_group, key=lambda x: x['score'])
                new_instance = representative_instance.copy()
                new_instance['bbox_2d_pixels'] = [min_x, min_y, max_x, max_y]
                new_instance['score'] = total_score
                merged_instances.append(new_instance)
            final_instances.extend(merged_instances)
            print(f"  - Class 'bed': merged {len(class_instances)} candidates into {len(merged_instances)} final instances.")

        # --- STANDARD NMS FOR OTHER CLASSES ---
        else:
            class_instances.sort(key=lambda x: x['score'], reverse=True)
            kept_instances = []
            while class_instances:
                best_inst = class_instances.pop(0)
                kept_instances.append(best_inst)
                remaining_instances = []
                for other_inst in class_instances:
                    iou = calculate_2d_iou(best_inst, other_inst)
                    if iou < iou_threshold:
                        remaining_instances.append(other_inst)
                    else:
                        print(f"  - 2D NMS: suppressed a '{class_name}' instance with IoU {iou:.2f} against a better one.")
                class_instances = remaining_instances
            final_instances.extend(kept_instances)
            print(f"  - Class '{class_name}': {len(kept_instances)} valid instances remain after 2D filtering and NMS.")
    return final_instances
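

# ------------------------------------------------------------------
# Usage sketch (illustrative only; `_demo_post_process_nms` and the pixel
# scale of 0.005 m/px are assumptions, not values from the pipeline).
# Two heavily overlapping 'chair' candidates should collapse to the
# higher-scoring one after 2D size filtering and NMS.
# ------------------------------------------------------------------
def _demo_post_process_nms():
    candidates = [
        {'label': 'chair', 'score': 500, 'bbox_2d_pixels': [10, 10, 110, 110]},
        {'label': 'chair', 'score': 300, 'bbox_2d_pixels': [20, 20, 115, 115]},
    ]
    # 0.005 m/px keeps both boxes inside the plausible chair size range (0.3-1.2 m).
    kept = post_process_in_2d(candidates, x_m_per_px=0.005, y_m_per_px=0.005)
    assert len(kept) == 1 and kept[0]['score'] == 500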


def build_floor_transform_matrix(j_info: dict, floor_id: int):
    """Build the inverse of the normalized-to-metric floor transform for the given floor ID."""
    tab = [[0.0] * 3 for _ in range(3)]
    res_width = None
    res_height = None
    for in_json in j_info.get("floors", []):
        if in_json.get("id") != floor_id:
            continue
        res_width = in_json.get("resolution", {}).get("width")
        res_height = in_json.get("resolution", {}).get("height")
        bound = in_json.get("bound", {})
        x_min, x_max = bound.get("x_min"), bound.get("x_max")
        y_min, y_max = bound.get("y_min"), bound.get("y_max")
        tab[0][0] = x_max - x_min
        tab[0][2] = x_min
        tab[1][1] = y_min - y_max
        tab[1][2] = y_max
        tab[2][2] = 1.0
        break
    if res_width is None:
        return np.identity(3).tolist(), None, None
    tab_array = np.array(tab, dtype=np.float64)
    if np.linalg.det(tab_array) == 0:
        raise ValueError("The matrix is singular and cannot be inverted.")
    return np.linalg.inv(tab_array).tolist(), res_width, res_height
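

# ------------------------------------------------------------------
# Sanity-check sketch (the floor metadata below is made up; the helper
# name `_demo_floor_transform` is hypothetical). The returned matrix is
# the inverse of the map from normalized [0, 1] image coordinates to
# metric floor coordinates, so the metric corner (x_min, y_max) should
# land on the normalized origin, i.e. the top-left of the floor plan.
# ------------------------------------------------------------------
def _demo_floor_transform():
    j_info = {"floors": [{
        "id": 0,
        "resolution": {"width": 1000, "height": 800},
        "bound": {"x_min": -5.0, "x_max": 5.0, "y_min": -4.0, "y_max": 4.0},
    }]}
    inv_matrix, res_w, res_h = build_floor_transform_matrix(j_info, 0)
    norm = np.array(inv_matrix) @ np.array([-5.0, 4.0, 1.0])
    assert np.allclose(norm[:2], [0.0, 0.0]) and (res_w, res_h) == (1000, 800)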


def process_and_draw_bboxes(picture_name, floor_path, instances_path, floor_id, output_image_path, output_json_path):
    """Project 3D instance boxes onto the floor-plan image, post-process them in 2D, and save the results."""
    try:
        img = cv2.imread(picture_name)
        if img is None:
            raise FileNotFoundError(f"Failed to load background image: {picture_name}")
        with open(floor_path, 'r', encoding='utf-8') as f:
            j_info = json.load(f)
        with open(instances_path, 'r', encoding='utf-8') as f:
            raw_bbox_data = json.load(f)

        matrix, res_w, res_h = build_floor_transform_matrix(j_info, floor_id)
        if res_w is None:
            raise ValueError(f"Floor with ID {floor_id} was not found in {floor_path}.")
        M = np.array(matrix, dtype=np.float64)

        floor_info = next((f for f in j_info.get("floors", []) if f.get("id") == floor_id), None)
        bound = floor_info.get("bound", {})
        x_m_per_px = (bound.get("x_max") - bound.get("x_min")) / res_w
        y_m_per_px = abs(bound.get("y_max") - bound.get("y_min")) / res_h

        instances_with_pixel_boxes = []
        for item in raw_bbox_data:
            corners = item.get("corners", [])
            if len(corners) < 4:
                continue
            points_2d = []
            for i in range(4):
                norm_pt = M @ np.array([corners[i][0], corners[i][1], 1.0])
                points_2d.append([int(norm_pt[0] * res_w), int(norm_pt[1] * res_h)])
            x_coords, y_coords = [p[0] for p in points_2d], [p[1] for p in points_2d]
            new_item = item.copy()
            new_item['bbox_2d_pixels'] = [min(x_coords), min(y_coords), max(x_coords), max(y_coords)]
            instances_with_pixel_boxes.append(new_item)

        print("\nStarting 2D post-processing...")
        filtered_bbox_data = post_process_in_2d(instances_with_pixel_boxes, x_m_per_px, y_m_per_px)
        print("2D post-processing finished.")

        img_height, img_width, _ = img.shape
        shapes = []
        for item in filtered_bbox_data:
            min_x, min_y, max_x, max_y = item['bbox_2d_pixels']
            category = item["label"]
            color_rgb = item["color"]
            color_bgr = (color_rgb[2], color_rgb[1], color_rgb[0])
            cv2.rectangle(img, (min_x, min_y), (max_x, max_y), color_bgr, 2)
            font = cv2.FONT_HERSHEY_SIMPLEX
            (text_w, text_h), _ = cv2.getTextSize(category, font, 0.5, 1)
            label_y = min_y - 10 if min_y - 10 > text_h else min_y + text_h + 10
            cv2.rectangle(img, (min_x, label_y - text_h - 5), (min_x + text_w, label_y + 5), color_bgr, -1)
            cv2.putText(img, category, (min_x, label_y), font, 0.5, (255, 255, 255), 1, cv2.LINE_AA)
            bbox_poly = [min_x, min_y, max_x, min_y, max_x, max_y, min_x, max_y]
            class_info = CLASS_MAPPING.get(category, {'id': '-1', 'name': '未知'})
            shapes.append({
                "bbox": bbox_poly,
                "category": category,
                "color": color_rgb,
                "label": class_info['id'],
                "name": class_info['name']
            })

        output_json_data = {
            "shapes": shapes,
            "imageHeight": img_height,
            "imagePath": os.path.basename(picture_name),
            "imageWidth": img_width,
            "version": "4Dage_Furniture_Detection_0.0.1"
        }
        os.makedirs(os.path.dirname(output_image_path), exist_ok=True)
        os.makedirs(os.path.dirname(output_json_path), exist_ok=True)
        cv2.imwrite(output_image_path, img)
        with open(output_json_path, 'w', encoding='utf-8') as f:
            json.dump(output_json_data, f, ensure_ascii=False, indent=4)
        print(f"\nDone! 2D results saved to: {output_image_path} and {output_json_path}")
        return output_json_path, output_image_path
    except Exception as e:
        print(f"An error occurred: {e}")
        return None, None


# ==============================================================================
# Main function
# ==============================================================================
def visualize_point_cloud_segmentation(coords_file, preds_file, classes_to_show='all', classes_to_ignore=None,
                                        save_pcd_path=None, save_3d_json_path=None, if_save_ply=False,
                                        if_save_vision=False):
    """Cluster per-class semantic predictions into raw instances and optionally export them."""
    CLASS_NAMES = [
        'refrigerator', 'desk', 'curtain', 'sofa', 'bookshelf', 'bed', 'table', 'window', 'cabinet', 'door',
        'chair', 'floor', 'wall', 'sink', 'toilet', 'bathtub', 'shower curtain', 'picture', 'counter'
    ]
    COLOR_MAP = np.array([
        [174, 199, 232], [255, 127, 14], [44, 160, 44], [214, 39, 40], [148, 103, 189],
        [255, 187, 120], [140, 86, 75], [152, 223, 138], [23, 190, 207], [247, 182, 210],
        [196, 156, 148], [127, 127, 127], [199, 199, 199], [188, 189, 34], [219, 219, 141],
        [227, 119, 194], [31, 119, 180], [255, 152, 150], [82, 84, 163]
    ])
    try:
        coords = np.load(coords_file)
        predictions = np.load(preds_file)
    except FileNotFoundError as e:
        print(f"Error: file not found: {e.filename}.")
        return None
    if len(coords) != len(predictions):
        print("Warning: the number of points does not match the number of predicted labels!")
        return None

    default_ignore_classes = {'floor', 'wall', 'picture'}
    ignore_set = default_ignore_classes.union(set(classes_to_ignore or []))
    show_set = set(classes_to_show) if isinstance(classes_to_show, (list, set)) else None

    final_instances_data, all_instance_points, all_instance_colors = [], [], []
    print("\nSearching for raw instances with DBSCAN...")
    for pred_idx in np.unique(predictions):
        class_name = CLASS_NAMES[pred_idx]
        if class_name in ignore_set or (show_set and class_name not in show_set):
            continue
        dbscan_eps, dbscan_min_points = get_class_specific_dbscan_params(class_name)
        class_points_indices = np.where(predictions == pred_idx)[0]
        if len(class_points_indices) < dbscan_min_points:
            continue
        class_pcd_temp = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(coords[class_points_indices]))
        instance_labels = np.array(class_pcd_temp.cluster_dbscan(eps=dbscan_eps, min_points=dbscan_min_points,
                                                                 print_progress=False))
        unique_instances = np.unique(instance_labels[instance_labels != -1])
        if len(unique_instances) > 0:
            print(f"- Class '{class_name}': found {len(unique_instances)} raw candidate instances")
        for instance_id in unique_instances:
            instance_point_indices = np.where(instance_labels == instance_id)[0]
            if len(instance_point_indices) < dbscan_min_points / 2:
                continue
            instance_pcd = class_pcd_temp.select_by_index(instance_point_indices)
            try:
                aabb = instance_pcd.get_axis_aligned_bounding_box()
                points_np = np.asarray(instance_pcd.points)
                final_instances_data.append({
                    "label": class_name,
                    "color": COLOR_MAP[pred_idx].tolist(),
                    "corners": np.asarray(aabb.get_box_points()).tolist(),
                    "score": len(points_np)
                })
                all_instance_points.append(points_np)
                all_instance_colors.append(np.tile(COLOR_MAP[pred_idx] / 255.0, (len(points_np), 1)))
            except RuntimeError:
                continue
    print("\nAll raw instances processed.")

    if save_3d_json_path:
        os.makedirs(os.path.dirname(save_3d_json_path), exist_ok=True)
        with open(save_3d_json_path, 'w', encoding='utf-8') as f:
            json.dump(final_instances_data, f, ensure_ascii=False, indent=4)
        print(f"Raw 3D instance JSON saved to: {save_3d_json_path}")

    if if_save_ply and save_pcd_path and all_instance_points:
        instance_pcd = o3d.geometry.PointCloud()
        instance_pcd.points = o3d.utility.Vector3dVector(np.vstack(all_instance_points))
        instance_pcd.colors = o3d.utility.Vector3dVector(np.vstack(all_instance_colors))
        o3d.io.write_point_cloud(save_pcd_path, instance_pcd)
        print(f"Point cloud of all detected instances saved to: {save_pcd_path}")

    if if_save_vision:
        # Assign points/colors via attributes; the PointCloud constructor does not accept a 'colors' keyword.
        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(coords)
        pcd.colors = o3d.utility.Vector3dVector(COLOR_MAP[predictions] / 255.0)
        o3d.visualization.draw_geometries([pcd], window_name="Raw point cloud", width=1280, height=720)

    return save_3d_json_path
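

# ------------------------------------------------------------------
# Usage sketch on synthetic data (assumption: `_demo_visualize_on_synthetic_data`
# and the temporary paths are illustrative, not part of the pipeline). It writes
# a small coord.npy / pred.npy pair containing one dense 'chair' blob and runs
# the raw-instance extraction end to end.
# ------------------------------------------------------------------
def _demo_visualize_on_synthetic_data():
    import tempfile
    tmp_dir = tempfile.mkdtemp()
    coords_path = os.path.join(tmp_dir, 'coord.npy')
    preds_path = os.path.join(tmp_dir, 'pred.npy')
    # 2000 random points inside a 0.5 m cube: dense enough for the chair
    # DBSCAN parameters (eps=0.2, min_points=100) to form a single cluster.
    rng = np.random.default_rng(0)
    np.save(coords_path, rng.uniform(0.0, 0.5, size=(2000, 3)))
    np.save(preds_path, np.full(2000, 10, dtype=np.int64))  # class index 10 == 'chair'
    out_json = os.path.join(tmp_dir, 'instances3d_raw.json')
    return visualize_point_cloud_segmentation(coords_path, preds_path,
                                              save_3d_json_path=out_json)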


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Input a single scene folder used for saving the results",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument(
        '-i', '--input_folder',
        type=str,
        required=True,
        help='Path to the input scene folder.'
    )
    args = parser.parse_args()

    scene_folder = args.input_folder
    scene_name = os.path.basename(scene_folder)
    coords_file = os.path.join(scene_folder, 'scene/val/process_data/coord.npy')
    preds_file = os.path.join(scene_folder, "output/pred.npy")
    floor_plan_image = os.path.join(scene_folder, f"{scene_name}.png")
    scene_info_json = os.path.join(scene_folder, f"{scene_name}.json")

    output_dir = os.path.join(scene_folder, 'result_2d_filtered')
    os.makedirs(output_dir, exist_ok=True)
    raw_instances3d_json_path = os.path.join(output_dir, 'instances3d_raw.json')
    final_instances2d_json_path = os.path.join(output_dir, 'instances2d_final.json')
    instances_ply_path = os.path.join(output_dir, 'instances_raw.ply')
    segment_onfloor_png_path = os.path.join(output_dir, 'segment_onfloor_final.png')

    saved_3d_json = visualize_point_cloud_segmentation(
        coords_file=coords_file,
        preds_file=preds_file,
        classes_to_ignore=['curtain', 'bookshelf', 'floor', 'wall', 'sink', 'toilet', 'bathtub',
                           'shower curtain', 'picture'],
        save_3d_json_path=raw_instances3d_json_path,
        save_pcd_path=instances_ply_path,
        if_save_ply=False
    )

    if saved_3d_json and all(os.path.exists(f) for f in [floor_plan_image, scene_info_json]):
        print("\n--- Starting 2D projection and post-processing ---")
        process_and_draw_bboxes(
            picture_name=floor_plan_image,
            floor_path=scene_info_json,
            instances_path=saved_3d_json,
            floor_id=0,
            output_image_path=segment_onfloor_png_path,
            output_json_path=final_instances2d_json_path
        )
    else:
        print("\nSkipping 2D projection due to missing files.")
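
# ------------------------------------------------------------------
# Example invocation (the script filename and scene path are placeholders;
# the expected layout is inferred from the hard-coded paths above):
#   python this_script.py -i /path/to/scene_folder
# Required inputs inside the scene folder:
#   scene/val/process_data/coord.npy, output/pred.npy,
#   <scene_name>.png and <scene_name>.json
# Outputs are written to <scene_folder>/result_2d_filtered/.
# ------------------------------------------------------------------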