Coverage for bim2sim/elements/base_elements.py: 69%

497 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-01 10:24 +0000

1import logging 

2import pickle 

3import re 

4from json import JSONEncoder 

5from typing import Union, Iterable, Dict, List, Tuple, Type, Optional, Any 

6 

7import numpy as np 

8import ifcopenshell.geom 

9from ifcopenshell import guid 

10 

11from bim2sim.elements.aggregation import AggregationMixin 

12from bim2sim.kernel.decision import Decision, DecisionBunch 

13from bim2sim.kernel import IFCDomainError 

14from bim2sim.elements.mapping import condition, attribute, ifc2python 

15from bim2sim.elements.mapping.finder import TemplateFinder, SourceTool 

16from bim2sim.elements.mapping.units import ureg 

17from bim2sim.utilities.common_functions import angle_equivalent, vector_angle, \ 

18 remove_umlaut 

19from bim2sim.utilities.pyocc_tools import PyOCCTools 

20from bim2sim.utilities.types import IFCDomain, AttributeDataSource 

21 

# Module logger plus the dedicated logger that feeds the quality report.
logger = logging.getLogger(__name__)
quality_logger = logging.getLogger('bim2sim.QualityReport')

# Shared geometry settings used by the shape calculations in this module.
# NOTE(review): USE_PYTHON_OPENCASCADE presumably makes create_shape return
# pythonOCC shapes - confirm against the installed IfcOpenShell version.
settings_products = ifcopenshell.geom.main.settings()
settings_products.set(settings_products.USE_PYTHON_OPENCASCADE, True)

26 

27 

class ElementError(Exception):
    """Base class for errors raised by elements."""

30 

31 

class NoValueError(ElementError):
    """Raised when a requested value is not available."""

34 

35 

class ElementEncoder(JSONEncoder):
    """JSON encoder that writes :class:`Element` instances as a short tag.

    Elements are encoded as the string ``"<Element(<guid>)>"``. Every other
    object is delegated to :class:`json.JSONEncoder`.
    """

    # TODO: make Elements serializable and deserializable.
    #  Ideas: guid to identify, (factory) method to (re)init by guid,
    #  maybe weakref to other elements (Ports, connections, ...)

    def default(self, o):
        """Return a JSON serializable replacement for ``o``.

        Args:
            o: object the standard encoder could not serialize.

        Returns:
            str: ``"<Element(<guid>)>"`` if ``o`` is an :class:`Element`.

        Raises:
            TypeError: raised by the base encoder for unsupported objects.
        """
        if isinstance(o, Element):
            return "<Element(%s)>" % o.guid
        # Delegate with the proper arguments so the base class can raise its
        # regular "not JSON serializable" TypeError for unsupported objects.
        return super().default(o)

47 

48 

