Coverage for bim2sim/elements/base_elements.py: 77%

442 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-12 17:09 +0000

1import logging 

2import pickle 

3import re 

4from json import JSONEncoder 

5from typing import Union, Iterable, Dict, List, Tuple, Type, Optional, Any 

6 

7import numpy as np 

8import ifcopenshell.geom 

9from ifcopenshell import guid 

10 

11from bim2sim.elements.aggregation import AggregationMixin 

12from bim2sim.kernel.decision import Decision, DecisionBunch 

13from bim2sim.kernel import IFCDomainError 

14from bim2sim.elements.mapping import condition, attribute, ifc2python 

15from bim2sim.elements.mapping.finder import TemplateFinder, SourceTool 

16from bim2sim.elements.mapping.units import ureg 

17from bim2sim.utilities.common_functions import angle_equivalent, vector_angle, \ 

18 remove_umlaut 

19from bim2sim.utilities.pyocc_tools import PyOCCTools 

20from bim2sim.utilities.types import IFCDomain, AttributeDataSource 

21 

# Logger for general messages of this module.
logger = logging.getLogger(__name__)
# Separate logger named 'bim2sim.QualityReport'; used in this module for
# identification hints and for reports about ambiguous property values.
quality_logger = logging.getLogger('bim2sim.QualityReport')
# IfcOpenShell geometry settings shared by the shape creation in this module,
# with the USE_PYTHON_OPENCASCADE option switched on.
settings_products = ifcopenshell.geom.main.settings()
settings_products.set(settings_products.USE_PYTHON_OPENCASCADE, True)

26 

27 

class ElementError(Exception):
    """Raised for errors that concern an Element."""

30 

31 

class NoValueError(ElementError):
    """Raised when a requested value is not available."""

34 

35 

class ElementEncoder(JSONEncoder):
    """JSON encoder that is able to handle Element instances.

    Elements are written as the placeholder string ``"<Element(<guid>)>"``,
    every other object is handed to :class:`json.JSONEncoder`.
    """

    # TODO: make Elements serializable and deserializable.
    # Ideas: guid to identify, (factory) method to (re)init by guid
    # maybe weakref to other elements (Ports, connections, ...)

    def default(self, o):
        """Return a JSON serializable representation of ``o``.

        Args:
            o: object the standard encoder could not serialize.

        Returns:
            Placeholder string with the guid if ``o`` is an Element.

        Raises:
            TypeError: from the base encoder for all other objects.
        """
        if isinstance(o, Element):
            return "<Element(%s)>" % o.guid
        # Delegate to the base implementation with the proper arguments so
        # that the regular "not JSON serializable" TypeError is raised.
        return super().default(o)

47 

48 

