| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420 |
- import numpy as np
- import open3d as o3d
- import os
- import time
- import json
- import cv2
- from itertools import groupby
- import argparse
# ==============================================================================
# Helper functions for the optimized algorithm
# ==============================================================================
def get_class_specific_dbscan_params(class_name):
    """Return the DBSCAN hyper-parameters tuned for one semantic class.

    Args:
        class_name: Semantic label such as ``'bed'`` or ``'chair'``.

    Returns:
        An ``(eps, min_points)`` tuple. Labels without a dedicated entry
        receive the fallback pair ``(0.25, 150)``.
    """
    fallback = (0.25, 150)
    # Per-class (eps, min_points) overrides.
    tuned = {
        'bed': (0.23, 100),
        'sofa': (0.3, 300),
        'table': (0.3, 300),
        'desk': (0.3, 300),
        'bookshelf': (0.3, 300),
        'chair': (0.2, 100),
        'refrigerator': (0.25, 200),
        'cabinet': (0.3, 200),
        'door': (0.2, 100),
    }
    eps, min_points = tuned.get(class_name, fallback)
    return eps, min_points
# ==============================================================================
# 2D projection and drawing functions
# ==============================================================================
def is_box_dimension_plausible_2d(box_extent_2d, class_name):
    """Check whether a 2D box footprint has a plausible size for its class.

    The comparison is orientation-independent: the extent and both limit
    pairs are sorted, so the shorter side is checked against the shorter
    limits and the longer side against the longer limits.

    Args:
        box_extent_2d: The two side lengths of the box (the limits are
            expressed in metres).
        class_name: Semantic label of the box.

    Returns:
        ``True`` when both sides lie inside the class limits, or when the
        class has no limits registered; ``False`` otherwise.
    """
    # class -> ([min side a, min side b], [max side a, max side b])
    plausible_ranges_2d = {
        'bed': ([1.2, 0.7], [3.0, 2.8]),  # deliberately generous for beds
        'sofa': ([1.0, 0.7], [4.0, 1.8]),
        'table': ([0.5, 0.5], [3.0, 1.5]),
        'desk': ([0.8, 0.5], [2.5, 1.2]),
        'bookshelf': ([0.5, 0.2], [2.5, 0.8]),
        'chair': ([0.3, 0.3], [1.2, 1.2]),
        'refrigerator': ([0.5, 0.5], [1.2, 1.2]),
        'cabinet': ([0.4, 0.3], [3.0, 1.0]),
        'door': ([0.6, 0.05], [1.2, 0.3]),
        'window': ([0.4, 0.05], [3.0, 0.4]),
    }

    limits = plausible_ranges_2d.get(class_name)
    if limits is None:
        # Unconstrained class: accept any size.
        return True

    sides = sorted(box_extent_2d)
    lower = sorted(limits[0])
    upper = sorted(limits[1])
    return all(lower[axis] <= sides[axis] <= upper[axis] for axis in (0, 1))
def calculate_2d_iou(box1, box2):
    """Compute the intersection-over-union of two 2D instance boxes.

    Args:
        box1: Instance dict whose ``'bbox_2d_pixels'`` entry holds
            ``[min_x, min_y, max_x, max_y]``.
        box2: Second instance dict with the same layout.

    Returns:
        The IoU as a float; ``0.0`` when the boxes do not overlap or the
        union area is zero.
    """
    first = box1['bbox_2d_pixels']
    second = box2['bbox_2d_pixels']

    # Overlap along each axis, clamped at zero for disjoint boxes.
    overlap_w = max(0, min(first[2], second[2]) - max(first[0], second[0]))
    overlap_h = max(0, min(first[3], second[3]) - max(first[1], second[1]))
    overlap_area = overlap_w * overlap_h
    if overlap_area == 0:
        return 0.0

    first_area = (first[2] - first[0]) * (first[3] - first[1])
    second_area = (second[2] - second[0]) * (second[3] - second[1])
    union_area = float(first_area + second_area - overlap_area)
    if union_area == 0:
        return 0.0
    return overlap_area / union_area
