Coverage for bim2sim/tasks/hvac/connect_elements.py: 80%

156 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-12 17:09 +0000

1import itertools 

2import logging 

3from typing import Tuple, Generator, Iterable 

4 

5import networkx as nx 

6import numpy as np 

7 

8from bim2sim.elements import hvac_elements as hvac 

9from bim2sim.elements.base_elements import Port, ProductBased 

10from bim2sim.kernel.decision import DecisionBunch 

11from bim2sim.tasks.base import ITask, Playground 

12 

13 

14quality_logger = logging.getLogger('bim2sim.QualityReport') 

15 

16 

17class ConnectElements(ITask): 

18 """Analyses IFC relations and connects the elements via their ports. 

19 Elements are stored in the elements dict with GUID as key.""" 

20 

21 reads = ('elements',) 

22 touches = ('elements',) 

23 

def __init__(self, playground: Playground):
    """Initialise the task with an empty element store.

    Args:
        playground: playground the task runs in; handed on unchanged to
            the ITask base class.
    """
    super().__init__(playground)
    # Starts out empty; nothing is registered at construction time.
    self.elements = {}

28 

def run(self, elements: dict) \
        -> "Generator[DecisionBunch, None, Tuple[dict]]":
    """Connect elements based on port information and geometric relations.

    This method is a generator. It performs the following steps:
    1. Checks the ports of elements.
    2. Connects the relevant elements based on relations.
    3. Checks the positions of connections and connects ports based on
       geometric distance (only if the sim_setting
       'verify_connection_by_position' is enabled).
    4. Connects remaining unconnected ports by position.
    5. Logs information about the number of connected and unconnected
       ports.
    6. Yields the decisions required to set inner connections.

    Args:
        elements: dictionary of elements with GUID as key.

    Yields:
        The decision bunches produced by check_inner_connections.

    Returns:
        One-element tuple holding the elements dictionary (GUID as key)
        with updated connections. It is delivered as the generator's
        return value.
    """
    self.logger.info("Connect elements")

    # Check ports
    self.logger.info("Checking ports of elements ...")
    self.check_element_ports(elements)

    # Make connections by relations
    self.logger.info("Connecting the relevant elements")
    self.logger.info(" - Connecting by relations ...")
    all_ports = [port for item in elements.values() for port in item.ports]
    rel_connections = self.connections_by_relation(all_ports)
    self.logger.info(" - Found %d potential connections.",
                     len(rel_connections))

    # Check connections
    sim_settings = self.playground.sim_settings
    pos_connect_tol = sim_settings.tolerance_connect_by_position
    if sim_settings.verify_connection_by_position:
        self.logger.info(" - Checking positions of connections via "
                         "geometric distance ...")
        confirmed, unconfirmed, rejected = \
            self.confirm_connections_position(
                rel_connections, eps=pos_connect_tol)
    else:
        # The check is switched by 'verify_connection_by_position';
        # 'tolerance_connect_by_position' only provides the tolerance.
        self.logger.info(
            "Geometric distance check for port connections is deactivated"
            " by sim_setting 'verify_connection_by_position'. All found"
            " ports are confirmed without check.")
        confirmed = rel_connections
        rejected = []
        unconfirmed = []
    self.logger.info(
        " - %d connections are confirmed and %d rejected. %d can't be "
        "confirmed.",
        len(confirmed), len(rejected), len(unconfirmed))
    for port1, port2 in confirmed + unconfirmed:
        # Unconfirmed ports have no position data and can not be
        # connected by position, so they are connected here as well.
        port1.connect(port2)

    # Connect unconnected ports by position. The generator is evaluated
    # lazily, i.e. after the relation based connections above were made.
    unconnected_ports = (port for port in all_ports if
                         not port.is_connected())
    self.logger.info(" - Connecting remaining ports by position ...")

    pos_connections = self.connections_by_position(
        unconnected_ports, eps=pos_connect_tol)
    self.logger.info(" - Found %d additional connections.",
                     len(pos_connections))
    for port1, port2 in pos_connections:
        port1.connect(port2)

    # Get number of connected and unconnected ports
    nr_total = len(all_ports)
    unconnected = [port for port in all_ports if not port.is_connected()]
    nr_unconnected = len(unconnected)
    nr_connected = nr_total - nr_unconnected
    self.logger.info("In total %d of %d ports are connected.", nr_connected,
                     nr_total)
    if nr_total > nr_connected:
        self.logger.warning("%d ports are not connected!", nr_unconnected)

    # Connect by bounding box TODO: implement
    unconnected_elements = {uc.parent for uc in unconnected}
    if unconnected_elements:
        # The stub currently finds nothing, so its result is not used.
        self.connections_by_boundingbox(unconnected, unconnected_elements)
        self.logger.warning(
            "Connecting by bounding box is not implemented.")

    # Check inner connections
    yield from self.check_inner_connections(elements.values())

    # TODO: manually add / modify connections
    return elements,