class Element(metaclass=attribute.AutoAttributeNameMeta):
    """Root class of all elements.

    Each element owns a ``guid`` and an attribute manager bound to it.
    """
    guid_prefix = ''
    _id_counter = 0

    def __init__(self, guid=None, **kwargs):
        """Create the element and assign initial attribute values.

        Args:
            guid: identifier of the element, a new id is created through
                get_id() if it is empty.
            **kwargs: initial values; only names known to the attribute
                manager are accepted.

        Raises:
            AttributeError: if a keyword is not a known attribute.
        """
        self.guid = guid if guid else self.get_id(self.guid_prefix)
        # self.related_decisions: List[Decision] = []
        self.attributes = attribute.AttributeManager(bind=self)

        # only known attributes may be set through kwargs
        for key, value in kwargs.items():
            if key not in self.attributes:
                raise AttributeError(
                    f"Unused argument in kwargs: {key}: {value}")
            setattr(self, key, value)

    def __hash__(self):
        """Hash the element by its guid."""
        return hash(self.guid)

    def validate_creation(self) -> bool:
        """Tell if the instance is valid; has to be provided by subclasses."""
        raise NotImplementedError

    def validate_attributes(self) -> dict:
        """Validate the attributes; the base class returns an empty dict."""
        return {}

    def _calc_position(self, name) -> np.array:
        """Position of the element; the base class has none (None)."""
        return None

    position = attribute.Attribute(
        description='Position of element',
        functions=[_calc_position]
    )

    @staticmethod
    def get_id(prefix=""):
        """Create a new id from ``prefix`` and the class wide counter.

        Raises:
            AttributeError: if the prefix has more than 10 characters.
        """
        if len(prefix) > 10:
            raise AttributeError("Max prefix length is 10!")
        Element._id_counter += 1
        counter = Element._id_counter
        # prefix is filled to 8 characters, counter is zero padded to 14
        return "{0:0<8s}{1:0>14d}".format(prefix, counter)

    @staticmethod
    def get_object(guid):
        """Obsolete lookup of an element by guid, always raises.

        Raises:
            AssertionError: on every call.
        """
        raise AssertionError("Obsolete method. "
                             "Don't rely on global Element.objects. "
                             "Use e.g. elements from tasks/playground.")

    def request(self, name, external_decision: Decision = None) \
            -> Union[None, Decision]:
        """Request an attribute of the element via its attribute manager.

        Args:
            name: Name of attribute
            external_decision: Decision to use instead of default decision
        """
        manager = self.attributes
        return manager.request(name, external_decision)

    def reset(self, name, data_source=AttributeDataSource.manual_overwrite):
        """Reset an attribute of the element via its attribute manager.

        Args:
            name: attribute name
            data_source (object): data source of the attribute
        """
        manager = self.attributes
        return manager.reset(name, data_source)

    def source_info(self) -> str:
        """Informative string about the source of the element (empty here)."""
        return ''

    @classmethod
    def get_pending_attribute_decisions(
            cls, elements: Iterable['Element']) -> DecisionBunch:
        """Collect the pending attribute decisions of the given elements.

        This is a generator: it yields exactly one DecisionBunch which holds
        the decisions of all elements, sorted by their ``global_key``.
        """
        collected = DecisionBunch()
        for element in elements:
            collected.extend(element.attributes.get_decisions())

        # sorting keeps the order of the decisions reproducible
        collected.sort(key=lambda decision: decision.global_key)
        yield collected

    @classmethod
    def full_reset(cls):
        """Obsolete, always raises AssertionError."""
        raise AssertionError("Obsolete method. not required any more.")

149 

150 