def _merge_overlapping_instances(class_instances, merge_iou_threshold):
    """Fuse each connected cluster of overlapping boxes into one instance."""
    count = len(class_instances)

    # Adjacency lists: i and j are linked when their IoU exceeds the threshold.
    # Lists stay in ascending index order, which fixes the traversal order.
    neighbours = [[] for _ in range(count)]
    for i in range(count):
        for j in range(i + 1, count):
            if calculate_2d_iou(class_instances[i], class_instances[j]) > merge_iou_threshold:
                neighbours[i].append(j)
                neighbours[j].append(i)

    visited = [False] * count
    merged = []
    for start in range(count):
        if visited[start]:
            continue
        # Breadth-first traversal; `component` doubles as the queue and is
        # consumed through `head`, so no O(n) pop from the front is needed.
        visited[start] = True
        component = [start]
        head = 0
        while head < len(component):
            for nxt in neighbours[component[head]]:
                if not visited[nxt]:
                    visited[nxt] = True
                    component.append(nxt)
            head += 1

        members = [class_instances[k] for k in component]
        boxes = [member['bbox_2d_pixels'] for member in members]

        # Metadata comes from the highest-scoring member; the box is the
        # union of all member boxes and the score is the sum of all scores.
        fused = max(members, key=lambda member: member['score']).copy()
        fused['bbox_2d_pixels'] = [
            min(box[0] for box in boxes),
            min(box[1] for box in boxes),
            max(box[2] for box in boxes),
            max(box[3] for box in boxes),
        ]
        fused['score'] = sum(member['score'] for member in members)
        merged.append(fused)
    return merged


def _suppress_non_maximum(class_instances, class_name, iou_threshold):
    """Greedy NMS: keep the best-scoring box, drop boxes overlapping it."""
    ordered = sorted(class_instances, key=lambda inst: inst['score'], reverse=True)
    suppressed = [False] * len(ordered)
    kept = []
    for i, best in enumerate(ordered):
        if suppressed[i]:
            continue
        kept.append(best)
        for j in range(i + 1, len(ordered)):
            if suppressed[j]:
                continue
            iou = calculate_2d_iou(best, ordered[j])
            if iou < iou_threshold:
                continue
            suppressed[j] = True
            print(f" - 2D NMS: 抑制一个与更佳实例IoU为 {iou:.2f} 的 '{class_name}' 实例。")
    return kept


def post_process_in_2d(instances_with_pixel_boxes, x_m_per_px, y_m_per_px, iou_threshold=0.5,
                       merge_iou_threshold=0.05):
    """Filter implausibly sized instances, then de-duplicate them per class.

    Processing happens in pixel space in two stages:

    1. Size filter: each box is converted to metres with the given scale
       factors and checked with ``is_box_dimension_plausible_2d``.
    2. Per-class clean-up: ``'bed'`` boxes that overlap (IoU above
       ``merge_iou_threshold``) are merged into their union box; every
       other class goes through greedy non-maximum suppression.

    Args:
        instances_with_pixel_boxes: Iterable of instance dicts carrying at
            least ``'label'``, ``'score'`` and ``'bbox_2d_pixels'``
            (``[min_x, min_y, max_x, max_y]``).
        x_m_per_px: Metres per pixel along x.
        y_m_per_px: Metres per pixel along y.
        iou_threshold: NMS threshold; a box whose IoU with a kept box is
            not below this value is suppressed.
        merge_iou_threshold: IoU above which two bed boxes are considered
            part of the same bed and merged.

    Returns:
        A list of surviving instance dicts, grouped by label in sorted
        label order. Merged bed instances are shallow copies with an
        updated box and a summed score.
    """
    # 1. Size filter.
    plausible_instances = []
    for inst in instances_with_pixel_boxes:
        px_box = inst['bbox_2d_pixels']
        extent_2d = [
            (px_box[2] - px_box[0]) * x_m_per_px,
            (px_box[3] - px_box[1]) * y_m_per_px,
        ]
        if is_box_dimension_plausible_2d(extent_2d, inst['label']):
            plausible_instances.append(inst)
        else:
            print(f" - 过滤掉一个2D尺寸异常的 '{inst['label']}' 实例,尺寸: {[f'{x:.2f}' for x in extent_2d]}")

    if not plausible_instances:
        return []

    # 2. Per-class post-processing. groupby needs its input sorted by the key.
    final_instances = []
    plausible_instances.sort(key=lambda inst: inst['label'])
    for class_name, group in groupby(plausible_instances, key=lambda inst: inst['label']):
        class_instances = list(group)

        if class_name == 'bed':
            # Beds tend to be split into several fragments, so overlapping
            # candidates are merged rather than suppressed.
            merged_instances = _merge_overlapping_instances(class_instances, merge_iou_threshold)
            final_instances.extend(merged_instances)
            print(f" - 类别 'bed': Merged {len(class_instances)} candidates into {len(merged_instances)} final instances.")
        else:
            kept_instances = _suppress_non_maximum(class_instances, class_name, iou_threshold)
            final_instances.extend(kept_instances)
            print(f" - 类别 '{class_name}': 经过2D过滤和NMS后,剩余 {len(kept_instances)} 个有效实例。")

    return final_instances