118 

119 @staticmethod 

120 def check_element_ports(elements: dict): 

121 """Checks position of all ports for each element. 

122 

123 Args: 

124 elements: dictionary of elements to be checked with GUID as key. 

125 """ 

126 for ele in elements.values(): 

127 for port_a, port_b in itertools.combinations(ele.ports, 2): 

128 if np.allclose(port_a.position, port_b.position, rtol=1e-7, 

129 atol=1): 

130 quality_logger.warning("Poor quality of elements %s: " 

131 "Overlapping ports (%s and %s @%s)", 

132 ele.ifc, port_a.guid, port_b.guid, 

133 port_a.position) 

134 connections = ConnectElements.connections_by_relation( 

135 [port_a, port_b], include_conflicts=True) 

136 all_ports = [port for connection in connections for port in 

137 connection] 

138 other_ports = [port for port in all_ports if 

139 port not in [port_a, port_b]] 

140 if port_a in all_ports and port_b in all_ports and len( 

141 set(other_ports)) == 1: 

142 # Both ports connected to same other port -> merge ports 

143 quality_logger.info( 

144 "Removing %s and set %s as SINKANDSOURCE.", 

145 port_b.ifc, port_a.ifc) 

146 ele.ports.remove(port_b) 

147 port_b.parent = None 

148 port_a.flow_direction = 0 

149 port_a.flow_master = True 

150 

151 @staticmethod 

152 def connections_by_relation(ports: list, include_conflicts: bool = False) \ 

153 -> list: 

154 """Connect ports of elements by IFC relations. 

155 

156 This method uses IfcRelConnects relations to establish connections 

157 between ports. It can include conflicting connections in the output 

158 if specified. 

159 

160 Args: 

161 ports: list of ports to be connected 

162 include_conflicts: if true, conflicts are included. Defaults to 

163 false. 

164 

165 Returns: 

166 connections: list of tuples of ports that are connected. 

167 """ 

168 connections = [] 

169 port_mapping = {port.guid: port for port in ports} 

170 for port in ports: 

171 if not port.ifc: 

172 continue 

173 connected_ports = [conn.RelatingPort for conn in 

174 port.ifc.ConnectedFrom] + [conn.RelatedPort for 

175 conn in 

176 port.ifc.ConnectedTo] 

177 if connected_ports: 

178 other_port = None 

179 if len(connected_ports) > 1: 

180 # conflicts 

181 quality_logger.warning("%s has multiple connections", 

182 port.ifc) 

183 possibilities = [] 

184 for connected_port in connected_ports: 

185 possible_port = port_mapping.get( 

186 connected_port.GlobalId) 

187 

188 if possible_port.parent is not None: 

189 possibilities.append(possible_port) 

190 

191 # solving conflicts 

192 if include_conflicts: 

193 for poss in possibilities: 

194 connections.append((port, poss)) 

195 else: 

196 if len(possibilities) == 1: 

197 other_port = possibilities[0] 

198 quality_logger.info( 

199 "Solved by ignoring deleted connection.") 

200 else: 

201 quality_logger.error( 

202 "Unable to solve conflicting connections. " 

203 "Continue without connecting %s", port.ifc) 

204 else: 

205 # explicit 

206 other_port = port_mapping.get(connected_ports[0].GlobalId) 

207 if other_port: 

208 if port.parent and other_port.parent: 

209 connections.append((port, other_port)) 

210 else: 

211 quality_logger.debug( 

212 "Not connecting ports without parent (%s, %s)", 

213 port, 

214 other_port 

215 ) 

216 return connections 

217 

218 @staticmethod 

219 def confirm_connections_position(connections: list, eps: float = 1)\ 

220 -> Tuple[list, list, list]: 

221 """Checks distance between port positions. 

222 

223 The method uses the 'port_distance' function from 'ConnectElements' 

224 to calculate distances. If distance < eps, the connection is 

225 confirmed otherwise rejected. 

226 

227 Args: 

228 connections: list of connections to be checked. 

229 eps: distance tolerance for which connections are either confirmed 

230 or rejected. Defaults to 1. 

231 

232 Returns: 

233 tuple of lists of connections (confirmed, unconfirmed, rejected) 

234 """ 