class IFCBased(Element):
    """Element with instantiation from ifc and related methods.

    Attributes:
        ifc: IfcOpenShell element instance
        ifc_types: Dict with ifc_type as key and list of predefined types that
            fit to the class as values.
            Special values for predefined types:
            '*' all which are not overwritten in other classes predefined
            types.
            '-Something' start with minus to exclude

            For example:
            {'IfcSlab': ['*', '-SomethingSpecialWeDontWant', 'BASESLAB']}
            {'IfcRoof': ['FLAT_ROOF', 'SHED_ROOF',...],
             'IfcSlab': ['ROOF']}
    """

    ifc_types: Dict[str, List[str]] = None
    pattern_ifc_type = []

    def __init__(self, *args,
                 ifc=None,
                 finder: TemplateFinder = None,
                 ifc_units: dict = None,
                 ifc_domain: IFCDomain = None,
                 **kwargs):
        """Store the ifc entity and the helpers used to read data from it.

        Args:
            ifc: IfcOpenShell entity the element is based on
            finder: TemplateFinder stored on the element
            ifc_units: units passed to the property set lookups
            ifc_domain: domain of the IFC file the entity comes from
        """
        super().__init__(*args, **kwargs)

        self.ifc = ifc
        self.predefined_type = ifc2python.get_predefined_type(ifc)
        self.ifc_domain = ifc_domain
        self.finder = finder
        self.ifc_units = ifc_units
        self.source_tool: SourceTool = None

        # TBD
        self.enrichment = {}  # TODO: DJA
        # caches filled lazily by get_propertysets/get_type_propertysets
        self._propertysets = None
        self._type_propertysets = None
        self._decision_results = {}

    @classmethod
    def ifc2args(cls, ifc) -> Tuple[tuple, dict]:
        """Extract init args and kwargs from ifc"""
        guid = getattr(ifc, 'GlobalId', None)
        kwargs = {'guid': guid, 'ifc': ifc}
        return (), kwargs

    @classmethod
    def from_ifc(cls, ifc, *args, **kwargs):
        """Factory method to create instance from ifc"""
        ifc_args, ifc_kwargs = cls.ifc2args(ifc)
        kwargs.update(ifc_kwargs)
        return cls(*(args + ifc_args), **kwargs)

    @property
    def ifc_type(self):
        """Type name reported by the ifc entity, None without entity."""
        if self.ifc:
            return self.ifc.is_a()
        return None

    @classmethod
    def pre_validate(cls, ifc) -> bool:
        """Check if ifc meets conditions to create element from it"""
        raise NotImplementedError

    def _calc_position(self, name):
        """Return the absolute position of the element.

        The location coordinates of the object placement and of all
        placements it is relative to are summed up. Only the locations are
        added, other parts of the placements are not evaluated.

        Returns:
            numpy array with the summed coordinates or None if the entity
            has no object placement.
        """
        # ObjectPlacement may be missing as attribute or be set to None
        placement = getattr(self.ifc, 'ObjectPlacement', None)
        if placement is None:
            return None

        # float dtype keeps the in-place addition valid for integer input
        absolute = np.array(
            placement.RelativePlacement.Location.Coordinates, dtype=float)
        parent = placement.PlacementRelTo
        while parent is not None:
            absolute += np.array(parent.RelativePlacement.Location.Coordinates)
            parent = parent.PlacementRelTo
        return absolute

    def _get_name_from_ifc(self, name):
        """Return the IFC name without umlauts, None if the name is empty."""
        ifc_name = self.get_ifc_attribute('Name')
        if ifc_name:
            return remove_umlaut(ifc_name)
        return None

    name = attribute.Attribute(
        description="Name of element based on IFC attribute.",
        functions=[_get_name_from_ifc]
    )

    def get_ifc_attribute(self, attribute):
        """Return the attribute of the ifc entity, None if it is missing."""
        return getattr(self.ifc, attribute, None)

    def get_propertyset(self, propertysetname):
        """Look up a single property set of the entity by its name."""
        return ifc2python.get_property_set_by_name(
            propertysetname, self.ifc, self.ifc_units)

    def get_propertysets(self):
        """Return the property sets of the entity (read once, then cached)."""
        if self._propertysets is None:
            self._propertysets = ifc2python.get_property_sets(
                self.ifc, self.ifc_units)
        return self._propertysets

    def get_type_propertysets(self):
        """Return the type property sets (read once, then cached)."""
        if self._type_propertysets is None:
            self._type_propertysets = ifc2python.get_type_property_sets(
                self.ifc, self.ifc_units)
        return self._type_propertysets

    def get_hierarchical_parent(self):
        """Delegate to ifc2python.getHierarchicalParent for the entity."""
        return ifc2python.getHierarchicalParent(self.ifc)

    def get_hierarchical_children(self):
        """Delegate to ifc2python.getHierarchicalChildren for the entity."""
        return ifc2python.getHierarchicalChildren(self.ifc)

    def get_spartial_parent(self):
        """Delegate to ifc2python.getSpatialParent for the entity."""
        return ifc2python.getSpatialParent(self.ifc)

    def get_spartial_children(self):
        """Delegate to ifc2python.getSpatialChildren for the entity."""
        return ifc2python.getSpatialChildren(self.ifc)

    def get_space(self):
        """Delegate to ifc2python.getSpace for the entity."""
        return ifc2python.getSpace(self.ifc)

    def get_storey(self):
        """Delegate to ifc2python.getStorey for the entity."""
        return ifc2python.getStorey(self.ifc)

    def get_building(self):
        """Delegate to ifc2python.getBuilding for the entity."""
        return ifc2python.getBuilding(self.ifc)

    def get_site(self):
        """Delegate to ifc2python.getSite for the entity."""
        return ifc2python.getSite(self.ifc)

    def get_project(self):
        """Delegate to ifc2python.getProject for the entity."""
        return ifc2python.getProject(self.ifc)

    def get_true_north(self):
        """Delegate to ifc2python.get_true_north for the entity."""
        return ifc2python.get_true_north(self.ifc)

    def summary(self):
        """Delegate to ifc2python.summary for the entity."""
        return ifc2python.summary(self.ifc)

    def search_property_hierarchy(self, propertyset_name):
        """Search for property in all related properties in hierarchical order.

        1. element's propertysets
        2. element type's propertysets

        Returns:
            the first property set found under the given name, else None.
        """
        # the type property sets are only read if the element's own
        # property sets do not contain the name
        for getter in (self.get_propertysets, self.get_type_propertysets):
            p_sets = getter()
            try:
                return p_sets[propertyset_name]
            except KeyError:
                continue
        return None

    def inverse_properties(self):
        """Generator yielding tuples of PropertySet name and Property name"""
        for p_set_name, p_set in self.get_propertysets().items():
            for p_name in p_set:
                yield (p_set_name, p_name)

    def filter_properties(self, patterns):
        """filter all properties by re pattern

        :returns: list of tuple(propertyset_name, property_name, match_graph)"""
        matches = []
        for propertyset_name, property_name in self.inverse_properties():
            for pattern in patterns:
                match = re.match(pattern, property_name)
                if match:
                    matches.append((propertyset_name, property_name, match))
        return matches

    @classmethod
    def filter_for_text_fragments(
            cls, ifc_element, ifc_units: dict, optional_locations: list = None):
        """Filter for text fragments in the ifc_element to identify it.

        The patterns in ``cls.pattern_ifc_type`` are searched in the name of
        the entity and in the values found for ``optional_locations``.

        Args:
            ifc_element: entity to inspect
            ifc_units: units passed to the property set lookup
            optional_locations: names handed to
                ifc2python.get_property_set_by_name to get further texts

        Returns:
            list with the first matched fragment per location, or '' if
            nothing matched.
        """
        def find_hits(text):
            # keep only the patterns that matched
            hits = (p.search(text) for p in cls.pattern_ifc_type)
            return [hit for hit in hits if hit is not None]

        results = []
        # Name is optional in IFC, treat a missing name as empty text
        hits = find_hits(getattr(ifc_element, 'Name', None) or '')
        # hits.extend([p.search(ifc_element.Description or '') for p in cls.pattern_ifc_type])
        if hits:
            quality_logger.info(
                "Identified %s through text fracments in name. Criteria: %s",
                cls.__name__, hits)
            results.append(hits[0][0])
        for loc in optional_locations or ():
            # look the location up once instead of once per pattern
            text = ifc2python.get_property_set_by_name(
                loc, ifc_element, ifc_units)
            if not text:
                continue
            hits = find_hits(text)
            if hits:
                quality_logger.info(
                    "Identified %s through text fracments in %s. "
                    "Criteria: %s", cls.__name__, loc, hits)
                results.append(hits[0][0])
        return results if results else ''

    def get_exact_property(self, propertyset_name: str, property_name: str):
        """Returns value of property specified by propertyset name and
        property name

        :Raises: NoValueError if property does not exist"""
        try:
            p_set = self.search_property_hierarchy(propertyset_name)
            value = p_set[property_name]
        except (AttributeError, KeyError, TypeError) as err:
            raise NoValueError("Property '%s.%s' does not exist" % (
                propertyset_name, property_name)) from err
        return value

    def select_from_potential_properties(self, patterns, name,
                                         collect_decisions):
        """Return the value of the properties matching ``patterns``.

        Args:
            patterns: re patterns matched against the property names
            name: currently unused
            collect_decisions: currently unused

        Returns:
            None if no property matches, the common value if all matching
            properties agree, else the set of distinct values.
        """
        matches = self.filter_properties(patterns)
        if not matches:
            return None

        values = [
            self.get_exact_property(propertyset_name, property_name)
            for propertyset_name, property_name, _ in matches]

        # TODO: Decision: save for all following elements of same class (
        #  dont ask again?)
        distinct_values = set(values)
        if len(distinct_values) == 1:
            # multiple sources but common value
            return distinct_values.pop()

        # TODO: let the user decide between the candidates instead of
        #  returning all of them
        quality_logger.warning(
            'Found multiple values for attributes %s of instance %s',
            ', '.join(str((m[0], m[1])) for m in matches), self)
        return distinct_values

    def source_info(self) -> str:
        """Return ifc type and guid of the element as '<type>:<guid>'."""
        return f'{self.ifc_type}:{self.guid}'