def build_floor_transform_matrix(j_info: dict, floor_id: int):
    """Build the world-to-normalised-image transform for one floor.

    The forward matrix maps normalised image coordinates to world
    coordinates using the floor's ``bound`` box, with the y axis flipped
    (row 1 scales by ``y_min - y_max`` and offsets by ``y_max``). The
    inverse of that matrix is returned.

    Args:
        j_info: Parsed scene description containing a ``"floors"`` list.
        floor_id: ``"id"`` of the floor entry to use.

    Returns:
        ``(matrix, res_width, res_height)`` where ``matrix`` is a 3x3
        nested list. When the floor is not found, or it has no resolution
        width, the result is ``(identity, None, None)``.

    Raises:
        ValueError: If the floor's ``bound`` lacks one of ``x_min``,
            ``x_max``, ``y_min``, ``y_max``, or if the forward matrix is
            singular (zero-width or zero-height bound).
    """
    floor = next(
        (entry for entry in j_info.get("floors", []) if entry.get("id") == floor_id),
        None,
    )
    if floor is None:
        return np.identity(3).tolist(), None, None

    resolution = floor.get("resolution", {})
    res_width = resolution.get("width")
    res_height = resolution.get("height")

    bound = floor.get("bound", {})
    missing = [key for key in ("x_min", "x_max", "y_min", "y_max") if bound.get(key) is None]
    if missing:
        # Fail with a clear message instead of a TypeError on `None - None`.
        raise ValueError(f"楼层 {floor_id} 的 bound 缺少字段: {', '.join(missing)}")
    x_min, x_max = bound["x_min"], bound["x_max"]
    y_min, y_max = bound["y_min"], bound["y_max"]

    if res_width is None:
        return np.identity(3).tolist(), None, None

    tab_array = np.array(
        [
            [x_max - x_min, 0.0, x_min],
            [0.0, y_min - y_max, y_max],
            [0.0, 0.0, 1.0],
        ],
        dtype=np.float64,
    )
    if np.linalg.det(tab_array) == 0:
        raise ValueError("矩阵是奇异的,无法求逆。")
    return np.linalg.inv(tab_array).tolist(), res_width, res_height