235 confirmed = [] 

236 unconfirmed = [] 

237 rejected = [] 

238 for port1, port2 in connections: 

239 delta = ConnectElements.port_distance(port1, port2) 

240 if delta is None: 

241 unconfirmed.append((port1, port2)) 

242 elif max(abs(delta)) < eps: 

243 confirmed.append((port1, port2)) 

244 else: 

245 rejected.append((port1, port2)) 

246 return confirmed, unconfirmed, rejected 

247 

248 @staticmethod 

249 def port_distance(port1: Port, port2: Port) -> np.array: 

250 """Calculates distance (delta in x, y, z) of ports. 

251 

252 Args: 

253 port1: the first port. 

254 port2: the seconds port. 

255 

256 Returns: 

257 delta: distance between port1 and port2 in x, y, z coordinates. 

258 """ 

259 try: 

260 delta = port1.position - port2.position 

261 except AttributeError: 

262 delta = None 

263 return delta 

264 

@staticmethod
def connections_by_position(ports: Iterable, eps: float = 10) -> list:
    """Connect ports of elements by computing geometric distance.

    Every pair of ports that belongs to different parents and whose
    largest absolute delta component is below eps becomes an edge of a
    graph. If a port ends up with multiple candidates, only the strictly
    closest one is kept; if the two closest candidates are equally far
    away, all candidates of that port are dropped.

    Args:
        ports: an iterable (e.g. a generator) of ports to be connected.
        eps: distance tolerance for which ports are connected. Defaults
            to 10.

    Returns:
        list of tuples of ports that are connected.
    """
    graph = nx.Graph()
    for port1, port2 in itertools.combinations(ports, 2):
        if port1.parent == port2.parent:
            # never connect two ports of the same parent
            continue
        delta = ConnectElements.port_distance(port1, port2)
        if delta is None:
            continue
        abs_delta = max(abs(delta))
        if abs_delta < eps:
            graph.add_edge(port1, port2, delta=abs_delta)

    # verify: ports with more than one edge need conflict resolution
    conflicts = [port for port, deg in graph.degree() if deg > 1]
    for port in conflicts:
        candidates = sorted(graph.edges(port, data=True),
                            key=lambda t: t[2].get('delta', eps)
                            )
        # initially there are at least two candidates, but there will be
        # less, if previous conflicts belong to them
        if len(candidates) <= 1:
            # no action required
            continue
        quality_logger.warning(
            "Found %d geometrically close ports around %s. Details: %s",
            len(candidates), port, candidates)
        if candidates[0][2]['delta'] < candidates[1][2]['delta']:
            # keep first
            first = 1
            # %s instead of %d: the delta is a float and %d would
            # truncate it (e.g. 0.4 would be logged as 0).
            quality_logger.info(
                "Accept closest ports with delta %s as connection "
                "(%s - %s)",
                candidates[0][2]['delta'],
                candidates[0][0],
                candidates[0][1]
            )
        else:
            # remove all
            first = 0
            quality_logger.warning(
                "No connection determined, because there are no two "
                "closest ports.")
        for cand in candidates[first:]:
            graph.remove_edge(cand[0], cand[1])

    return list(graph.edges())

326 

@staticmethod
def check_inner_connections(elements: Iterable[ProductBased]) -> \
        Generator[DecisionBunch, None, None]:
    """Request inner connections for HVAC products that have none yet.

    Args:
        elements: An iterable of elements, where each element is a subclass
            of ProductBased. Elements that are no hvac.HVACProduct are
            skipped.

    Yields:
        The decisions of each HVAC product without inner connections, as
        produced by its decide_inner_connections().

    """
    # TODO: if a lot of decisions occur, it would help to merge
    #  DecisionBunches before yielding them
    for candidate in elements:
        if not isinstance(candidate, hvac.HVACProduct):
            continue
        if candidate.inner_connections:
            # inner connections are already set
            continue
        yield from candidate.decide_inner_connections()

346 

347 @staticmethod 

348 def connections_by_boundingbox(open_ports, elements): 

349 """Search for open ports in elements bounding boxes. 

350 

351 This is especially useful for vessel like elements with variable 

352 number of ports (and bad ifc export) or proxy elements. 

353 Missing ports on element side are created on demand.""" 

354 

355 # TODO: implement 

356 connections = [] 

357 return connections