class Element(metaclass=attribute.AutoAttributeNameMeta):
    """Common base class of all elements.

    Every instance carries a ``guid``, an ``AttributeManager`` bound to the
    instance (``attributes``) and the name of its class (``element_type``).
    """
    guid_prefix = ''
    _id_counter = 0

    def __init__(self, guid=None, **kwargs):
        """Create the element.

        Args:
            guid: identifier to use; if falsy, a new one is generated with
                ``get_id`` based on ``guid_prefix``.
            **kwargs: initial attribute values. Each key must be known to the
                attribute manager.

        Raises:
            AttributeError: if a keyword is not a managed attribute.
        """
        if not guid:
            guid = self.get_id(self.guid_prefix)
        self.guid = guid
        self.attributes = attribute.AttributeManager(bind=self)
        self.element_type = self.__class__.__name__

        # only names known to the attribute manager may be set via kwargs
        for key, value in kwargs.items():
            if key not in self.attributes:
                raise AttributeError(
                    f"Unused argument in kwargs: {key}: {value}")
            setattr(self, key, value)

    def __hash__(self):
        """Hash the element by its guid."""
        return hash(self.guid)

    def validate_creation(self) -> bool:
        """Tell whether the instance is valid (to be implemented by subclasses)."""
        raise NotImplementedError

    def validate_attributes(self) -> dict:
        """Validate attributes; the base implementation reports nothing."""
        return {}

    def _calc_position(self, name) -> np.array:
        """Position of the element; the base class has none and returns None."""
        return None

    position = attribute.Attribute(
        description='Position of element',
        functions=[_calc_position]
    )

    @staticmethod
    def get_id(prefix=""):
        """Generate a new id from ``prefix`` and a global running counter.

        Args:
            prefix: leading part of the id, at most 10 characters.

        Returns:
            str: prefix padded with ``0`` to 8 characters followed by the
            counter zero-padded to 14 digits.

        Raises:
            AttributeError: if ``prefix`` is longer than 10 characters.
        """
        if len(prefix) > 10:
            raise AttributeError("Max prefix length is 10!")
        Element._id_counter += 1
        counter = Element._id_counter
        return "{0:0<8s}{1:0>14d}".format(prefix, counter)

    @staticmethod
    def get_object(guid):
        """Obsolete lookup of an element by guid; always raises.

        Raises:
            AssertionError: on every call.
        """
        raise AssertionError("Obsolete method. "
                             "Don't rely on global Element.objects. "
                             "Use e.g. elements from tasks/playground.")

    def request(self, name, external_decision: Decision = None) \
            -> Union[None, Decision]:
        """Request an attribute of this element via its attribute manager.

        Args:
            name: Name of attribute
            external_decision: Decision to use instead of default decision
        """
        manager = self.attributes
        return manager.request(name, external_decision)

    def reset(self, name, data_source=AttributeDataSource.manual_overwrite):
        """Reset an attribute of this element via its attribute manager.

        Args:
            name: attribute name
            data_source (object): data source of the attribute
        """
        manager = self.attributes
        return manager.reset(name, data_source)

    def source_info(self) -> str:
        """Informative string about the source of the element (empty here)."""
        return ''

    @classmethod
    def get_pending_attribute_decisions(
            cls, elements: Iterable['Element']) -> DecisionBunch:
        """Collect the attribute decisions of all given elements.

        This is a generator: it yields one ``DecisionBunch`` holding the
        decisions of all ``elements``, sorted by their ``global_key``.
        """
        pending = DecisionBunch()
        for element in elements:
            pending.extend(element.attributes.get_decisions())

        # sort decisions to preserve order
        pending.sort(key=lambda decision: decision.global_key)
        yield pending

    @classmethod
    def full_reset(cls):
        """Obsolete; always raises ``AssertionError``."""
        raise AssertionError("Obsolete method. not required any more.")

150 

151 