def process_and_draw_bboxes(picture_name, floor_path, instances_path, floor_id, output_image_path, output_json_path):
    """Project 3D instance boxes onto the floor plan, filter, draw and save.

    Steps: load the floor-plan image and both JSON files, convert each
    instance's corners to pixel coordinates with the floor transform, run
    ``post_process_in_2d``, draw the surviving boxes with their labels, and
    write the annotated image plus a JSON summary.

    Args:
        picture_name: Path of the floor-plan image used as background.
        floor_path: Path of the scene JSON holding the ``"floors"`` list.
        instances_path: Path of the JSON list of instances; each item needs
            ``"corners"``, ``"label"``, ``"color"`` and ``"score"``.
        floor_id: ``"id"`` of the floor to project onto.
        output_image_path: Where the annotated image is written.
        output_json_path: Where the 2D instance list is written.

    Returns:
        ``(output_json_path, output_image_path)`` on success, or
        ``(None, None)`` if any step fails. Failures are reported with a
        printed message rather than propagated.
    """
    try:
        img = cv2.imread(picture_name)
        if img is None:
            raise FileNotFoundError(f"无法加载背景图片: {picture_name}")

        with open(floor_path, 'r', encoding='utf-8') as f:
            j_info = json.load(f)
        with open(instances_path, 'r', encoding='utf-8') as f:
            raw_bbox_data = json.load(f)

        matrix, res_w, res_h = build_floor_transform_matrix(j_info, floor_id)
        if res_w is None:
            raise ValueError(f"未在 {floor_path} 中找到 ID 为 {floor_id} 的楼层信息。")
        M = np.array(matrix, dtype=np.float64)

        # Metric size of one pixel, needed by the size plausibility filter.
        floor_info = next((f for f in j_info.get("floors", []) if f.get("id") == floor_id), None)
        bound = floor_info.get("bound", {})
        x_m_per_px = (bound.get("x_max") - bound.get("x_min")) / res_w
        y_m_per_px = abs(bound.get("y_max") - bound.get("y_min")) / res_h

        instances_with_pixel_boxes = []
        for item in raw_bbox_data:
            corners = item.get("corners", [])
            if len(corners) < 4:
                continue

            # NOTE(review): only the first four corners are projected;
            # presumably that spans the full x/y extent for the boxes
            # produced upstream. Confirm for other corner orderings.
            x_coords, y_coords = [], []
            for corner in corners[:4]:
                norm_pt = M @ np.array([corner[0], corner[1], 1.0])
                x_coords.append(int(norm_pt[0] * res_w))
                y_coords.append(int(norm_pt[1] * res_h))

            new_item = item.copy()
            new_item['bbox_2d_pixels'] = [min(x_coords), min(y_coords), max(x_coords), max(y_coords)]
            instances_with_pixel_boxes.append(new_item)

        print("\n开始在2D空间进行后处理...")
        filtered_bbox_data = post_process_in_2d(instances_with_pixel_boxes, x_m_per_px, y_m_per_px)
        print("2D后处理完成。")

        font = cv2.FONT_HERSHEY_SIMPLEX
        instances_2d_data = []
        for item in filtered_bbox_data:
            min_x, min_y, max_x, max_y = item['bbox_2d_pixels']
            label = item["label"]
            # Stored colours are RGB; OpenCV draws in BGR.
            color_bgr = (item["color"][2], item["color"][1], item["color"][0])
            instances_2d_data.append({"label": label, "color": item["color"], "bbox_2d": item['bbox_2d_pixels']})

            cv2.rectangle(img, (min_x, min_y), (max_x, max_y), color_bgr, 2)
            (text_w, text_h), _ = cv2.getTextSize(label, font, 0.5, 1)
            # Put the label above the box unless it would leave the image.
            label_y = min_y - 10 if min_y - 10 > text_h else min_y + text_h + 10
            cv2.rectangle(img, (min_x, label_y - text_h - 5), (min_x + text_w, label_y + 5), color_bgr, -1)
            cv2.putText(img, label, (min_x, label_y), font, 0.5, (255, 255, 255), 1, cv2.LINE_AA)

        for out_path in (output_image_path, output_json_path):
            out_dir = os.path.dirname(out_path)
            # A bare file name has no directory part; makedirs('') would raise.
            if out_dir:
                os.makedirs(out_dir, exist_ok=True)

        # imwrite signals failure through its return value, not an exception.
        if not cv2.imwrite(output_image_path, img):
            raise IOError(f"无法写入图片: {output_image_path}")
        with open(output_json_path, 'w', encoding='utf-8') as f:
            json.dump(instances_2d_data, f, indent=4)

        print(f"\n处理完成!2D结果已保存到: {output_image_path} 和 {output_json_path}")
        return output_json_path, output_image_path
    except Exception as e:
        # Deliberate best-effort boundary: report and signal failure to the caller.
        print(f"发生错误: {e}")
        return None, None
