Coverage for bim2sim/tasks/hvac/connect_elements.py: 80%
156 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-12 17:09 +0000
1import itertools
2import logging
3from typing import Tuple, Generator, Iterable
5import networkx as nx
6import numpy as np
8from bim2sim.elements import hvac_elements as hvac
9from bim2sim.elements.base_elements import Port, ProductBased
10from bim2sim.kernel.decision import DecisionBunch
11from bim2sim.tasks.base import ITask, Playground
# Dedicated logger for data-quality findings of the model (used below for
# overlapping ports and conflicting IFC port connections); the task's own
# progress messages go to ``self.logger`` instead.
quality_logger = logging.getLogger('bim2sim.QualityReport')
class ConnectElements(ITask):
    """Connect the ports of the elements.

    Connections are first derived from the IFC port relations, optionally
    verified by the geometric distance of the ports, and then completed by
    connecting the remaining open ports by position. Elements are stored in
    the elements dict with GUID as key.
    """

    reads = ('elements',)
    touches = ('elements',)

    def __init__(self, playground: Playground):
        super().__init__(playground)
        self.elements = {}

    def run(self, elements: dict) \
            -> Generator[DecisionBunch, None, Tuple[dict]]:
        """Connect elements based on port information and geometric relations.

        This method performs the following steps:
        1. Checks the ports of elements.
        2. Connects the relevant elements based on relations.
        3. Checks the positions of connections and connects ports based on
        geometric distance.
        4. Connects remaining unconnected ports by position.
        5. Logs information about the number of connected and unconnected
        ports.

        This method is a generator: it yields the decisions produced by
        ``check_inner_connections`` and hands the result back as the
        generator's return value.

        Args:
            elements: dictionary of elements with GUID as key.

        Yields:
            Decisions required to set inner connections of the elements.

        Returns:
            One-element tuple holding the elements dictionary (GUID as key)
            with updated connections.
        """
        self.logger.info("Connect elements")

        # Check ports
        self.logger.info("Checking ports of elements ...")
        self.check_element_ports(elements)

        # Make connections by relations
        self.logger.info("Connecting the relevant elements")
        self.logger.info(" - Connecting by relations ...")
        all_ports = [port for item in elements.values() for port in item.ports]
        rel_connections = self.connections_by_relation(all_ports)
        self.logger.info(" - Found %d potential connections.",
                         len(rel_connections))

        # Check connections
        sim_settings = self.playground.sim_settings
        pos_connect_tol = sim_settings.tolerance_connect_by_position
        if sim_settings.verify_connection_by_position:
            self.logger.info(" - Checking positions of connections via "
                             "geometric distance ...")
            confirmed, unconfirmed, rejected = \
                self.confirm_connections_position(
                    rel_connections, eps=pos_connect_tol)
        else:
            # The check is switched by 'verify_connection_by_position';
            # 'tolerance_connect_by_position' only provides the tolerance.
            self.logger.info(
                "Geometric distance check for port connections is deactivated"
                " by sim_setting 'verify_connection_by_position'. All found"
                " ports are confirmed without check.")
            confirmed = rel_connections
            rejected = []
            unconfirmed = []
        self.logger.info(
            " - %d connections are confirmed and %d rejected. %d can't be "
            "confirmed.",
            len(confirmed), len(rejected), len(unconfirmed))
        for port1, port2 in confirmed + unconfirmed:
            # Unconfirmed ports have no position data and can not be
            # connected by position
            port1.connect(port2)

        # Connect unconnected ports by position
        unconnected_ports = [port for port in all_ports
                             if not port.is_connected()]
        self.logger.info(" - Connecting remaining ports by position ...")
        pos_connections = self.connections_by_position(
            unconnected_ports, eps=pos_connect_tol)
        self.logger.info(" - Found %d additional connections.",
                         len(pos_connections))
        for port1, port2 in pos_connections:
            port1.connect(port2)

        # Get number of connected and unconnected ports
        nr_total = len(all_ports)
        unconnected = [port for port in all_ports if not port.is_connected()]
        nr_unconnected = len(unconnected)
        nr_connected = nr_total - nr_unconnected
        self.logger.info("In total %d of %d ports are connected.", nr_connected,
                         nr_total)
        if nr_total > nr_connected:
            self.logger.warning("%d ports are not connected!", nr_unconnected)

        # Connect by bounding box TODO: implement
        unconnected_elements = {uc.parent for uc in unconnected}
        if unconnected_elements:
            # The hook is still called, but its result is not used until
            # the bounding box search is implemented.
            self.connections_by_boundingbox(
                unconnected,
                unconnected_elements
            )
            self.logger.warning(
                "Connecting by bounding box is not implemented.")

        # Check inner connections
        yield from self.check_inner_connections(elements.values())

        # TODO: manually add / modify connections
        return elements,

    @staticmethod
    def check_element_ports(elements: dict):
        """Checks position of all ports for each element.

        Ports of the same element whose positions are equal within an
        absolute tolerance of 1 are reported to the quality logger. If both
        ports turn out to be connected to the same single other port, the
        second port is removed from the element and the first one is marked
        as bidirectional flow master.

        Args:
            elements: dictionary of elements to be checked with GUID as key.
        """
        for ele in elements.values():
            # itertools.combinations works on a snapshot of ele.ports, so a
            # port removed below would show up again in later pairs. Track
            # removed ports by identity and skip them to avoid a second
            # remove() (ValueError) or merging into an already removed port.
            removed_ids = set()
            for port_a, port_b in itertools.combinations(ele.ports, 2):
                if id(port_a) in removed_ids or id(port_b) in removed_ids:
                    continue
                if not np.allclose(port_a.position, port_b.position,
                                   rtol=1e-7, atol=1):
                    continue
                quality_logger.warning("Poor quality of elements %s: "
                                       "Overlapping ports (%s and %s @%s)",
                                       ele.ifc, port_a.guid, port_b.guid,
                                       port_a.position)
                # NOTE(review): connections_by_relation only resolves partner
                # ports contained in the list it is given. With just
                # [port_a, port_b] passed, other_ports looks like it can never
                # hold a third port, so the merge branch below may be
                # unreachable - confirm the intended port list.
                connections = ConnectElements.connections_by_relation(
                    [port_a, port_b], include_conflicts=True)
                all_ports = [port for connection in connections
                             for port in connection]
                other_ports = [port for port in all_ports
                               if port not in (port_a, port_b)]
                if (port_a in all_ports and port_b in all_ports
                        and len(set(other_ports)) == 1):
                    # Both ports connected to same other port -> merge ports
                    quality_logger.info(
                        "Removing %s and set %s as SINKANDSOURCE.",
                        port_b.ifc, port_a.ifc)
                    ele.ports.remove(port_b)
                    removed_ids.add(id(port_b))
                    port_b.parent = None
                    port_a.flow_direction = 0
                    port_a.flow_master = True

    @staticmethod
    def connections_by_relation(ports: list, include_conflicts: bool = False) \
            -> list:
        """Connect ports of elements by IFC relations.

        This method uses the ``ConnectedFrom`` / ``ConnectedTo`` relations of
        the ports' IFC entities to establish connections between ports. Only
        partner ports that are part of ``ports`` can be resolved. It can
        include conflicting connections in the output if specified.

        Args:
            ports: list of ports to be connected
            include_conflicts: if true, conflicts are included. Defaults to
                false.

        Returns:
            connections: list of tuples of ports that are connected.
        """
        connections = []
        port_mapping = {port.guid: port for port in ports}
        for port in ports:
            if not port.ifc:
                continue
            connected_ports = (
                [conn.RelatingPort for conn in port.ifc.ConnectedFrom]
                + [conn.RelatedPort for conn in port.ifc.ConnectedTo])
            if not connected_ports:
                continue

            other_port = None
            if len(connected_ports) > 1:
                # conflicts
                quality_logger.warning("%s has multiple connections",
                                       port.ifc)
                possibilities = []
                for connected_port in connected_ports:
                    possible_port = port_mapping.get(
                        connected_port.GlobalId)
                    # The partner may not be part of `ports` at all (lookup
                    # yields None) or may have been detached from its
                    # element (parent is None); neither is a candidate.
                    if possible_port is not None \
                            and possible_port.parent is not None:
                        possibilities.append(possible_port)

                # solving conflicts
                if include_conflicts:
                    for poss in possibilities:
                        connections.append((port, poss))
                elif len(possibilities) == 1:
                    other_port = possibilities[0]
                    quality_logger.info(
                        "Solved by ignoring deleted connection.")
                else:
                    quality_logger.error(
                        "Unable to solve conflicting connections. "
                        "Continue without connecting %s", port.ifc)
            else:
                # explicit
                other_port = port_mapping.get(connected_ports[0].GlobalId)

            if other_port:
                if port.parent and other_port.parent:
                    connections.append((port, other_port))
                else:
                    quality_logger.debug(
                        "Not connecting ports without parent (%s, %s)",
                        port,
                        other_port
                    )
        return connections

    @staticmethod
    def confirm_connections_position(connections: list, eps: float = 1)\
            -> Tuple[list, list, list]:
        """Checks distance between port positions.

        The method uses the 'port_distance' function from 'ConnectElements'
        to calculate distances. If the largest absolute coordinate delta is
        smaller than eps, the connection is confirmed, otherwise rejected.
        Connections for which no distance can be computed are unconfirmed.

        Args:
            connections: list of connections to be checked.
            eps: distance tolerance for which connections are either confirmed
                or rejected. Defaults to 1.

        Returns:
            tuple of lists of connections (confirmed, unconfirmed, rejected)
        """
        confirmed = []
        unconfirmed = []
        rejected = []
        for port1, port2 in connections:
            delta = ConnectElements.port_distance(port1, port2)
            if delta is None:
                unconfirmed.append((port1, port2))
            elif max(abs(delta)) < eps:
                confirmed.append((port1, port2))
            else:
                rejected.append((port1, port2))
        return confirmed, unconfirmed, rejected

    @staticmethod
    def port_distance(port1: Port, port2: Port) -> 'np.ndarray | None':
        """Calculates distance (delta in x, y, z) of ports.

        Args:
            port1: the first port.
            port2: the second port.

        Returns:
            delta: distance between port1 and port2 in x, y, z coordinates,
                or None if the positions can not be subtracted (a port has
                no position attribute or its position is not numeric, e.g.
                None).
        """
        try:
            delta = port1.position - port2.position
        except (AttributeError, TypeError):
            # Missing position data must not abort the whole task; callers
            # treat None as "distance unknown".
            delta = None
        return delta

    @staticmethod
    def connections_by_position(ports: Iterable[Port], eps: float = 10) \
            -> list:
        """Connect ports of elements by computing geometric distance.

        The method uses geometric distance between ports to establish
        connections. If multiple candidates are found for a port, the method
        prioritizes the closest one. If the two closest candidates are
        equally close, no connection is made for that port.

        Args:
            ports: An iterable (e.g. a generator) of ports to be connected.
            eps: distance tolerance for which ports are connected. Defaults
                to 10.

        Returns:
            list of tuples of ports that are connected.
        """
        graph = nx.Graph()
        for port1, port2 in itertools.combinations(ports, 2):
            # ports of the same element are never connected by position
            if port1.parent == port2.parent:
                continue
            delta = ConnectElements.port_distance(port1, port2)
            if delta is None:
                continue
            abs_delta = max(abs(delta))
            if abs_delta < eps:
                graph.add_edge(port1, port2, delta=abs_delta)

        # verify
        conflicts = [port for port, deg in graph.degree() if deg > 1]
        for port in conflicts:
            candidates = sorted(graph.edges(port, data=True),
                                key=lambda t: t[2].get('delta', eps)
                                )
            # initially there are at least two candidates, but there will be
            # less, if previous conflicts belong to them
            if len(candidates) <= 1:
                # no action required
                continue
            quality_logger.warning(
                "Found %d geometrically close ports around %s. Details: %s",
                len(candidates), port, candidates)
            if candidates[0][2]['delta'] < candidates[1][2]['delta']:
                # keep first
                keep = 1
                # %s instead of %d: the delta is a float and %d would
                # truncate it (every distance below 1 would be logged as 0)
                quality_logger.info(
                    "Accept closest ports with delta %s as connection "
                    "(%s - %s)",
                    candidates[0][2]['delta'],
                    candidates[0][0],
                    candidates[0][1]
                )
            else:
                # remove all
                keep = 0
                quality_logger.warning(
                    "No connection determined, because there are no two "
                    "closest ports.")
            for cand in candidates[keep:]:
                graph.remove_edge(cand[0], cand[1])

        return list(graph.edges())

    @staticmethod
    def check_inner_connections(elements: Iterable[ProductBased]) -> \
            Generator[DecisionBunch, None, None]:
        """Check inner connections of HVACProducts.

        Only elements that are instances of ``hvac.HVACProduct`` and have no
        inner connections yet are asked to decide them.

        Args:
            elements: An iterable of elements, where each element is a subclass
                of ProductBased.

        Yields:
            Yields decisions to set inner connections.

        """
        # TODO: if a lot of decisions occur, it would help to merge
        #  DecisionBunches before yielding them
        for element in elements:
            if isinstance(element, hvac.HVACProduct) \
                    and not element.inner_connections:
                yield from element.decide_inner_connections()

    @staticmethod
    def connections_by_boundingbox(open_ports, elements):
        """Search for open ports in elements bounding boxes.

        This is especially useful for vessel like elements with variable
        number of ports (and bad ifc export) or proxy elements.
        Missing ports on element side are created on demand.

        Not implemented yet: currently always returns an empty list.
        """
        # TODO: implement
        connections = []
        return connections