421 

422 

class RelationBased(IFCBased):
    """IFC based element that serves as base class, e.g. for Port.

    The class used to be defined twice in a row; the second, empty
    definition replaced the first one and dropped its members. Both are
    merged here so that the members below are in effect.
    """
    conditions = []

    def __repr__(self):
        return "<%s (guid: %s)>" % (self.__class__.__name__, self.guid)

    def __str__(self):
        return "%s" % self.__class__.__name__

436 

437 

class ProductBased(IFCBased):
    """Elements based on IFC products.

    Args:
        material: material of the element
        material_set: dict of material and fraction [0, 1] if multiple materials
    """
    domain = 'GENERAL'
    key: str = ''
    # maps the key of every subclass to the subclass itself
    key_map: Dict[str, 'Type[ProductBased]'] = {}
    conditions = []

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.aggregation = None
        self.ports = self.get_ports()
        self.material = None
        self.material_set = {}
        self.cost_group = self.calc_cost_group()

    def __init_subclass__(cls, **kwargs):
        """Register every subclass in ``key_map`` under '<domain>-<name>'."""
        # keep the hook cooperative for other classes in the MRO
        super().__init_subclass__(**kwargs)
        # set key for each class
        cls.key = f'{cls.domain}-{cls.__name__}'
        cls.key_map[cls.key] = cls

    def get_ports(self):
        """Return the ports of the element; the base class has none."""
        return []

    def get_better_subclass(self) -> Union[None, Type['IFCBased']]:
        """Returns alternative subclass of current object.
        CAUTION: only use this if you can't know the result before instantiation
         of base class

        Returns:
            object: subclass of ProductBased or None"""
        return None

    @property
    def neighbors(self):
        """Directly connected elements"""
        return [port.connection.parent
                for port in self.ports if port.connection]

    def validate_creation(self):
        """Validate the element creation in two steps.
        1. Check if standard parameter are in valid range.
        2. Check if number of ports are equal to number of expected ports
        (only for HVAC).

        Returns:
            False if a condition that is critical for creation or the port
            validation fails, else True.
        """
        for cond in self.conditions:
            if not cond.critical_for_creation:
                continue
            value = getattr(self, cond.key)
            # don't prevent creation if value is not existing
            if value and not cond.check(self, value):
                logger.warning("%s validation (%s) failed for %s",
                               self.ifc_type, cond.name, self.guid)
                return False
        if not self.validate_ports():
            logger.warning("%s has %d ports, but %s expected for %s",
                           self.ifc_type, len(self.ports),
                           self.expected_hvac_ports, self.guid)
            return False
        return True

    def validate_attributes(self) -> dict:
        """Check if all attributes are valid, returns dict with key = attribute
        and value = True or False

        The check of the conditions that are not critical for creation is
        not implemented yet, so the returned dict is always empty.
        """
        results = {}
        # TODO: evaluate all conditions with critical_for_creation == False
        #  and store the outcome per attribute in results
        return results

    def validate_ports(self):
        """Tell if the ports are valid; always True in the base class."""
        return True

    def __repr__(self):
        return "<%s>" % self.__class__.__name__

    def calc_cost_group(self) -> Optional[int]:
        """Calculate the cost group according to DIN276"""
        return None

    def calc_volume_from_ifc_shape(self):
        """Calculate the volume from the shape of the ifc entity.

        Returns:
            volume in cubic meter (ureg quantity) or None if the entity has
            no 'Representation' attribute or the calculation fails.
        """
        # todo use more efficient iterator to calc all shapes at once
        #  with multiple cores:
        #  https://wiki.osarch.org/index.php?title=IfcOpenShell_code_examples
        if not hasattr(self.ifc, 'Representation'):
            return None
        try:
            shape = ifcopenshell.geom.create_shape(
                settings_products, self.ifc).geometry
            vol = PyOCCTools.get_shape_volume(shape)
            return vol * ureg.meter ** 3
        except Exception:
            # best effort: a failing geometry must not stop the processing,
            # but KeyboardInterrupt/SystemExit are no longer swallowed
            logger.warning(
                "No calculation of geometric volume possible for %s.",
                self.ifc)
            return None

    def _get_volume(self, name):
        """Return ``net_volume`` if available, else the shape volume."""
        net_volume = getattr(self, "net_volume", None)
        if net_volume:
            return net_volume
        return self.calc_volume_from_ifc_shape()

    volume = attribute.Attribute(
        description="Volume of the attribute",
        functions=[_get_volume],
    )

    def __str__(self):
        return "<%s>" % (self.__class__.__name__)

