Coverage for bim2sim/tasks/hvac/fixports.py: 24%
102 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-12 17:09 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-12 17:09 +0000
1import json
2from itertools import permutations
3from pathlib import Path
5import ifcopenshell
6import numpy as np
7from ifcopenshell.entity_instance import entity_instance
9import bim2sim
10from bim2sim.tasks.base import ITask
11from bim2sim.sim_settings import BaseSimSettings
class FixPorts(ITask):
    """Detect invalid distribution ports in an IFC file.

    Three heuristics collect suspicious ``IfcDistributionPort`` entities:
    unconnected ports stacked on one position, duplicate ports of the same
    parent element, and surplus ports on elements that should have two.
    """

    reads = ('ifc',)
    touches = ('ifc',)

    def run(self, ifc: ifcopenshell.file) -> tuple:
        """Collect invalid ports and write their GlobalIds to a blacklist.

        Args:
            ifc: the IFC file that is inspected

        Returns:
            Nothing at present; the task always raises after writing
            ``./port_blacklist.json``.

        Raises:
            NotImplementedError: always, because removing the ports from the
                IFC file is disabled (see the comment below).
        """
        self.logger.info("Removing invalid ports from ifc")

        to_remove = set()
        to_remove.update(self.unconnected_ports_on_same_position(ifc))
        to_remove.update(self.ports_with_same_parent_and_same_position(ifc))
        to_remove.update(self.entities_with_unusual_number_of_ports(ifc))

        self.logger.debug("Ports to remove: %s", to_remove)
        self.logger.info("Removing %d ports ...", len(to_remove))
        with open('./port_blacklist.json', 'w') as blacklist_file:
            # sorted so that the blacklist is reproducible between runs
            json.dump(sorted(entity.GlobalId for entity in to_remove),
                      blacklist_file)

        # The actual removal below is intentionally disabled: only the
        # blacklist written above is produced.
        raise NotImplementedError("This tasks is only a temporary fix.")
        for entity in to_remove:
            # this fails on ifcopenshell 0.6
            # https://github.com/IfcOpenShell/IfcOpenShell/issues/275
            ifc.remove(entity)
        path = './cleaned.ifc'
        ifc.write(path)
        return ifc,

    def unconnected_ports_on_same_position(
            self, ifc: ifcopenshell.file) -> set:
        """Analyse IFC file for unconnected ports on the same position.

        Ports are grouped by their position rounded to three decimals. From
        every group with more than two ports the unconnected ones are
        reported.

        Args:
            ifc: the IFC file that is inspected

        Returns:
            to_remove: set of ports to be removed
        """
        positions = {}
        for port in ifc.by_type('IfcDistributionPort'):
            key = tuple(np.round(self._get_position(port), 3))
            positions.setdefault(key, []).append(port)

        return {
            port
            for ports in positions.values()
            if len(ports) > 2
            for port in ports
            if not self._is_connected(port)
        }

    def ports_with_same_parent_and_same_position(
            self, ifc: ifcopenshell.file) -> set:
        """Find duplicate ports of one element that share a position.

        Only elements owning at least one unconnected port are inspected.
        For each pair of ports of such an element that lie on the same
        position, one unconnected port of the pair is reported.

        Args:
            ifc: the IFC file that is inspected

        Returns:
            to_remove: set of ports to be removed
        """
        from itertools import combinations

        parents = {}
        for port in self.get_unconnected_ports(ifc):
            parent = port.ContainedIn[0].RelatedElement
            parents.setdefault(parent, []).append(port)

        to_remove = set()
        for parent, unconnected in parents.items():
            ports = [rel.RelatingPort for rel in parent.HasPorts]
            # compute every position once instead of once per pair
            located = [(port, self._get_position(port)) for port in ports]
            # combinations visits each unordered pair exactly once, so a
            # pair of two unconnected ports loses only one of them
            for (port1, pos1), (port2, pos2) in combinations(located, 2):
                if not np.allclose(pos1, pos2):
                    continue
                candidates = [port for port in (port1, port2)
                              if port in unconnected]
                # remove only one of them
                if candidates:
                    to_remove.add(candidates[0])
        return to_remove

    def entities_with_unusual_number_of_ports(
            self, ifc: ifcopenshell.file) -> set:
        """Find unconnected ports on elements with more than two ports.

        Args:
            ifc: the IFC file that is inspected

        Returns:
            to_remove: set of ports to be removed
        """
        two_port_elements = ('IfcPipeSegment',)
        to_remove = set()
        for ifc_type in two_port_elements:
            for entity in ifc.by_type(ifc_type):
                # HasPorts holds the port-to-element relations; the port
                # itself is the relation's RelatingPort
                ports = [rel.RelatingPort for rel in entity.HasPorts]
                if len(ports) > 2:
                    to_remove.update(
                        port for port in ports
                        if not self._is_connected(port))
        return to_remove

    def get_unconnected_ports(self, ifc: ifcopenshell.file) -> list:
        """Return all distribution ports that have no connection.

        Args:
            ifc: the IFC file that is inspected

        Returns:
            list of ports with neither ConnectedTo nor ConnectedFrom set
        """
        return [port for port in ifc.by_type('IfcDistributionPort')
                if not self._is_connected(port)]

    @staticmethod
    def _is_connected(port) -> bool:
        """Return True if the port has a ConnectedTo or ConnectedFrom entry."""
        return bool(port.ConnectedTo or port.ConnectedFrom)

    @staticmethod
    def _get_position(entity: entity_instance) -> np.ndarray:
        """Calculate the position of a port from its parent's placement.

        The port's relative coordinates are transformed with the axes of the
        parent's relative placement and added to the parent's position.

        Args:
            entity: the port whose position is calculated

        Returns:
            coordinates of the port as numpy array
        """
        port_coordinates_relative = np.array(
            entity.ObjectPlacement.RelativePlacement.Location.Coordinates)

        parent = entity.ContainedIn[0].RelatedElement
        try:
            relative_placement = parent.ObjectPlacement.RelativePlacement
            x_direction = np.array(
                relative_placement.RefDirection.DirectionRatios)
            z_direction = np.array(relative_placement.Axis.DirectionRatios)
        except AttributeError:
            # no explicit axes given: fall back to the default orientation
            x_direction = np.array([1, 0, 0])
            z_direction = np.array([0, 0, 1])
        y_direction = np.cross(z_direction, x_direction)
        # the axes become the columns of the transformation matrix
        directions = np.array((x_direction, y_direction, z_direction)).T

        coordinates = FixPorts.get_product_position(parent) + np.matmul(
            directions, port_coordinates_relative)
        return coordinates

    @staticmethod
    def get_product_position(entity: entity_instance):
        """Sum the placement locations of an entity and its ancestors.

        The Location coordinates along the PlacementRelTo chain are added
        up; the orientations of the placements are not applied.

        Args:
            entity: the product whose position is calculated

        Returns:
            position as numpy array, or None if the entity has no
            ObjectPlacement
        """
        placement = getattr(entity, 'ObjectPlacement', None)
        if placement is None:
            return None

        # dtype=float keeps the in-place additions below valid even when the
        # first coordinates happen to be integers
        absolute = np.array(
            placement.RelativePlacement.Location.Coordinates, dtype=float)
        placement_rel = placement.PlacementRelTo
        while placement_rel is not None:
            absolute += np.array(
                placement_rel.RelativePlacement.Location.Coordinates)
            placement_rel = placement_rel.PlacementRelTo
        return absolute
if __name__ == '__main__':
    # Manual run against the hydraulic example file from the test resources.
    folder = Path(bim2sim.__file__).parent.parent / \
        'test/resources/hydraulic/ifc'
    path = folder / 'B03_Heating.ifc'
    ifc = ifcopenshell.open(str(path))
    # run() takes the IFC file as its only argument.
    # NOTE(review): ITask's constructor is not visible here - confirm that
    # FixPorts() can be created without arguments.
    FixPorts().run(ifc)