# ==============================================================================
# Main function
# ==============================================================================
def visualize_point_cloud_segmentation(coords_file, preds_file, classes_to_show='all',
                                       classes_to_ignore=None,
                                       save_pcd_path=None,
                                       save_3d_json_path=None,
                                       if_save_ply=False,
                                       if_save_vision=False):
    """Cluster a semantically labelled point cloud into object instances.

    For every predicted class that is not ignored, the class points are
    clustered with DBSCAN (class-specific parameters), and each cluster
    that is large enough is stored with its axis-aligned bounding-box
    corners and its point count as score.

    Args:
        coords_file: ``.npy`` file with the point coordinates.
        preds_file: ``.npy`` file with one class index per point.
        classes_to_show: A list/set of class names to restrict processing
            to; any other value (such as the default ``'all'``) disables
            the restriction.
        classes_to_ignore: Extra class names to skip, in addition to
            ``floor``, ``wall`` and ``picture`` which are always skipped.
        save_pcd_path: Destination for the coloured instance point cloud
            (only used when ``if_save_ply`` is true).
        save_3d_json_path: Destination for the raw instance JSON list.
        if_save_ply: Write the instance point cloud when true.
        if_save_vision: Open an interactive viewer of the full labelled
            cloud when true.

    Returns:
        ``save_3d_json_path`` as passed in, or ``None`` when an input file
        is missing or the two arrays differ in length.
    """
    CLASS_NAMES = [
        'refrigerator', 'desk', 'curtain', 'sofa', 'bookshelf', 'bed',
        'table', 'window', 'cabinet', 'door', 'chair', 'floor', 'wall',
        'sink', 'toilet', 'bathtub', 'shower curtain', 'picture', 'counter'
    ]
    COLOR_MAP = np.array([
        [174, 199, 232], [255, 127, 14], [44, 160, 44], [214, 39, 40],
        [148, 103, 189], [255, 187, 120], [140, 86, 75], [152, 223, 138],
        [23, 190, 207], [247, 182, 210], [196, 156, 148], [127, 127, 127],
        [199, 199, 199], [188, 189, 34], [219, 219, 141], [227, 119, 194],
        [31, 119, 180], [255, 152, 150], [82, 84, 163]
    ])

    try:
        coords = np.load(coords_file)
        predictions = np.load(preds_file)
    except FileNotFoundError as e:
        print(f"错误: 找不到文件 {e.filename}。")
        return None

    if len(coords) != len(predictions):
        print("警告: 坐标点数和预测标签数不匹配!")
        return None

    default_ignore_classes = {'floor', 'wall', 'picture'}
    ignore_set = default_ignore_classes.union(set(classes_to_ignore or []))
    show_set = set(classes_to_show) if isinstance(classes_to_show, (list, set)) else None

    final_instances_data, all_instance_points, all_instance_colors = [], [], []

    print(f"\n通过DBSCAN寻找原始实例...")
    for pred_idx in np.unique(predictions):
        label_idx = int(pred_idx)
        if not 0 <= label_idx < len(CLASS_NAMES):
            # A negative index would silently wrap around to another class
            # and a too-large one would raise; skip such labels instead.
            print(f"警告: 跳过未知的类别索引 {pred_idx}。")
            continue

        class_name = CLASS_NAMES[label_idx]
        if class_name in ignore_set or (show_set and class_name not in show_set):
            continue

        dbscan_eps, dbscan_min_points = get_class_specific_dbscan_params(class_name)
        class_points_indices = np.where(predictions == pred_idx)[0]
        if len(class_points_indices) < dbscan_min_points:
            continue

        class_pcd_temp = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(coords[class_points_indices]))
        instance_labels = np.array(class_pcd_temp.cluster_dbscan(
            eps=dbscan_eps, min_points=dbscan_min_points, print_progress=False))

        # Label -1 marks DBSCAN noise.
        unique_instances = np.unique(instance_labels[instance_labels != -1])
        if len(unique_instances) > 0:
            print(f"- 类别 '{class_name}': 找到 {len(unique_instances)} 个原始候选实例")

        class_color = COLOR_MAP[label_idx]
        for instance_id in unique_instances:
            instance_point_indices = np.where(instance_labels == instance_id)[0]
            if len(instance_point_indices) < dbscan_min_points / 2:
                continue

            instance_pcd = class_pcd_temp.select_by_index(instance_point_indices)
            try:
                aabb = instance_pcd.get_axis_aligned_bounding_box()
                points_np = np.asarray(instance_pcd.points)
                final_instances_data.append({
                    "label": class_name, "color": class_color.tolist(),
                    "corners": np.asarray(aabb.get_box_points()).tolist(), "score": len(points_np)
                })
                all_instance_points.append(points_np)
                all_instance_colors.append(np.tile(class_color / 255.0, (len(points_np), 1)))
            except RuntimeError:
                # Skip clusters for which the bounding box cannot be built.
                continue

    print("\n所有原始实例处理完毕。")

    if save_3d_json_path:
        json_dir = os.path.dirname(save_3d_json_path)
        # A bare file name has no directory part; makedirs('') would raise.
        if json_dir:
            os.makedirs(json_dir, exist_ok=True)
        with open(save_3d_json_path, 'w', encoding='utf-8') as f:
            json.dump(final_instances_data, f, ensure_ascii=False, indent=4)
        print(f"原始3D实例JSON信息已保存至: {save_3d_json_path}")

    if if_save_ply and save_pcd_path and all_instance_points:
        instance_pcd = o3d.geometry.PointCloud()
        instance_pcd.points = o3d.utility.Vector3dVector(np.vstack(all_instance_points))
        instance_pcd.colors = o3d.utility.Vector3dVector(np.vstack(all_instance_colors))
        o3d.io.write_point_cloud(save_pcd_path, instance_pcd)
        print(f"所有检测到的实例点云已保存至: {save_pcd_path}")

    if if_save_vision:
        # The PointCloud constructor only accepts points; colours must be
        # assigned through the attribute.
        pcd = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(coords))
        pcd.colors = o3d.utility.Vector3dVector(COLOR_MAP[predictions] / 255.0)
        o3d.visualization.draw_geometries([pcd], window_name="原始点云", width=1280, height=720)

    return save_3d_json_path