559 

560 

class Port(RelationBased):
    """Basic port of an element which can be connected to one other port."""

    def __init__(self, parent, *args, **kwargs):
        """Create an unconnected port that belongs to ``parent``."""
        super().__init__(*args, **kwargs)
        self.parent: ProductBased = parent
        self.connection = None

    def connect(self, other):
        """Connect this port and ``other`` with each other.

        Raises:
            AttributeError: if one of both ports is already connected to a
                different port.
        """
        assert isinstance(other, Port), \
            "Can't connect interfaces of different classes."
        # if self.flow_direction == 'SOURCE' or \
        #         self.flow_direction == 'SOURCEANDSINK':
        own_partner = self.connection
        if own_partner and own_partner is not other:
            raise AttributeError("Port is already connected!")
        others_partner = other.connection
        if others_partner and others_partner is not self:
            raise AttributeError("Other port is already connected!")
        # the connection is stored on both sides
        self.connection = other
        other.connection = self

    def disconnect(self):
        """Remove the connection on this port and on its partner."""
        partner = self.connection
        if not partner:
            return
        # clear the own side first, the partner then ends the recursion
        self.connection = None
        partner.disconnect()

    def is_connected(self):
        """Returns truth value of port's connection"""
        return bool(self.connection)

    def __repr__(self):
        cls_name = self.__class__.__name__
        if not self.parent:
            return "<%s (*abandoned*)>" % cls_name
        try:
            position = self.parent.ports.index(self)
        except ValueError:
            # parent is set but does not list this port
            return "<%s (broken parent: %s)>" % (cls_name, self.parent)
        return "<%s #%d of %s)>" % (cls_name, position, self.parent)

    def __str__(self):
        # repr without its first and its last two characters
        text = self.__repr__()
        return text[1:-2]