class IFCBased(Element):
    """Element with instantiation from ifc and related methods.

    Attributes:
        ifc: IfcOpenShell element instance
        ifc_types: Dict with ifc_type as key and list of predefined types that
            fit to the class as values.
            Special values for predefined types:
            '*' all which are not overwritten in other classes predefined
            types.
            '-Something' start with minus to exclude

            For example:
            >>> {'IfcSlab': ['*', '-SomethingSpecialWeDontWant', 'BASESLAB']}
            >>> {'IfcRoof': ['FLAT_ROOF', 'SHED_ROOF',...],
            >>>  'IfcSlab': ['ROOF']}
        pattern_ifc_type: compiled regex patterns (objects providing
            ``search``) used by ``filter_for_text_fragments``.
    """

    ifc_types: Dict[str, List[str]] = None
    pattern_ifc_type = []

    def __init__(self, *args,
                 ifc=None,
                 finder: TemplateFinder = None,
                 ifc_units: dict = None,
                 ifc_domain: IFCDomain = None,
                 **kwargs):
        """Create the element and remember its IFC context.

        Args:
            *args: positional arguments forwarded to ``Element``.
            ifc: IfcOpenShell entity this element is based on.
            finder: template finder stored on the instance.
            ifc_units: unit information passed to the property set helpers.
            ifc_domain: domain of the IFC file the entity comes from.
            **kwargs: keyword arguments forwarded to ``Element``.
        """
        super().__init__(*args, **kwargs)

        self.ifc = ifc
        self.predefined_type = ifc2python.get_predefined_type(ifc)
        self.ifc_domain = ifc_domain
        self.finder = finder
        self.ifc_units = ifc_units
        self.source_tool: SourceTool = None

        # TBD
        self.enrichment = {}  # TODO: DJA
        # lazily filled caches, see get_propertysets/get_type_propertysets
        self._propertysets = None
        self._type_propertysets = None
        self._decision_results = {}

    @classmethod
    def ifc2args(cls, ifc) -> Tuple[tuple, dict]:
        """Extract init args and kwargs from ifc.

        Returns:
            An empty args tuple and kwargs holding ``guid`` (the entity's
            ``GlobalId`` or None) and ``ifc``.
        """
        guid = getattr(ifc, 'GlobalId', None)
        kwargs = {'guid': guid, 'ifc': ifc}
        return (), kwargs

    @classmethod
    def from_ifc(cls, ifc, *args, **kwargs):
        """Factory method to create instance from ifc.

        Values derived from ``ifc`` override equally named ``kwargs``.
        """
        ifc_args, ifc_kwargs = cls.ifc2args(ifc)
        kwargs.update(ifc_kwargs)
        return cls(*(args + ifc_args), **kwargs)

    @property
    def ifc_type(self):
        """IFC type name of the underlying entity, None without entity."""
        if self.ifc:
            return self.ifc.is_a()
        return None

    @classmethod
    def pre_validate(cls, ifc) -> bool:
        """Check if ifc meets conditions to create element from it."""
        raise NotImplementedError

    def _calc_position(self, name):
        """Return the position as sum of the placement chain's locations.

        Returns:
            numpy array of the summed coordinates, or None if the entity has
            no object placement.
        """
        # ObjectPlacement may exist on the entity but be unset (None)
        placement = getattr(self.ifc, 'ObjectPlacement', None)
        if placement is None:
            return None

        # NOTE(review): only the locations are summed up, axis/rotation of the
        # parent placements are not applied.
        absolute = np.array(placement.RelativePlacement.Location.Coordinates)
        parent = placement.PlacementRelTo
        while parent is not None:
            absolute += np.array(parent.RelativePlacement.Location.Coordinates)
            parent = parent.PlacementRelTo
        return absolute

    def _get_name_from_ifc(self, name):
        """Return the IFC ``Name`` with umlauts removed, None if unset."""
        ifc_name = self.get_ifc_attribute('Name')
        if ifc_name:
            return remove_umlaut(ifc_name)
        return None

    name = attribute.Attribute(
        description="Name of element based on IFC attribute.",
        functions=[_get_name_from_ifc]
    )

    def get_ifc_attribute(self, attribute):
        """Return the attribute ``attribute`` of the entity, None if missing."""
        return getattr(self.ifc, attribute, None)

    def get_propertyset(self, propertysetname):
        """Return the property set with the given name."""
        return ifc2python.get_property_set_by_name(
            propertysetname, self.ifc, self.ifc_units)

    def get_propertysets(self):
        """Return the property sets of the entity (cached after first call)."""
        if self._propertysets is None:
            self._propertysets = ifc2python.get_property_sets(
                self.ifc, self.ifc_units)
        return self._propertysets

    def get_type_propertysets(self):
        """Return the type property sets (cached after first call)."""
        if self._type_propertysets is None:
            self._type_propertysets = ifc2python.get_type_property_sets(
                self.ifc, self.ifc_units)
        return self._type_propertysets

    def get_hierarchical_parent(self):
        return ifc2python.getHierarchicalParent(self.ifc)

    def get_hierarchical_children(self):
        return ifc2python.getHierarchicalChildren(self.ifc)

    def get_spartial_parent(self):
        return ifc2python.getSpatialParent(self.ifc)

    def get_spartial_children(self):
        return ifc2python.getSpatialChildren(self.ifc)

    def get_space(self):
        return ifc2python.getSpace(self.ifc)

    def get_storey(self):
        return ifc2python.getStorey(self.ifc)

    def get_building(self):
        return ifc2python.getBuilding(self.ifc)

    def get_site(self):
        return ifc2python.getSite(self.ifc)

    def get_project(self):
        return ifc2python.getProject(self.ifc)

    def get_true_north(self):
        return ifc2python.get_true_north(self.ifc)

    def summary(self):
        return ifc2python.summary(self.ifc)

    def search_property_hierarchy(self, propertyset_name):
        """Search for a property set in hierarchical order.

        1. element's propertysets
        2. element type's propertysets

        Returns:
            The first property set found, None if neither source has it.
        """
        # the type property sets are only fetched if the first lookup misses
        for fetch in (self.get_propertysets, self.get_type_propertysets):
            p_sets = fetch()
            try:
                return p_sets[propertyset_name]
            except KeyError:
                continue
        return None

    def inverse_properties(self):
        """Generator yielding tuples of PropertySet name and Property name."""
        for p_set_name, p_set in self.get_propertysets().items():
            for p_name in p_set.keys():
                yield (p_set_name, p_name)

    def filter_properties(self, patterns):
        """Filter all properties by re pattern.

        :returns: list of tuple(propertyset_name, property_name, match)
        """
        matches = []
        for propertyset_name, property_name in self.inverse_properties():
            for pattern in patterns:
                match = re.match(pattern, property_name)
                if match:
                    matches.append((propertyset_name, property_name, match))
        return matches

    @classmethod
    def _search_patterns(cls, text):
        """Return the match objects of all class patterns found in ``text``."""
        hits = (p.search(text) for p in cls.pattern_ifc_type)
        return [hit for hit in hits if hit is not None]

    @classmethod
    def filter_for_text_fragments(cls, ifc_element, ifc_units: dict,
                                  optional_locations: list = None):
        """Find text fragments that match the class patterns in an IFC element.

        Args:
            ifc_element: The IFC element to check.
            ifc_units: Dictionary containing IFC unit information.
            optional_locations: Additional locations to check patterns beyond
                name. Defaults to None.

        Returns:
            list: List of matched fragments, empty list if no matches found.
        """
        results = []

        # Check name matches. The name may be unset (None), which the regex
        # search can't handle, so it is skipped in that case.
        element_name = ifc_element.Name
        if element_name is not None:
            name_hits = cls._search_patterns(element_name)
            if name_hits:
                # cls.ifc_type is a property object on class level, so the
                # class name is logged instead
                quality_logger.info(
                    f"Identified {cls.__name__} through text fragments in "
                    f"name. Criteria: {name_hits}")
                results.append(name_hits[0][0])

        # Check optional locations
        for loc in optional_locations or ():
            prop_value = ifc2python.get_property_set_by_name(
                loc, ifc_element, ifc_units)
            if not prop_value:
                continue

            loc_hits = cls._search_patterns(prop_value)
            if loc_hits:
                quality_logger.info(
                    f"Identified {cls.__name__} through text fragments "
                    f"in {loc}. Criteria: {loc_hits}")
                results.append(loc_hits[0][0])

        return results

    def get_exact_property(self, propertyset_name: str, property_name: str):
        """Return the value of the property given by set name and property name.

        Raises:
            NoValueError: if the property set or the property does not exist.
        """
        try:
            p_set = self.search_property_hierarchy(propertyset_name)
            return p_set[property_name]
        except (AttributeError, KeyError, TypeError) as err:
            raise NoValueError("Property '%s.%s' does not exist" % (
                propertyset_name, property_name)) from err

    def select_from_potential_properties(self, patterns, name,
                                         collect_decisions):
        """Get the value(s) of all properties matching ``patterns``.

        Args:
            patterns: regex patterns matched against the property names.
            name: currently unused.
            collect_decisions: currently unused.

        Returns:
            None if nothing matches, the common value if all matching
            properties agree, otherwise the set of distinct values.
        """
        matches = self.filter_properties(patterns)
        if not matches:
            return None

        # TODO: Decision: save for all following elements of same class (
        #  dont ask again?) instead of returning all distinct values
        distinct_values = {
            self.get_exact_property(propertyset_name, property_name)
            for propertyset_name, property_name, _ in matches}
        if len(distinct_values) == 1:
            # multiple sources but common value
            return distinct_values.pop()

        quality_logger.warning(
            'Found multiple values for attributes %s of instance %s',
            ', '.join(str((m[0], m[1])) for m in matches), self)
        return distinct_values

    def source_info(self) -> str:
        """Return ``<ifc type>:<guid>`` as source description."""
        return f'{self.ifc_type}:{self.guid}'