def resolve_scene_paths(scene_folder):
    """Derive every input and output path used by the script for one scene.

    The scene name is the last component of ``scene_folder``. It is taken
    from the absolute path so that a trailing separator (``scene01/``) or a
    relative folder such as ``.`` still yields the directory name instead
    of an empty string.

    Args:
        scene_folder: Path of the scene directory.

    Returns:
        A dict of path strings keyed by role.
    """
    scene_name = os.path.basename(os.path.abspath(scene_folder))
    output_dir = os.path.join(scene_folder, 'result_2d_filtered')
    return {
        "coords_file": os.path.join(scene_folder, 'scene/val/process_data/coord.npy'),
        "preds_file": os.path.join(scene_folder, "output/pred.npy"),
        "floor_plan_image": os.path.join(scene_folder, f"{scene_name}.png"),
        "scene_info_json": os.path.join(scene_folder, f"{scene_name}.json"),
        "output_dir": output_dir,
        "raw_instances3d_json": os.path.join(output_dir, 'instances3d_raw.json'),
        "final_instances2d_json": os.path.join(output_dir, 'instances2d_final.json'),
        "instances_ply": os.path.join(output_dir, 'instances_raw.ply'),
        "segment_onfloor_png": os.path.join(output_dir, 'segment_onfloor_final.png'),
    }


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="输入单个场景文件夹用于结果保存",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument(
        '-i',
        '--input_folder',
        type=str,
        required=True,
        help='指定输入场景的文件夹路径。'
    )
    args = parser.parse_args()

    paths = resolve_scene_paths(args.input_folder)
    os.makedirs(paths["output_dir"], exist_ok=True)

    # Stage 1: cluster the labelled point cloud into raw 3D instances.
    saved_3d_json = visualize_point_cloud_segmentation(
        coords_file=paths["coords_file"], preds_file=paths["preds_file"],
        classes_to_ignore=['curtain', 'bookshelf', 'floor', 'wall', 'sink', 'toilet', 'bathtub', 'shower curtain', 'picture'],
        save_3d_json_path=paths["raw_instances3d_json"], save_pcd_path=paths["instances_ply"], if_save_ply=False
    )

    # Stage 2: project onto the floor plan, only when stage 1 succeeded and
    # both the floor-plan image and the scene description exist.
    required_inputs = [paths["floor_plan_image"], paths["scene_info_json"]]
    if saved_3d_json and all(os.path.exists(f) for f in required_inputs):
        print("\n--- 开始进行2D投影和后处理 ---")
        process_and_draw_bboxes(
            picture_name=paths["floor_plan_image"], floor_path=paths["scene_info_json"],
            instances_path=saved_3d_json, floor_id=0,
            output_image_path=paths["segment_onfloor_png"], output_json_path=paths["final_instances2d_json"]
        )
    else:
        print("\nSkipping 2D projection due to missing files.")
|