606 

607 

class Material(ProductBased):
    """Material element created from IfcMaterial entities."""
    guid_prefix = 'Material_'
    key: str = 'Material'
    ifc_types = {
        'IfcMaterial': ["*"]
    }
    name = ''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # elements that use this material
        self.parents: List[ProductBased] = []

    @staticmethod
    def get_id(prefix=""):
        """Create an id from ``prefix`` and the tail of a new ifc guid.

        Raises:
            AttributeError: if the prefix has more than 10 characters.
        """
        n_prefix = len(prefix)
        if n_prefix > 10:
            raise AttributeError("Max prefix length is 10!")
        # NOTE(review): the slice starts one character behind the prefix
        # length, so the id is one character shorter than a plain guid --
        # confirm this is intended.
        guid_tail = guid.new()[n_prefix + 1:]
        return f"{prefix}{guid_tail}"

    # value ranges of the material attributes, none of them is critical for
    # the creation of the element
    conditions = [
        condition.RangeCondition(
            'spec_heat_capacity',
            0 * ureg.kilojoule / (ureg.kg * ureg.K),
            5 * ureg.kilojoule / (ureg.kg * ureg.K),
            critical_for_creation=False),
        condition.RangeCondition(
            'density',
            0 * ureg.kg / ureg.m ** 3,
            50000 * ureg.kg / ureg.m ** 3,
            critical_for_creation=False),
        condition.RangeCondition(
            'thermal_conduc',
            0 * ureg.W / ureg.m / ureg.K,
            100 * ureg.W / ureg.m / ureg.K,
            critical_for_creation=False),
        condition.RangeCondition(
            'porosity',
            0 * ureg.dimensionless,
            1 * ureg.dimensionless, True,
            critical_for_creation=False),
        condition.RangeCondition(
            'solar_absorp',
            0 * ureg.percent,
            1 * ureg.percent, True,
            critical_for_creation=False),
    ]

    spec_heat_capacity = attribute.Attribute(
        default_ps=("Pset_MaterialThermal", "SpecificHeatCapacity"),
        # functions=[get_from_template],
        unit=ureg.kilojoule / (ureg.kg * ureg.K)
    )

    density = attribute.Attribute(
        default_ps=("Pset_MaterialCommon", "MassDensity"),
        unit=ureg.kg / ureg.m ** 3
    )

    thermal_conduc = attribute.Attribute(
        default_ps=("Pset_MaterialThermal", "ThermalConductivity"),
        unit=ureg.W / (ureg.m * ureg.K)
    )

    porosity = attribute.Attribute(
        default_ps=("Pset_MaterialCommon", "Porosity"),
        unit=ureg.dimensionless
    )

    # todo is percent the correct unit? (0-1)
    solar_absorp = attribute.Attribute(
        # default_ps=('Pset_MaterialOptical', 'SolarTransmittance'),
        default=0.7,
        unit=ureg.percent
    )

    def __repr__(self):
        cls_name = self.__class__.__name__
        if not self.name:
            return "<%s>" % cls_name
        return "<%s %s>" % (cls_name, self.name)

684 

685 