440 

441 

class RelationBased(IFCBased):
    """Base class for IFC based relation elements, e.g. :class:`Port`.

    This class used to be defined twice in a row; the second, empty definition
    replaced the first one and thereby dropped ``conditions``, ``__repr__`` and
    ``__str__``. Both definitions are merged into this single one.
    """
    conditions = []

    def __repr__(self):
        return "<%s (guid: %s)>" % (self.__class__.__name__, self.guid)

    def __str__(self):
        return "%s" % self.__class__.__name__

455 

456 

class ProductBased(IFCBased):
    """Elements based on IFC products.

    Attributes:
        material: material of the element
        material_set: dict of material and fraction [0, 1] if multiple
            materials
        key: ``<domain>-<class name>``, set automatically for every subclass.
        key_map: mapping of ``key`` to class, shared by all subclasses.
    """
    domain = 'GENERAL'
    key: str = ''
    key_map: Dict[str, 'Type[ProductBased]'] = {}
    conditions = []

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.aggregation = None
        self.ports = self.get_ports()
        self.material = None
        self.material_set = {}
        self.cost_group = self.calc_cost_group()

    def __init_subclass__(cls, **kwargs):
        # set key for each class and register the class in the shared map
        cls.key = f'{cls.domain}-{cls.__name__}'
        cls.key_map[cls.key] = cls

    def get_ports(self):
        """Return the ports of the element; the base class has none."""
        return []

    def get_better_subclass(self) -> Union[None, Type['IFCBased']]:
        """Returns alternative subclass of current object.
        CAUTION: only use this if you can't know the result before
        instantiation of base class

        Returns:
            object: subclass of ProductBased or None"""
        return None

    @property
    def neighbors(self):
        """Directly connected elements"""
        return [port.connection.parent
                for port in self.ports if port.connection]

    def validate_creation(self):
        """Validate the element creation in two steps.

        1. Check if standard parameter are in valid range.
        2. Check if number of ports are equal to number of expected ports
           (only for HVAC).

        Returns:
            bool: False if a creation critical condition or the port
            validation fails, True otherwise.
        """
        for cond in self.conditions:
            if not cond.critical_for_creation:
                continue
            value = getattr(self, cond.key)
            # don't prevent creation if value is not existing
            if value and not cond.check(self, value):
                logger.warning("%s validation (%s) failed for %s",
                               self.ifc_type, cond.name, self.guid)
                return False
        if not self.validate_ports():
            # expected_hvac_ports is not defined by this class, so it is read
            # defensively to keep the log call from raising
            logger.warning("%s has %d ports, but %s expected for %s",
                           self.ifc_type, len(self.ports),
                           getattr(self, 'expected_hvac_ports', None),
                           self.guid)
            return False
        return True

    def validate_attributes(self) -> dict:
        """Check if all attributes are valid, returns dict with key = attribute
        and value = True or False.

        The check of the non critical conditions is not implemented yet, so
        the result is always empty.
        """
        # TODO: evaluate conditions which are not critical for creation
        return {}

    def validate_ports(self):
        """Validate the ports; the base implementation accepts everything."""
        return True

    def __repr__(self):
        return "<%s>" % self.__class__.__name__

    def calc_cost_group(self) -> Optional[int]:
        """Calculate the cost group according to DIN276"""
        return None

    def calc_product_shape(self):
        """Calculate the product shape based on IfcProduct representation.

        Returns:
            The geometry created by IfcOpenShell. None if the entity has no
            ``Representation`` attribute or the shape creation fails (a
            warning is logged in the latter case).
        """
        if not hasattr(self.ifc, 'Representation'):
            return None
        try:
            return ifcopenshell.geom.create_shape(
                settings_products, self.ifc).geometry
        # best effort: log and go on, but don't swallow KeyboardInterrupt
        # or SystemExit as the former bare except did
        except Exception:
            logger.warning(f"No calculation of product shape possible "
                           f"for {self.ifc}.")
            return None

    def calc_volume_from_ifc_shape(self):
        """Calculate the volume from the shape of the IFC representation.

        Returns:
            The shape volume multiplied with ``ureg.meter ** 3``. None if the
            entity has no ``Representation`` attribute or the calculation
            fails (a warning is logged in the latter case).
        """
        # todo use more efficient iterator to calc all shapes at once
        #  with multiple cores:
        #  https://wiki.osarch.org/index.php?title=IfcOpenShell_code_examples
        if not hasattr(self.ifc, 'Representation'):
            return None
        try:
            shape = ifcopenshell.geom.create_shape(
                settings_products, self.ifc).geometry
            return PyOCCTools.get_shape_volume(shape) * ureg.meter ** 3
        # see calc_product_shape for the reason not to use a bare except
        except Exception:
            logger.warning(f"No calculation of geometric volume possible "
                           f"for {self.ifc}.")
            return None

    def _get_volume(self, name):
        """Return ``net_volume`` if available, else the geometric volume."""
        net_volume = getattr(self, "net_volume", None)
        if net_volume:
            return net_volume
        return self.calc_volume_from_ifc_shape()

    volume = attribute.Attribute(
        description="Volume of the attribute",
        functions=[_get_volume],
    )

    def __str__(self):
        return "<%s>" % (self.__class__.__name__)

