Coverage for bim2sim/tasks/hvac/fixports.py: 24%

102 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-12 17:09 +0000

import json
from itertools import combinations, permutations
from pathlib import Path

import ifcopenshell
import numpy as np
from ifcopenshell.entity_instance import entity_instance

import bim2sim
from bim2sim.sim_settings import BaseSimSettings
from bim2sim.tasks.base import ITask

12 

13 

class FixPorts(ITask):
    """Remove invalid ports from ifc.

    Three heuristics collect ``IfcDistributionPort`` candidates:

    * unconnected ports that share a position with more than one other
      port,
    * unconnected ports that share a position with another port of the
      same parent element,
    * unconnected ports of elements that should only have two ports.

    The GlobalIds of all candidates are written to
    ``./port_blacklist.json``.
    """

    # NOTE(review): presumably evaluated by the ITask framework to wire task
    # inputs/outputs - confirm against bim2sim.tasks.base.
    reads = ('ifc',)
    touches = ('ifc',)

    def run(self, ifc: ifcopenshell.file) -> tuple:
        """Collect invalid ports and write their GlobalIds to a blacklist.

        Args:
            ifc: the IFC file that is inspected

        Returns:
            A one-element tuple holding the IFC file. Currently never
            reached, see Raises.

        Raises:
            NotImplementedError: always, right after the blacklist file has
                been written. The actual removal below the ``raise`` is
                intentionally disabled.
        """
        self.logger.info("Removing invalid ports from ifc")

        to_remove = set()
        to_remove.update(self.unconnected_ports_on_same_position(ifc))
        to_remove.update(self.ports_with_same_parent_and_same_position(ifc))
        to_remove.update(self.entities_with_unusual_number_of_ports(ifc))

        self.logger.debug("Ports selected for removal: %s", to_remove)
        self.logger.info("Removing %d ports ...", len(to_remove))
        with open('./port_blacklist.json', 'w', encoding='utf-8') as file:
            # Sorted so that repeated runs produce an identical file; the
            # iteration order of a set is not stable.
            json.dump(sorted(entity.GlobalId for entity in to_remove), file)
        raise NotImplementedError("This tasks is only a temporary fix.")
        # NOTE(review): everything below is unreachable on purpose until the
        # removal works reliably; presumably blocked by the issue linked
        # below - confirm before re-enabling.
        for entity in to_remove:
            # this fails on ifcopenshell 0.6
            # https://github.com/IfcOpenShell/IfcOpenShell/issues/275
            ifc.remove(entity)
        path = './cleaned.ifc'
        ifc.write(path)
        return ifc,

    def unconnected_ports_on_same_position(self, ifc: ifcopenshell.file) -> set:
        """Analyse IFC file for unconnected ports on the same position.

        Positions are compared after rounding to three decimals. Only
        positions shared by more than two ports are considered.

        Args:
            ifc: the IFC file that is inspected

        Returns:
            to_remove: set of ports to be removed
        """
        positions = {}
        for port in ifc.by_type('IfcDistributionPort'):
            key = tuple(np.round(self._get_position(port), 3))
            positions.setdefault(key, []).append(port)

        to_remove = set()
        for ports in positions.values():
            if len(ports) <= 2:
                continue
            # get unconnected from ports on same position
            to_remove.update(
                port for port in ports if not self._is_connected(port))
        return to_remove

    def ports_with_same_parent_and_same_position(self, ifc: ifcopenshell.file) -> set:
        """Find unconnected ports duplicating another port of their parent.

        For every pair of ports of the same parent element that lie on the
        same position (``np.allclose``), exactly one unconnected port of the
        pair is selected, so the other port of the pair is kept.

        Args:
            ifc: the IFC file that is inspected

        Returns:
            to_remove: set of ports to be removed
        """
        parents = {}
        for port in self.get_unconnected_ports(ifc):
            parents.setdefault(
                port.ContainedIn[0].RelatedElement, []).append(port)

        to_remove = set()
        for parent, unconnected in parents.items():
            ports = [rel.RelatingPort for rel in parent.HasPorts]
            if len(ports) < 2:
                continue
            # Compute every position once instead of once per pair.
            located = [(port, self._get_position(port)) for port in ports]
            # combinations() visits each unordered pair once. Visiting both
            # orders would select both ports of a fully unconnected pair.
            for (port1, pos1), (port2, pos2) in combinations(located, 2):
                if not np.allclose(pos1, pos2):
                    continue
                # remove only one of them
                if port1 in unconnected:
                    to_remove.add(port1)
                elif port2 in unconnected:
                    to_remove.add(port2)
        return to_remove

    def entities_with_unusual_number_of_ports(self, ifc: ifcopenshell.file) -> set:
        """Find unconnected ports on elements with too many ports.

        Elements of the types listed in ``two_port_elements`` are inspected;
        if one has more than two ports, its unconnected ports are selected.

        Args:
            ifc: the IFC file that is inspected

        Returns:
            to_remove: set of ports to be removed
        """
        to_remove = set()
        two_port_elements = {'IfcPipeSegment', }
        for item in two_port_elements:
            for entity in ifc.by_type(item):
                relations = entity.HasPorts
                if len(relations) <= 2:
                    continue
                for rel in relations:
                    # HasPorts yields relations; the port itself is the
                    # RelatingPort (same access as in
                    # ports_with_same_parent_and_same_position).
                    port = rel.RelatingPort
                    if not self._is_connected(port):
                        to_remove.add(port)
        return to_remove

    def get_unconnected_ports(self, ifc: ifcopenshell.file) -> list:
        """Return all IfcDistributionPort entities that are not connected.

        Args:
            ifc: the IFC file that is inspected

        Returns:
            list of ports for which ``_is_connected`` is False
        """
        return [port for port in ifc.by_type('IfcDistributionPort')
                if not self._is_connected(port)]

    @staticmethod
    def _is_connected(port) -> bool:
        """Tell whether the port takes part in any port connection.

        Args:
            port: entity providing ``ConnectedTo`` and ``ConnectedFrom``

        Returns:
            True if ``ConnectedTo`` or ``ConnectedFrom`` is non-empty.
        """
        return bool(port.ConnectedTo or port.ConnectedFrom)

    @staticmethod
    def _get_position(entity: entity_instance) -> np.ndarray:
        """Compute the position of a port from its parent's placement.

        The port's relative coordinates are rotated by the axes of the
        parent's relative placement and added to the parent's position as
        returned by ``get_product_position``.

        Args:
            entity: port whose first ``ContainedIn`` relation points to the
                parent element

        Returns:
            coordinates of the port as numpy array
        """
        port_coordinates_relative = np.array(
            entity.ObjectPlacement.RelativePlacement.Location.Coordinates)

        parent = entity.ContainedIn[0].RelatedElement
        try:
            relative_placement = parent.ObjectPlacement.RelativePlacement
            x_direction = np.array(
                relative_placement.RefDirection.DirectionRatios)
            z_direction = np.array(relative_placement.Axis.DirectionRatios)
        except AttributeError:
            # Missing placement, RefDirection or Axis: fall back to the
            # unrotated default axes.
            x_direction = np.array([1, 0, 0])
            z_direction = np.array([0, 0, 1])
        # NOTE(review): direction ratios are used as given, without
        # normalisation - confirm the input always holds unit vectors.
        y_direction = np.cross(z_direction, x_direction)
        # Columns of the matrix are the parent's local x, y and z axes.
        directions = np.array((x_direction, y_direction, z_direction)).T

        coordinates = (FixPorts.get_product_position(parent)
                       + np.matmul(directions, port_coordinates_relative))
        return coordinates

    @staticmethod
    def get_product_position(entity: entity_instance):
        """Sum up the placement locations of an entity and its ancestors.

        Adds the ``Location`` coordinates of the entity's relative placement
        and of every placement reached by following ``PlacementRelTo``.
        Rotations of the ancestor placements are not applied.

        Args:
            entity: entity whose position is requested

        Returns:
            numpy array with the summed coordinates, or None if the entity
            has no ``ObjectPlacement`` attribute.
        """
        if not hasattr(entity, 'ObjectPlacement'):
            return None

        placement = entity.ObjectPlacement
        # Explicit float dtype keeps the in-place addition below valid
        # regardless of the numeric type of the first coordinate tuple.
        absolute = np.array(
            placement.RelativePlacement.Location.Coordinates, dtype=float)
        placement_rel = placement.PlacementRelTo
        while placement_rel is not None:
            absolute += np.array(
                placement_rel.RelativePlacement.Location.Coordinates)
            placement_rel = placement_rel.PlacementRelTo
        return absolute

140 

141 

if __name__ == '__main__':
    # Manual entry point: runs the task on a test resource of the repository.
    folder = (Path(bim2sim.__file__).parent.parent
              / 'test/resources/hydraulic/ifc')
    path = folder / 'B03_Heating.ifc'
    ifc = ifcopenshell.open(path)
    # run() takes the IFC file as its only argument besides self.
    # NOTE(review): the constructor signature of ITask is not visible here;
    # confirm that FixPorts() can be created without arguments.
    FixPorts().run(ifc)