class Dummy(ProductBased):
    """Dummy for all unknown elements"""

    ifc_types = {
        "IfcElementProxy": ['*']
    }

    @property
    def ifc_type(self):
        """IFC type of the dummy.

        ``_ifc_type`` is returned if it has been assigned. Nothing in this
        class assigns it (the ``__init__`` that did so is disabled), so the
        property falls back to the inherited implementation instead of
        raising AttributeError.
        """
        stored_type = getattr(self, '_ifc_type', None)
        if stored_type is not None:
            return stored_type
        return super().ifc_type

    def __str__(self):
        return "Dummy '%s'" % self.ifc_type

704 

705 

class Factory:
    """Element Factory for :class: `ProductBased`

    To understand the concept of the factory class, we refer to this article:
    https://refactoring.guru/design-patterns/factory-method/python/example

    Example:
        factory = Factory([Pipe, Boiler], dummy)
        ele = factory(some_ifc_element)
    """

    def __init__(
            self,
            relevant_elements: set[ProductBased],
            ifc_units: dict,
            ifc_domain: IFCDomain,
            finder: Union[TemplateFinder, None] = None,
            dummy=Dummy):
        """Prepare the lookup tables for the given element classes.

        Args:
            relevant_elements: element classes the factory may create
            ifc_units: units handed to every created element
            ifc_domain: domain of the IFC file, handed to every element
            finder: TemplateFinder handed to every created element
            dummy: class used if no element class fits an entity
        """
        self.mapping, self.blacklist, self.defaults = \
            self.create_ifc_mapping(relevant_elements)
        self.dummy_cls = dummy
        self.ifc_domain = ifc_domain
        self.finder = finder
        self.ifc_units = ifc_units

    def __call__(self, ifc_entity, *args, ifc_type: str = None, use_dummy=True,
                 **kwargs) -> ProductBased:
        """Run factory to create element instance.

        Calls self.create() function but before checks which element_cls is the
        correct mapping for the given ifc_entity.

        Args:
            ifc_entity: IfcOpenShell entity
            args: additional args passed to element
            ifc_type: ifc type to create element for.
                defaults to ifc_entity.is_a()
            use_dummy: use dummy class if nothing is found
            kwargs: additional kwargs passed to element
        Raises:
            LookupError: if no element found and use_dummy = False
            IFCDomainError: if the element class restricts the ifc domains
                (``from_ifc_domains``) and the factory's domain is not
                among them
        Returns:
            element: created element instance
        """
        _ifc_type = ifc_type or ifc_entity.is_a()
        predefined_type = ifc2python.get_predefined_type(ifc_entity)
        element_cls = self.get_element(_ifc_type, predefined_type)
        if not element_cls:
            if not use_dummy:
                raise LookupError(f"No element found for {ifc_entity}")
            element_cls = self.dummy_cls
        # TODO # 537 Put this to a point where it makes sense, return None is no
        #  solution
        if hasattr(element_cls, 'from_ifc_domains'):
            if self.ifc_domain not in element_cls.from_ifc_domains:
                # one message for log and exception; the former text
                # contained a stray "f" in front of the class name
                message = (
                    f"Element has {self.ifc_domain} but "
                    f"{element_cls.__name__} will only be created for IFC "
                    f"files of domain {element_cls.from_ifc_domains}")
                logger.warning("%s.", message)
                raise IFCDomainError(message)

        return self.create(element_cls, ifc_entity, *args, **kwargs)

    def create(self, element_cls, ifc_entity, *args, **kwargs):
        """Create Element from class and ifc

        The element is created a second time from another class if the
        first instance names one through get_better_subclass().
        """
        def instantiate(cls):
            # domain, finder and units of the factory go to every element
            return cls.from_ifc(
                ifc_entity, *args,
                ifc_domain=self.ifc_domain,
                finder=self.finder,
                ifc_units=self.ifc_units,
                **kwargs)

        element = instantiate(element_cls)
        # check if it prefers to be sth else
        better_cls = element.get_better_subclass()
        if better_cls:
            logger.info("Creating %s instead of %s", better_cls, element_cls)
            element = instantiate(better_cls)
        return element

    def get_element(self, ifc_type: str, predefined_type: Union[str, None]) -> \
            Union[ProductBased, None]:
        """Get element class by ifc type and predefined type"""
        if predefined_type:
            key = (ifc_type.lower(), predefined_type.upper())
            # 1. go over normal list, if found match_graph --> return
            element = self.mapping.get(key)
            if element:
                return element
            # 2. go over negative list, if found match_graph --> not existing
            if key in self.blacklist:
                return None
        # 3. go over default list, if found match_graph --> return
        return self.defaults.get(ifc_type.lower())

    @staticmethod
    def create_ifc_mapping(elements: Iterable) -> Tuple[
        Dict[Tuple[str, str], ProductBased],
        List[Tuple[str, ProductBased]],
        Dict[str, ProductBased]
    ]:
        """Create mapping dict, blacklist and default dict from elements

        WARNING: ifc_type is always converted to lower case
        and predefined types to upper case

        Returns:
            mapping: dict of ifc_type and predefined_type to element class
            blacklist: list of ifc_type which will not be taken into account
            default: dict of ifc_type to element class
        """
        # TODO: cover virtual elements e.g. Space Boundaries (not products)

        mapping = {}
        blacklist = []
        default = {}
        _all_ifc_types = set()

        for ele in elements:
            for ifc_type, tokens in ele.ifc_types.items():
                type_key = ifc_type.lower()
                _all_ifc_types.add(type_key)
                for token in tokens:
                    # create default dict where all stars are taken into account
                    # items 'IfcSlab': Slab
                    if token == '*':
                        if ifc_type in default:
                            raise NameError(
                                f"Conflicting default ifc_types for "
                                f"{ifc_type}")  # TBD
                        # NOTE(review): the check above compares the
                        # original spelling with lower-cased keys and so
                        # misses e.g. 'IfcSlab'. Raising for these cases as
                        # well could break existing element sets, hence
                        # such conflicts are only reported for now.
                        previous = default.get(type_key)
                        if previous is not None and previous is not ele:
                            logger.warning(
                                "Conflicting default ifc_types for %s: "
                                "%s replaces %s", ifc_type, ele, previous)
                        default[type_key] = ele
                    # create blacklist where all - are taken into account
                    # items: ('IfcRoof', 'WeiredStuff')
                    elif token.startswith('-'):
                        blacklist.append((type_key, token[1:].upper()))
                    # create mapping dict
                    # items ('IfcSlab', 'Roof'): Roof
                    else:
                        mapping[(type_key, token.upper())] = ele

        # check ifc types without default
        no_default = _all_ifc_types - set(default)
        if no_default:
            logger.warning("The following ifc types have no default "
                           "representing Element class. There will be no "
                           "match if predefined type is not provided.\n%s",
                           no_default)
        return mapping, blacklist, default