589 

590 

class Port(RelationBased):
    """Basic port that belongs to a parent and may be linked to another port."""

    def __init__(self, parent, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parent: ProductBased = parent
        self.connection = None

    def connect(self, other):
        """Connect this port and ``other`` in both directions.

        Raises:
            AssertionError: if ``other`` is not a ``Port``.
            AttributeError: if one of the ports is connected to a third port.
        """
        assert isinstance(other, Port), \
            "Can't connect interfaces of different classes."
        checks = (
            (self, other, "Port is already connected!"),
            (other, self, "Other port is already connected!"),
        )
        for port, partner, message in checks:
            if port.connection and port.connection is not partner:
                raise AttributeError(message)
        self.connection = other
        other.connection = self

    def disconnect(self):
        """Remove the connection on this port and on its partner."""
        partner = self.connection
        if not partner:
            return
        # clear own side first, this ends the mutual recursion
        self.connection = None
        partner.disconnect()

    def is_connected(self):
        """Returns truth value of port's connection"""
        return True if self.connection else False

    def __repr__(self):
        cls_name = self.__class__.__name__
        if not self.parent:
            return "<%s (*abandoned*)>" % cls_name
        try:
            position = self.parent.ports.index(self)
            return "<%s #%d of %s)>" % (cls_name, position, self.parent)
        except ValueError:
            # the parent does not list this port
            return "<%s (broken parent: %s)>" % (cls_name, self.parent)

    def __str__(self):
        # repr without the leading '<' and the trailing ')>'
        text = self.__repr__()
        return text[1:-2]

636 

637 

class Material(ProductBased):
    """Material based on ``IfcMaterial`` with its physical properties."""
    guid_prefix = 'Material_'
    # NOTE: ProductBased.__init_subclass__ assigns cls.key after the class
    # body was executed.
    key: str = 'Material'
    ifc_types = {'IfcMaterial': ["*"]}
    name = ''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parents: List[ProductBased] = []

    @staticmethod
    def get_id(prefix=""):
        """Build an id from ``prefix`` and a shortened new IfcOpenShell guid.

        Raises:
            AttributeError: if ``prefix`` is longer than 10 characters.
        """
        n_prefix = len(prefix)
        if n_prefix > 10:
            raise AttributeError("Max prefix length is 10!")
        # drop as many leading guid characters as the prefix takes, plus one
        shortened_guid = guid.new()[n_prefix + 1:]
        return f"{prefix}{shortened_guid}"

    # none of the range checks prevents the creation of a material
    conditions = [
        condition.RangeCondition(
            'spec_heat_capacity',
            0 * ureg.kilojoule / (ureg.kg * ureg.K),
            5 * ureg.kilojoule / (ureg.kg * ureg.K),
            critical_for_creation=False),
        condition.RangeCondition(
            'density',
            0 * ureg.kg / ureg.m ** 3,
            50000 * ureg.kg / ureg.m ** 3,
            critical_for_creation=False),
        condition.RangeCondition(
            'thermal_conduc',
            0 * ureg.W / ureg.m / ureg.K,
            100 * ureg.W / ureg.m / ureg.K,
            critical_for_creation=False),
        condition.RangeCondition(
            'porosity',
            0 * ureg.dimensionless,
            1 * ureg.dimensionless, True,
            critical_for_creation=False),
        condition.RangeCondition(
            'solar_absorp',
            0 * ureg.percent,
            1 * ureg.percent, True,
            critical_for_creation=False),
    ]

    spec_heat_capacity = attribute.Attribute(
        default_ps=("Pset_MaterialThermal", "SpecificHeatCapacity"),
        # functions=[get_from_template],
        unit=ureg.kilojoule / (ureg.kg * ureg.K)
    )

    density = attribute.Attribute(
        default_ps=("Pset_MaterialCommon", "MassDensity"),
        unit=ureg.kg / ureg.m ** 3
    )

    thermal_conduc = attribute.Attribute(
        default_ps=("Pset_MaterialThermal", "ThermalConductivity"),
        unit=ureg.W / (ureg.m * ureg.K)
    )

    porosity = attribute.Attribute(
        default_ps=("Pset_MaterialCommon", "Porosity"),
        unit=ureg.dimensionless
    )

    # todo is percent the correct unit? (0-1)
    solar_absorp = attribute.Attribute(
        # default_ps=('Pset_MaterialOptical', 'SolarTransmittance'),
        default=0.7,
        unit=ureg.percent
    )

    def __repr__(self):
        cls_name = self.__class__.__name__
        if not self.name:
            return "<%s>" % cls_name
        return "<%s %s>" % (cls_name, self.name)

714 

715 

class Dummy(ProductBased):
    """Dummy for all unknown elements"""

    ifc_types = {
        "IfcElementProxy": ['*']
    }

    @property
    def ifc_type(self):
        """IFC type of the dummy.

        Returns ``_ifc_type`` if it was set on the instance, otherwise the
        type is derived from the IFC entity as in the parent class. The
        ``__init__`` which used to set ``_ifc_type`` is disabled, so reading
        the attribute unconditionally raised ``AttributeError``.
        """
        stored = getattr(self, '_ifc_type', None)
        if stored is not None:
            return stored
        return super().ifc_type

    def __str__(self):
        return "Dummy '%s'" % self.ifc_type

734 

735 

class Factory:
    """Element Factory for :class: `ProductBased`

    To understand the concept of the factory class, we refer to this article:
    https://refactoring.guru/design-patterns/factory-method/python/example

    Example:
        >>> factory = Factory([Pipe, Boiler], dummy)
        >>> ele = factory(some_ifc_element)
    """

    def __init__(
            self,
            relevant_elements: set[ProductBased],
            ifc_units: dict,
            ifc_domain: IFCDomain,
            finder: Union[TemplateFinder, None] = None,
            dummy=Dummy):
        """Prepare the lookup tables for the given element classes.

        Args:
            relevant_elements: element classes the factory may create.
            ifc_units: unit information passed to created elements.
            ifc_domain: domain of the IFC file the entities come from.
            finder: template finder passed to created elements.
            dummy: class used if no element class matches.
        """
        self.mapping, self.blacklist, self.defaults = \
            self.create_ifc_mapping(relevant_elements)
        self.dummy_cls = dummy
        self.ifc_domain = ifc_domain
        self.finder = finder
        self.ifc_units = ifc_units

    def __call__(self, ifc_entity, *args, ifc_type: str = None, use_dummy=True,
                 **kwargs) -> ProductBased:
        """Run factory to create element instance.

        Calls self.create() function but before checks which element_cls is the
        correct mapping for the given ifc_entity.

        Args:
            ifc_entity: IfcOpenShell entity
            args: additional args passed to element
            ifc_type: ifc type to create element for.
                defaults to ifc_entity.is_a()
            use_dummy: use dummy class if nothing is found
            kwargs: additional kwargs passed to element
        Raises:
            LookupError: if no element found and use_dummy = False
            IFCDomainError: if the element class restricts its IFC domains
                via ``from_ifc_domains`` and the factory's domain is not
                one of them
        Returns:
            element: created element instance
        """
        _ifc_type = ifc_type or ifc_entity.is_a()
        predefined_type = ifc2python.get_predefined_type(ifc_entity)
        element_cls = self.get_element(_ifc_type, predefined_type)
        if not element_cls:
            if not use_dummy:
                raise LookupError(f"No element found for {ifc_entity}")
            element_cls = self.dummy_cls
        # TODO # 537 Put this to a point where it makes sense, return None is
        #  no solution
        if hasattr(element_cls, 'from_ifc_domains'):
            if self.ifc_domain not in element_cls.from_ifc_domains:
                # message is built once; the former text contained a stray
                # literal "f" in front of the class name
                message = (
                    f"Element has {self.ifc_domain} but "
                    f"{element_cls.__name__}"
                    f" will only be created for IFC files of domain "
                    f"{element_cls.from_ifc_domains}")
                logger.warning("%s.", message)
                raise IFCDomainError(message)

        return self.create(element_cls, ifc_entity, *args, **kwargs)

    def create(self, element_cls, ifc_entity, *args, **kwargs):
        """Create Element from class and ifc.

        The element is first created from ``element_cls``. If it reports a
        better fitting class via ``get_better_subclass``, it is created again
        from that class.
        """
        # instantiate element
        element = element_cls.from_ifc(
            ifc_entity, ifc_domain=self.ifc_domain, finder=self.finder,
            ifc_units=self.ifc_units, *args, **kwargs)
        # check if it prefers to be sth else
        better_cls = element.get_better_subclass()
        if better_cls:
            logger.info("Creating %s instead of %s", better_cls, element_cls)
            element = better_cls.from_ifc(
                ifc_entity,
                ifc_domain=self.ifc_domain,
                finder=self.finder,
                ifc_units=self.ifc_units,
                *args, **kwargs)
        return element

    def get_element(self, ifc_type: str, predefined_type: Union[str, None]) -> \
            Union[ProductBased, None]:
        """Get element class by ifc type and predefined type.

        Returns:
            The matching element class, None if the combination is
            blacklisted or no class is known for ``ifc_type``.
        """
        if predefined_type:
            key = (ifc_type.lower(), predefined_type.upper())
            # 1. go over normal list, if found match --> return
            element = self.mapping.get(key)
            if element:
                return element
            # 2. go over negative list, if found match --> not existing
            if key in self.blacklist:
                return None
        # 3. go over default list, if found match --> return
        return self.defaults.get(ifc_type.lower())

    @staticmethod
    def create_ifc_mapping(elements: Iterable) -> Tuple[
        Dict[Tuple[str, str], ProductBased],
        List[Tuple[str, ProductBased]],
        Dict[str, ProductBased]
    ]:
        """Create mapping dict, blacklist and default dict from elements

        WARNING: ifc_type is always converted to lower case
        and predefined types to upper case

        Raises:
            NameError: if two different classes claim the default ('*') of
                the same ifc_type.

        Returns:
            mapping: dict of ifc_type and predefined_type to element class
            blacklist: list of ifc_type which will not be taken into account
            default: dict of ifc_type to element class
        """
        # TODO: cover virtual elements e.g. Space Boundaries (not products)

        mapping = {}
        blacklist = []
        default = {}
        _all_ifc_types = set()

        for ele in elements:
            # classes which don't define ifc_types (None) contribute nothing
            for ifc_type, tokens in (ele.ifc_types or {}).items():
                ifc_key = ifc_type.lower()
                _all_ifc_types.add(ifc_key)
                for token in tokens:
                    # create default dict where all stars are taken into
                    # account
                    # items 'IfcSlab': Slab
                    if token == '*':
                        # the keys of default are lower case, so the conflict
                        # check has to use the lower case key as well
                        if default.get(ifc_key, ele) is not ele:
                            raise NameError(
                                f"Conflicting default ifc_types for "
                                f"{ifc_type}")  # TBD
                        default[ifc_key] = ele
                    # create blacklist where all - are taken into account
                    # items: ('IfcRoof', 'WeiredStuff')
                    elif token.startswith('-'):
                        blacklist.append((ifc_key, token[1:].upper()))
                    # create mapping dict
                    # items ('IfcSlab', 'Roof'): Roof
                    else:
                        mapping[(ifc_key, token.upper())] = ele

        # check ifc types without default
        no_default = _all_ifc_types - set(default)
        if no_default:
            logger.warning("The following ifc types have no default "
                           "representing Element class. There will be no "
                           "match if predefined type is not provided.\n%s",
                           no_default)
        return mapping, blacklist, default

896 

897 

class SerializedElement:
    """Serialized version of an element.

    This is a workaround as we can't serialize elements due to the usage of
    IfcOpenShell which uses unpickable swigPy objects. We just store the most
    important information which are guid, element_type, storeys, aggregated
    elements and the attributes from the attribute system.
    """

    def __init__(self, element):
        """Copy the serializable information of ``element``.

        Args:
            element: element to take guid, class name, managed attributes and
                instance attributes from.
        """
        self.guid = element.guid
        self.element_type = element.__class__.__name__

        # 1. attributes of the attribute system
        for attr_name, _ in element.attributes.items():
            # assign value directly to attribute without status
            # make sure to get the value
            value = getattr(element, attr_name)
            if self.is_picklable(value):
                setattr(self, attr_name, value)
                continue
            try:
                logger.info(
                    f"Attribute {attr_name} will not be serialized, as it's "
                    f"not pickleable, trying to add alternative "
                    f"information instead.")
                self._store_substitute(
                    attr_name, value, "Successfully linked value as string.")
            except AttributeError:
                logger.info("Linking attribute failed.")

        # 2. remaining instance attributes which are not stored yet
        for attr_name, value in vars(element).items():
            if hasattr(self, attr_name) or attr_name == 'attributes':
                continue
            logger.info(f"Try to add attribute data for attribute "
                        f"'{attr_name}' that is not in AttributeManager.")
            self._store_substitute(
                attr_name, value, f"Successfully added {attr_name} as string.")

        if issubclass(element.__class__, AggregationMixin):
            self.elements = [ele.guid for ele in element.elements]

    def _store_substitute(self, attr_name, value, str_message):
        """Store a simple replacement of ``value`` under ``attr_name``.

        Lists and tuples are replaced by the guids of their items (items
        without guid are dropped), objects with ``guid`` by that guid and
        objects with ``Coord`` by the result of ``Coord()``. Strings and None
        are stored as they are. Nothing is stored for any other value.

        Args:
            attr_name: name of the attribute to set on this instance.
            value: original value of the element.
            str_message: text to log if ``value`` is stored as string.
        """
        if isinstance(value, (list, tuple)):
            guids = [val.guid for val in value if hasattr(val, 'guid')]
            setattr(self, attr_name, guids)
            logger.info("Successfully linked a list of guids.")
        elif isinstance(value, str):
            setattr(self, attr_name, value)
            logger.info(str_message)
        elif hasattr(value, 'guid'):
            setattr(self, attr_name, value.guid)
            logger.info("Successfully linked a single guid.")
        elif hasattr(value, 'Coord'):
            setattr(self, attr_name, value.Coord())
            logger.info("Successfully linked a coordinate tuple.")
        elif value is None:
            setattr(self, attr_name, None)
            logger.info("Successfully set attribute value to None.")
        else:
            logger.info("Linking alternative pickleable attributes failed.")

    @staticmethod
    def is_picklable(value: Any) -> bool:
        """Determines if a given value is picklable.

        This method attempts to serialize the provided value using the
        `pickle` module. If the value can be successfully serialized, it is
        considered picklable.

        Args:
            value (Any): The value to be tested for picklability.

        Returns:
            bool: True if the value is picklable, False otherwise.
        """
        try:
            pickle.dumps(value)
        # AttributeError is what pickle raises for local objects (e.g.
        # functions or classes defined inside a function)
        except (pickle.PicklingError, TypeError, AttributeError):
            return False
        return True

    def __repr__(self):
        return "<serialized %s (guid: '%s')>" % (
            self.element_type, self.guid)