866 

867 

class SerializedElement:
    """Serialized version of an element.

    This is a workaround as we can't serialize elements due to the usage of
    IfcOpenShell which uses unpicklable swigPy objects. We just store the most
    important information which are guid, element_type, storeys, aggregated
    elements and the attributes from the attribute system."""

    def __init__(self, element):
        """Copy the picklable information of ``element``.

        Args:
            element: element to take guid, class name, attribute values and
                the guids of related objects from.
        """
        self.guid = element.guid
        self.element_type = element.__class__.__name__
        for attr_name, _ in element.attributes.items():
            # assign value directly to attribute without status
            # make sure to get the value
            value = getattr(element, attr_name)
            if self.is_picklable(value):
                setattr(self, attr_name, value)
            else:
                logger.info(
                    "Attribute %s will not be serialized, as it's "
                    "not pickleable", attr_name)
        # related objects are stored by guid only
        if hasattr(element, "space_boundaries"):
            self.space_boundaries = [bound.guid for bound in
                                     element.space_boundaries]
        if hasattr(element, "storeys"):
            self.storeys = [storey.guid for storey in element.storeys]
        if isinstance(element, AggregationMixin):
            self.elements = [ele.guid for ele in element.elements]

    @staticmethod
    def is_picklable(value: Any) -> bool:
        """Determines if a given value is picklable.

        This method attempts to serialize the provided value using the
        `pickle` module. If the value can be successfully serialized, it is
        considered picklable.

        Args:
            value (Any): The value to be tested for picklability.

        Returns:
            bool: True if the value is picklable, False otherwise.
        """
        try:
            pickle.dumps(value)
        except (pickle.PicklingError, TypeError, AttributeError,
                RecursionError):
            # Besides PicklingError and TypeError, pickle reports local
            # objects with AttributeError on several Python versions and
            # too deeply nested data with RecursionError.
            return False
        return True

    def __repr__(self):
        return "<serialized %s (guid: '%s')>" % (
            self.element_type, self.guid)