Coverage for bim2sim/elements/base_elements.py: 76%

445 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-05-16 08:28 +0000

1import logging 

2import pickle 

3import re 

4from json import JSONEncoder 

5from typing import Union, Iterable, Dict, List, Tuple, Type, Optional, Any 

6 

7import numpy as np 

8import ifcopenshell.geom 

9from ifcopenshell import guid 

10 

11from bim2sim.elements.aggregation import AggregationMixin 

12from bim2sim.kernel.decision import Decision, DecisionBunch 

13from bim2sim.kernel import IFCDomainError 

14from bim2sim.elements.mapping import condition, attribute, ifc2python 

15from bim2sim.elements.mapping.finder import TemplateFinder, SourceTool 

16from bim2sim.elements.mapping.units import ureg 

17from bim2sim.utilities.common_functions import angle_equivalent, vector_angle, \ 

18 remove_umlaut 

19from bim2sim.utilities.pyocc_tools import PyOCCTools 

20from bim2sim.utilities.types import IFCDomain, AttributeDataSource 

21 

# Module logger, named after this module.
logger = logging.getLogger(__name__)
# Dedicated 'bim2sim.QualityReport' channel; in this module it receives the
# messages about text-fragment identification and ambiguous property values.
quality_logger = logging.getLogger('bim2sim.QualityReport')
# Geometry settings created once at import time and reused by
# ProductBased.calc_volume_from_ifc_shape for every shape creation.
settings_products = ifcopenshell.geom.main.settings()
# NOTE(review): presumably makes ifcopenshell return pythonOCC shapes, which
# PyOCCTools then consumes -- confirm against the ifcopenshell documentation.
settings_products.set(settings_products.USE_PYTHON_OPENCASCADE, True)

26 

27 

class ElementError(Exception):
    """Error in Element.

    Base class of the element related exceptions defined in this module.
    """

30 

31 

class NoValueError(ElementError):
    """Value is not available.

    Raised in this module when a requested property cannot be found.
    """

34 

35 

class ElementEncoder(JSONEncoder):
    """JSON encoder that knows how to represent :class:`Element` objects.

    Elements are written as the placeholder string ``"<Element(<guid>)>"``.
    All other objects are handed to the standard ``JSONEncoder`` handling.
    """

    # TODO: make Elements serializable and deserializable.
    # Ideas: guid to identify, (factory) method to (re)init by guid
    # maybe weakref to other elements (Ports, connections, ...)

    def default(self, o):
        """Return a JSON-serializable stand-in for ``o``.

        Args:
            o: object the encoder could not serialize on its own.

        Returns:
            str: ``"<Element(<guid>)>"`` if ``o`` is an Element.

        Raises:
            TypeError: raised by the base class for any other object.
        """
        if isinstance(o, Element):
            return "<Element(%s)>" % o.guid
        # Delegate with the proper arguments so that unsupported objects
        # produce the standard "not JSON serializable" TypeError.
        return super().default(o)

47 

48 

class Element(metaclass=attribute.AutoAttributeNameMeta):
    """Root class of all elements.

    Provides the guid handling, the attribute manager and thin wrappers to
    request/reset attributes.
    """
    guid_prefix = ''
    _id_counter = 0

    def __init__(self, guid=None, **kwargs):
        """Create the element and apply attribute values passed as kwargs.

        Args:
            guid: identifier of the element; generated via ``get_id`` when
                falsy.
            **kwargs: initial values, every key must be a known attribute.

        Raises:
            AttributeError: if a keyword is not a registered attribute.
        """
        self.guid = guid if guid else self.get_id(self.guid_prefix)
        self.attributes = attribute.AttributeManager(bind=self)

        # only registered attributes may be set through kwargs
        for key, value in kwargs.items():
            if key not in self.attributes:
                raise AttributeError(
                    f"Unused argument in kwargs: {key}: {value}")
            setattr(self, key, value)

    def __hash__(self):
        """Hash elements by their guid."""
        return hash(self.guid)

    def validate_creation(self) -> bool:
        """Check if current instance is valid (to be implemented by
        subclasses)."""
        raise NotImplementedError

    def validate_attributes(self) -> dict:
        """Check if attributes are valid; the base class reports nothing."""
        return dict()

    def _calc_position(self, name) -> np.array:
        """Returns position (calculation may be expensive).

        The base class has no position and returns None.
        """
        return None

    position = attribute.Attribute(
        description='Position of element',
        functions=[_calc_position]
    )

    @staticmethod
    def get_id(prefix=""):
        """Generate a new id from ``prefix`` and a running counter.

        The prefix is padded with '0' on the right to at least 8 characters,
        the class wide counter is zero padded on the left to 14 digits.

        Raises:
            AttributeError: if the prefix is longer than 10 characters.
        """
        if len(prefix) > 10:
            raise AttributeError("Max prefix length is 10!")
        Element._id_counter += 1
        return f"{prefix:0<8s}{Element._id_counter:0>14d}"

    @staticmethod
    def get_object(guid):
        """Obsolete lookup of an Element by guid.

        Raises:
            AssertionError: always, the method must not be used any more.
        """
        message = ("Obsolete method. "
                   "Don't rely on global Element.objects. "
                   "Use e.g. elements from tasks/playground.")
        raise AssertionError(message)

    def request(self, name, external_decision: Decision = None) \
            -> Union[None, Decision]:
        """Request the elements attribute.

        Args:
            name: Name of attribute
            external_decision: Decision to use instead of default decision

        Returns:
            Whatever the attribute manager's ``request`` returns.
        """
        manager = self.attributes
        return manager.request(name, external_decision)

    def reset(self, name, data_source=AttributeDataSource.manual_overwrite):
        """Reset the attribute of the element.

        Args:
            name: attribute name
            data_source (object): data source of the attribute
        """
        manager = self.attributes
        return manager.reset(name, data_source)

    def source_info(self) -> str:
        """Get informative string about source of Element."""
        return ''

    @classmethod
    def get_pending_attribute_decisions(
            cls, elements: Iterable['Element']) -> DecisionBunch:
        """Collect the pending attribute decisions of all given elements.

        This is a generator: it yields exactly one DecisionBunch which holds
        the decisions of every element, sorted by their ``global_key``.
        """
        collected = DecisionBunch()
        for element in elements:
            collected.extend(element.attributes.get_decisions())

        # sort decisions to preserve order
        collected.sort(key=lambda decision: decision.global_key)
        yield collected

    @classmethod
    def full_reset(cls):
        """Obsolete, always raises AssertionError."""
        raise AssertionError("Obsolete method. not required any more.")

149 

150 

class IFCBased(Element):
    """Element with instantiation from ifc and related methods.

    Attributes:
        ifc: IfcOpenShell element instance
        ifc_types: Dict with ifc_type as key and list of predefined types that
            fit to the class as values.
            Special values for predefined types:
            '*' all which are not overwritten in other classes predefined
                types.
            '-Something' start with minus to exclude

            For example:
            {'IfcSlab': ['*', '-SomethingSpecialWeDontWant', 'BASESLAB']}
            {'IfcRoof': ['FLAT_ROOF', 'SHED_ROOF',...],
            'IfcSlab': ['ROOF']}
        pattern_ifc_type: objects offering ``search(text)`` (presumably
            compiled regular expressions) used to identify the class by text
            fragments.
    """

    ifc_types: Dict[str, List[str]] = None
    pattern_ifc_type = []

    def __init__(self, *args,
                 ifc=None,
                 finder: TemplateFinder = None,
                 ifc_units: dict = None,
                 ifc_domain: IFCDomain = None,
                 **kwargs):
        """Store the IFC entity and the helpers needed to read from it.

        Args:
            ifc: IfcOpenShell entity this element is based on.
            finder: TemplateFinder stored on the element.
            ifc_units: unit information passed on to the property set readers.
            ifc_domain: domain of the IFC file the entity comes from.
        """
        super().__init__(*args, **kwargs)

        self.ifc = ifc
        self.predefined_type = ifc2python.get_predefined_type(ifc)
        self.ifc_domain = ifc_domain
        self.finder = finder
        self.ifc_units = ifc_units
        self.source_tool: SourceTool = None

        # TBD
        self.enrichment = {}  # TODO: DJA
        # caches filled lazily by get_propertysets / get_type_propertysets
        self._propertysets = None
        self._type_propertysets = None
        self._decision_results = {}

    @classmethod
    def ifc2args(cls, ifc) -> Tuple[tuple, dict]:
        """Extract init args and kwargs from ifc"""
        guid = getattr(ifc, 'GlobalId', None)
        kwargs = {'guid': guid, 'ifc': ifc}
        return (), kwargs

    @classmethod
    def from_ifc(cls, ifc, *args, **kwargs):
        """Factory method to create instance from ifc.

        Values derived from the ifc entity (guid, ifc) override equally named
        entries in ``kwargs``.
        """
        ifc_args, ifc_kwargs = cls.ifc2args(ifc)
        kwargs.update(ifc_kwargs)
        return cls(*(args + ifc_args), **kwargs)

    @property
    def ifc_type(self):
        """IFC type name of the underlying entity, None without entity."""
        if self.ifc:
            return self.ifc.is_a()
        return None

    @classmethod
    def pre_validate(cls, ifc) -> bool:
        """Check if ifc meets conditions to create element from it"""
        raise NotImplementedError

    def _calc_position(self, name):
        """Return the absolute position as numpy array or None.

        The locations of the object placement and of all placements it is
        relative to are summed up.
        NOTE(review): only the location offsets are added, rotations of the
        parent placements are not applied -- confirm this is intended.
        """
        placement = getattr(self.ifc, 'ObjectPlacement', None)
        # ObjectPlacement may exist on the entity but be unset
        if placement is None:
            return None

        # float dtype keeps the in-place addition valid for int coordinates
        absolute = np.array(
            placement.RelativePlacement.Location.Coordinates, dtype=float)
        placementrel = placement.PlacementRelTo
        while placementrel is not None:
            absolute += np.array(
                placementrel.RelativePlacement.Location.Coordinates,
                dtype=float)
            placementrel = placementrel.PlacementRelTo
        return absolute

    def _get_name_from_ifc(self, name):
        """Return the IFC name passed through ``remove_umlaut``.

        Returns None if the entity has no (non-empty) name.
        """
        ifc_name = self.get_ifc_attribute('Name')
        if ifc_name:
            return remove_umlaut(ifc_name)
        return None

    name = attribute.Attribute(
        description="Name of element based on IFC attribute.",
        functions=[_get_name_from_ifc]
    )

    def get_ifc_attribute(self, attribute):
        """Return the IFC attribute ``attribute`` or None if it is missing."""
        return getattr(self.ifc, attribute, None)

    def get_propertyset(self, propertysetname):
        """Read a single property set of the entity by name."""
        return ifc2python.get_property_set_by_name(
            propertysetname, self.ifc, self.ifc_units)

    def get_propertysets(self):
        """Return all property sets of the entity (read once, then cached)."""
        if self._propertysets is None:
            self._propertysets = ifc2python.get_property_sets(
                self.ifc, self.ifc_units)
        return self._propertysets

    def get_type_propertysets(self):
        """Return the type property sets (read once, then cached)."""
        if self._type_propertysets is None:
            self._type_propertysets = ifc2python.get_type_property_sets(
                self.ifc, self.ifc_units)
        return self._type_propertysets

    def get_hierarchical_parent(self):
        """Delegate to ``ifc2python.getHierarchicalParent``."""
        return ifc2python.getHierarchicalParent(self.ifc)

    def get_hierarchical_children(self):
        """Delegate to ``ifc2python.getHierarchicalChildren``."""
        return ifc2python.getHierarchicalChildren(self.ifc)

    def get_spartial_parent(self):
        """Delegate to ``ifc2python.getSpatialParent``."""
        return ifc2python.getSpatialParent(self.ifc)

    def get_spartial_children(self):
        """Delegate to ``ifc2python.getSpatialChildren``."""
        return ifc2python.getSpatialChildren(self.ifc)

    def get_space(self):
        """Delegate to ``ifc2python.getSpace``."""
        return ifc2python.getSpace(self.ifc)

    def get_storey(self):
        """Delegate to ``ifc2python.getStorey``."""
        return ifc2python.getStorey(self.ifc)

    def get_building(self):
        """Delegate to ``ifc2python.getBuilding``."""
        return ifc2python.getBuilding(self.ifc)

    def get_site(self):
        """Delegate to ``ifc2python.getSite``."""
        return ifc2python.getSite(self.ifc)

    def get_project(self):
        """Delegate to ``ifc2python.getProject``."""
        return ifc2python.getProject(self.ifc)

    def get_true_north(self):
        """Delegate to ``ifc2python.get_true_north``."""
        return ifc2python.get_true_north(self.ifc)

    def summary(self):
        """Delegate to ``ifc2python.summary``."""
        return ifc2python.summary(self.ifc)

    def search_property_hierarchy(self, propertyset_name):
        """Search for property set in all related sets in hierarchical order.

        1. element's propertysets
        2. element type's propertysets

        Returns:
            The first property set found, None if neither source has it.
        """
        # getters are called lazily: type property sets are only read when
        # the element's own property sets do not contain the name
        for getter in (self.get_propertysets, self.get_type_propertysets):
            p_sets = getter()
            try:
                return p_sets[propertyset_name]
            except KeyError:
                continue
        return None

    def inverse_properties(self):
        """Generator yielding tuples of PropertySet name and Property name"""
        for p_set_name, p_set in self.get_propertysets().items():
            for p_name in p_set.keys():
                yield (p_set_name, p_name)

    def filter_properties(self, patterns):
        """filter all properties by re pattern

        :returns: list of tuple(propertyset_name, property_name, match_graph)
        """
        matches = []
        for propertyset_name, property_name in self.inverse_properties():
            for pattern in patterns:
                match = re.match(pattern, property_name)
                if match:
                    matches.append((propertyset_name, property_name, match))
        return matches

    @classmethod
    def filter_for_text_fragments(cls, ifc_element, ifc_units: dict,
                                  optional_locations: list = None):
        """Find text fragments that match the class patterns in an IFC element.

        Args:
            ifc_element: The IFC element to check.
            ifc_units: Dictionary containing IFC unit information.
            optional_locations: Additional locations to check patterns beyond
                name. Defaults to None.

        Returns:
            list: List of matched fragments, empty list if no matches found.
        """
        results = []

        # Check name matches; the IFC name is optional, so skip the check
        # instead of passing None to the pattern search.
        element_name = getattr(ifc_element, 'Name', None)
        if element_name:
            name_hits = [p.search(element_name) for p in cls.pattern_ifc_type]
            name_hits = [hit for hit in name_hits if hit is not None]
            if name_hits:
                # cls.ifc_type is a property object on the class, therefore
                # the class name is logged
                quality_logger.info(
                    f"Identified {cls.__name__} through text fragments in "
                    f"name. Criteria: {name_hits}")
                results.append(name_hits[0][0])

        # Check optional locations
        if optional_locations:
            for loc in optional_locations:
                prop_value = ifc2python.get_property_set_by_name(
                    loc, ifc_element, ifc_units)
                if not prop_value:
                    continue

                # NOTE(review): assumes the value returned for ``loc`` is a
                # string the patterns can search -- confirm against
                # ifc2python.get_property_set_by_name.
                loc_hits = [p.search(prop_value) for p in cls.pattern_ifc_type]
                loc_hits = [hit for hit in loc_hits if hit is not None]
                if loc_hits:
                    quality_logger.info(
                        f"Identified {cls.__name__} through text fragments "
                        f"in {loc}. Criteria: {loc_hits}")
                    results.append(loc_hits[0][0])

        return results

    def get_exact_property(self, propertyset_name: str, property_name: str):
        """Returns value of property specified by propertyset name and
        property name.

        Raises:
            NoValueError: if the property set or the property does not exist.
        """
        try:
            p_set = self.search_property_hierarchy(propertyset_name)
            value = p_set[property_name]
        except (AttributeError, KeyError, TypeError) as err:
            # TypeError covers the "property set not found" case (p_set None)
            raise NoValueError("Property '%s.%s' does not exist" % (
                propertyset_name, property_name)) from err
        return value

    def select_from_potential_properties(self, patterns, name,
                                         collect_decisions):
        """Return the value of the properties matching ``patterns``.

        Args:
            patterns: regular expressions matched against property names.
            name: currently unused.
            collect_decisions: currently unused.

        Returns:
            None if nothing matches, the common value if all matching
            properties agree, otherwise the set of distinct values (a warning
            is written to the quality log in this case).
        """
        # TODO: replace the set returned for conflicting values by a user
        #  decision (and save it for all following elements of same class?)
        matches = self.filter_properties(patterns)
        if not matches:
            return None

        values = [
            self.get_exact_property(propertyset_name, property_name)
            for propertyset_name, property_name, _ in matches]

        distinct_values = set(values)
        if len(distinct_values) == 1:
            # multiple sources but common value
            return distinct_values.pop()

        quality_logger.warning(
            'Found multiple values for attributes %s of instance %s' % (
                ', '.join((str((m[0], m[1])) for m in matches)), self))
        return distinct_values

    def source_info(self) -> str:
        """Return '<ifc type>:<guid>' as description of the source."""
        return f'{self.ifc_type}:{self.guid}'

439 

440 

class RelationBased(IFCBased):
    """Base class for elements that are based on IFC relations.

    The module used to define this class twice; the second, empty definition
    replaced the first one and thereby dropped ``conditions`` and the string
    representations. Both are merged into this single definition.
    """
    conditions = []

    def __repr__(self):
        """Return '<ClassName (guid: ...)>'."""
        return "<%s (guid: %s)>" % (self.__class__.__name__, self.guid)

    def __str__(self):
        """Return the plain class name."""
        return "%s" % self.__class__.__name__

454 

455 

class ProductBased(IFCBased):
    """Elements based on IFC products.

    Args:
        material: material of the element
        material_set: dict of material and fraction [0, 1] if multiple
            materials
    """
    domain = 'GENERAL'
    key: str = ''
    # registry of all subclasses by key, filled by __init_subclass__
    key_map: Dict[str, 'Type[ProductBased]'] = {}
    conditions = []

    def __init__(self, *args, **kwargs):
        """Initialize ports, material information and cost group."""
        super().__init__(*args, **kwargs)
        self.aggregation = None
        self.ports = self.get_ports()
        self.material = None
        self.material_set = {}
        self.cost_group = self.calc_cost_group()

    def __init_subclass__(cls, **kwargs):
        """Register every subclass in ``key_map`` as '<domain>-<ClassName>'.

        The key is assigned after the class body was executed, so a ``key``
        defined in the subclass body is replaced.
        """
        # keep cooperative subclass initialization intact
        super().__init_subclass__(**kwargs)
        # set key for each class
        cls.key = f'{cls.domain}-{cls.__name__}'
        cls.key_map[cls.key] = cls

    def get_ports(self):
        """Return the ports of the element, none for the base class."""
        return []

    def get_better_subclass(self) -> Union[None, Type['IFCBased']]:
        """Returns alternative subclass of current object.
        CAUTION: only use this if you can't know the result before
        instantiation of base class

        Returns:
            object: subclass of ProductBased or None"""
        return None

    @property
    def neighbors(self):
        """Directly connected elements"""
        return [port.connection.parent
                for port in self.ports if port.connection]

    def validate_creation(self):
        """Validate the element creation in two steps.

        1. Check if standard parameter are in valid range.
        2. Check if number of ports are equal to number of expected ports
        (only for HVAC).

        Returns:
            bool: False if a creation-critical condition or the port check
            fails, True otherwise.
        """
        for cond in self.conditions:
            if not cond.critical_for_creation:
                continue
            value = getattr(self, cond.key)
            # don't prevent creation if value is not existing
            # NOTE(review): falsy values such as 0 are skipped as well
            if value and not cond.check(self, value):
                logger.warning("%s validation (%s) failed for %s",
                               self.ifc_type, cond.name, self.guid)
                return False
        if not self.validate_ports():
            # expected_hvac_ports is not defined in this class; subclasses
            # that override validate_ports have to provide it
            logger.warning("%s has %d ports, but %s expected for %s",
                           self.ifc_type, len(self.ports),
                           self.expected_hvac_ports, self.guid)
            return False
        return True

    def validate_attributes(self) -> dict:
        """Check if all attributes are valid, returns dict with key = attribute
        and value = True or False.

        Not implemented yet: the returned dict is always empty.
        """
        results = {}
        for cond in self.conditions:
            if not cond.critical_for_creation:
                # TODO: evaluate non-critical conditions and fill results
                pass
        return results

    def validate_ports(self):
        """Check the ports of the element, always True in the base class."""
        return True

    def __repr__(self):
        return "<%s>" % self.__class__.__name__

    def calc_cost_group(self) -> Optional[int]:
        """Calculate the cost group according to DIN276"""
        return None

    def calc_volume_from_ifc_shape(self):
        """Calculate the volume from the geometric IFC representation.

        Returns:
            The volume in cubic meters (``ureg.meter ** 3``), or None if the
            entity has no representation or the calculation fails (a warning
            is logged in the latter case).
        """
        # todo use more efficient iterator to calc all shapes at once
        #  with multiple cores:
        #  https://wiki.osarch.org/index.php?title=IfcOpenShell_code_examples
        if not hasattr(self.ifc, 'Representation'):
            return None
        try:
            shape = ifcopenshell.geom.create_shape(
                settings_products, self.ifc).geometry
            vol = PyOCCTools.get_shape_volume(shape)
            return vol * ureg.meter ** 3
        except Exception:
            # best effort: a failing geometry must not stop the processing,
            # but KeyboardInterrupt/SystemExit are no longer swallowed
            logger.warning(f"No calculation of geometric volume possible "
                           f"for {self.ifc}.")
            return None

    def _get_volume(self, name):
        """Return ``net_volume`` if available, else the geometric volume."""
        if hasattr(self, "net_volume"):
            if self.net_volume:
                return self.net_volume
        return self.calc_volume_from_ifc_shape()

    volume = attribute.Attribute(
        description="Volume of the attribute",
        functions=[_get_volume],
    )

    def __str__(self):
        return "<%s>" % (self.__class__.__name__)

577 

578 

class Port(RelationBased):
    """Basic port: connection point of a parent element to one other port."""

    def __init__(self, parent, *args, **kwargs):
        """Create an unconnected port that belongs to ``parent``."""
        super().__init__(*args, **kwargs)
        self.parent: ProductBased = parent
        self.connection = None

    def connect(self, other):
        """Connect this port and ``other`` with each other (both directions).

        Raises:
            AssertionError: if ``other`` is not a Port.
            AttributeError: if one of the ports is connected to a third port.
        """
        assert isinstance(other, Port), \
            "Can't connect interfaces of different classes."
        current = self.connection
        if current and current is not other:
            raise AttributeError("Port is already connected!")
        current_of_other = other.connection
        if current_of_other and current_of_other is not self:
            raise AttributeError("Other port is already connected!")
        other.connection = self
        self.connection = other

    def disconnect(self):
        """Remove the connection on this port and on the connected port."""
        partner = self.connection
        if not partner:
            return
        # clear own side first, this ends the mutual recursion
        self.connection = None
        partner.disconnect()

    def is_connected(self):
        """Returns truth value of port's connection"""
        return True if self.connection else False

    def __repr__(self):
        cls_name = self.__class__.__name__
        if not self.parent:
            return "<%s (*abandoned*)>" % cls_name
        try:
            position = self.parent.ports.index(self)
            return "<%s #%d of %s)>" % (cls_name, position, self.parent)
        except ValueError:
            # the parent does not list this port
            return "<%s (broken parent: %s)>" % (cls_name, self.parent)

    def __str__(self):
        # repr without the leading '<' and the trailing two characters
        return repr(self)[1:-2]

624 

625 

class Material(ProductBased):
    """Material based on 'IfcMaterial' with thermal and optical attributes."""
    guid_prefix = 'Material_'
    # NOTE(review): ProductBased.__init_subclass__ assigns
    # '<domain>-<ClassName>' to ``key`` after the class body ran, so this
    # value is presumably replaced -- confirm which key is intended.
    key: str = 'Material'
    ifc_types = {
        'IfcMaterial': ["*"]
    }
    # Plain string that shadows the ``name`` attribute defined in IFCBased.
    name = ''

    def __init__(self, *args, **kwargs):
        """Create the material with an empty list of parent elements."""
        super().__init__(*args, **kwargs)
        self.parents: List[ProductBased] = []

    @staticmethod
    def get_id(prefix=""):
        """Build an id from ``prefix`` and a shortened new IFC guid.

        The first ``len(prefix) + 1`` characters of the generated guid are
        dropped before it is appended to the prefix.

        Raises:
            AttributeError: if the prefix is longer than 10 characters.
        """
        prefix_length = len(prefix)
        if prefix_length > 10:
            raise AttributeError("Max prefix length is 10!")
        ifcopenshell_guid = guid.new()[prefix_length + 1:]
        return f"{prefix}{ifcopenshell_guid}"

    # Plausibility ranges of the attributes below; none of them is critical
    # for the creation of the element.
    conditions = [
        condition.RangeCondition('spec_heat_capacity',
                                 0 * ureg.kilojoule / (ureg.kg * ureg.K),
                                 5 * ureg.kilojoule / (ureg.kg * ureg.K),
                                 critical_for_creation=False),
        condition.RangeCondition('density',
                                 0 * ureg.kg / ureg.m ** 3,
                                 50000 * ureg.kg / ureg.m ** 3,
                                 critical_for_creation=False),
        condition.RangeCondition('thermal_conduc',
                                 0 * ureg.W / ureg.m / ureg.K,
                                 100 * ureg.W / ureg.m / ureg.K,
                                 critical_for_creation=False),
        # NOTE(review): the positional ``True`` presumably makes the limits
        # inclusive -- confirm against condition.RangeCondition.
        condition.RangeCondition('porosity',
                                 0 * ureg.dimensionless,
                                 1 * ureg.dimensionless, True,
                                 critical_for_creation=False),
        condition.RangeCondition('solar_absorp',
                                 0 * ureg.percent,
                                 1 * ureg.percent, True,
                                 critical_for_creation=False),
    ]

    # Read from Pset_MaterialThermal.SpecificHeatCapacity, in kJ/(kg K).
    spec_heat_capacity = attribute.Attribute(
        default_ps=("Pset_MaterialThermal", "SpecificHeatCapacity"),
        # functions=[get_from_template],
        unit=ureg.kilojoule / (ureg.kg * ureg.K)
    )

    # Read from Pset_MaterialCommon.MassDensity, in kg/m^3.
    density = attribute.Attribute(
        default_ps=("Pset_MaterialCommon", "MassDensity"),
        unit=ureg.kg / ureg.m ** 3
    )

    # Read from Pset_MaterialThermal.ThermalConductivity, in W/(m K).
    thermal_conduc = attribute.Attribute(
        default_ps=("Pset_MaterialThermal", "ThermalConductivity"),
        unit=ureg.W / (ureg.m * ureg.K)
    )

    # Read from Pset_MaterialCommon.Porosity, dimensionless.
    porosity = attribute.Attribute(
        default_ps=("Pset_MaterialCommon", "Porosity"),
        unit=ureg.dimensionless
    )

    # todo is percent the correct unit? (0-1)
    # No property set is read; the default of 0.7 is used.
    solar_absorp = attribute.Attribute(
        # default_ps=('Pset_MaterialOptical', 'SolarTransmittance'),
        default=0.7,
        unit=ureg.percent
    )

    def __repr__(self):
        """Return '<Material name>' or '<Material>' if the name is empty."""
        if self.name:
            return "<%s %s>" % (self.__class__.__name__, self.name)
        else:
            return "<%s>" % self.__class__.__name__

702 

703 

class Dummy(ProductBased):
    """Dummy for all unknown elements"""

    ifc_types = {
        "IfcElementProxy": ['*']
    }

    @property
    def ifc_type(self):
        """IFC type of the wrapped entity.

        ``_ifc_type`` is not assigned anywhere in this module (the
        initializer that did so is disabled), so it is only used if something
        else has set it. Otherwise the type is taken from the ifc entity via
        the base class instead of raising AttributeError.
        """
        stored_type = getattr(self, '_ifc_type', None)
        if stored_type is not None:
            return stored_type
        return super().ifc_type

    def __str__(self):
        return "Dummy '%s'" % self.ifc_type

722 

723 

class Factory:
    """Element Factory for :class: `ProductBased`

    To understand the concept of the factory class, we refer to this article:
    https://refactoring.guru/design-patterns/factory-method/python/example

    Example:
        factory = Factory([Pipe, Boiler], dummy)
        ele = factory(some_ifc_element)
    """

    def __init__(
            self,
            relevant_elements: set[ProductBased],
            ifc_units: dict,
            ifc_domain: IFCDomain,
            finder: Union[TemplateFinder, None] = None,
            dummy=Dummy):
        """Prepare the lookup tables for the given element classes.

        Args:
            relevant_elements: element classes the factory may create.
            ifc_units: unit information passed to every created element.
            ifc_domain: domain of the IFC file, passed to created elements.
            finder: optional TemplateFinder passed to created elements.
            dummy: class used if no element class matches.
        """
        self.mapping, self.blacklist, self.defaults = \
            self.create_ifc_mapping(relevant_elements)
        self.dummy_cls = dummy
        self.ifc_domain = ifc_domain
        self.finder = finder
        self.ifc_units = ifc_units

    def __call__(self, ifc_entity, *args, ifc_type: str = None,
                 use_dummy=True, **kwargs) -> ProductBased:
        """Run factory to create element instance.

        Calls self.create() function but before checks which element_cls is the
        correct mapping for the given ifc_entity.

        Args:
            ifc_entity: IfcOpenShell entity
            args: additional args passed to element
            ifc_type: ifc type to create element for.
                defaults to ifc_entity.is_a()
            use_dummy: use dummy class if nothing is found
            kwargs: additional kwargs passed to element
        Raises:
            LookupError: if no element found and use_dummy = False
            IFCDomainError: if the element class restricts its IFC domains
                (``from_ifc_domains``) and the factory's domain is not
                included
        Returns:
            element: created element instance
        """
        _ifc_type = ifc_type or ifc_entity.is_a()
        predefined_type = ifc2python.get_predefined_type(ifc_entity)
        element_cls = self.get_element(_ifc_type, predefined_type)
        if not element_cls:
            if use_dummy:
                element_cls = self.dummy_cls
            else:
                raise LookupError(f"No element found for {ifc_entity}")
        # TODO # 537 Put this to a point where it makes sense, return None is
        #  no solution
        if hasattr(element_cls, 'from_ifc_domains'):
            if self.ifc_domain not in element_cls.from_ifc_domains:
                message = (
                    f"Element has {self.ifc_domain} but "
                    f"{element_cls.__name__}"
                    f" will only be created for IFC files of domain "
                    f"{element_cls.from_ifc_domains}")
                logger.warning("%s.", message)
                raise IFCDomainError(message)

        element = self.create(element_cls, ifc_entity, *args, **kwargs)
        return element

    def create(self, element_cls, ifc_entity, *args, **kwargs):
        """Create Element from class and ifc.

        If the created element reports a better fitting subclass via
        ``get_better_subclass``, a new element of that class is created from
        the same entity and returned instead.
        """
        # instantiate element
        element = element_cls.from_ifc(
            ifc_entity, *args, ifc_domain=self.ifc_domain,
            finder=self.finder, ifc_units=self.ifc_units, **kwargs)
        # check if it prefers to be sth else
        better_cls = element.get_better_subclass()
        if better_cls:
            logger.info("Creating %s instead of %s", better_cls, element_cls)
            element = better_cls.from_ifc(
                ifc_entity, *args, ifc_domain=self.ifc_domain,
                finder=self.finder, ifc_units=self.ifc_units, **kwargs)
        return element

    def get_element(self, ifc_type: str, predefined_type: Union[str, None]) -> \
            Union[ProductBased, None]:
        """Get element class by ifc type and predefined type"""
        ifc_key = ifc_type.lower()
        if predefined_type:
            key = (ifc_key, predefined_type.upper())
            # 1. go over normal list, if found match_graph --> return
            element = self.mapping.get(key)
            if element:
                return element
            # 2. go over negative list, if found match_graph --> not existing
            if key in self.blacklist:
                return None
        # 3. go over default list, if found match_graph --> return
        return self.defaults.get(ifc_key)

    @staticmethod
    def create_ifc_mapping(elements: Iterable) -> Tuple[
        Dict[Tuple[str, str], ProductBased],
        List[Tuple[str, ProductBased]],
        Dict[str, ProductBased]
    ]:
        """Create mapping dict, blacklist and default dict from elements

        WARNING: ifc_type is always converted to lower case
        and predefined types to upper case

        Raises:
            NameError: if two different element classes claim the default
                ('*') of the same ifc type.

        Returns:
            mapping: dict of ifc_type and predefined_type to element class
            blacklist: list of (ifc_type, predefined_type) which will not be
                taken into account
            default: dict of ifc_type to element class
        """
        # TODO: cover virtual elements e.g. Space Boundaries (not products)

        mapping = {}
        blacklist = []
        default = {}
        _all_ifc_types = set()

        for ele in elements:
            for ifc_type, tokens in ele.ifc_types.items():
                ifc_key = ifc_type.lower()
                _all_ifc_types.add(ifc_key)
                for token in tokens:
                    # create default dict where all stars are taken into
                    # account, items 'ifcslab': Slab
                    if token == '*':
                        # compare with the lower case key that is stored,
                        # otherwise conflicts are never detected
                        if ifc_key in default and default[ifc_key] is not ele:
                            raise NameError(
                                f"Conflicting default ifc_types for "
                                f"{ifc_type}")  # TBD
                        default[ifc_key] = ele
                    # create blacklist where all - are taken into account
                    # items: ('ifcroof', 'WEIREDSTUFF')
                    elif token.startswith('-'):
                        blacklist.append((ifc_key, token[1:].upper()))
                    # create mapping dict
                    # items ('ifcslab', 'ROOF'): Roof
                    else:
                        mapping[(ifc_key, token.upper())] = ele

        # check ifc types without default
        no_default = _all_ifc_types - set(default)
        if no_default:
            logger.warning("The following ifc types have no default "
                           "representing Element class. There will be no "
                           "match if predefined type is not provided.\n%s",
                           no_default)
        return mapping, blacklist, default

884 

885 

class SerializedElement:
    """Serialized version of an element.

    This is a workaround as we can't serialize elements due to the usage of
    IfcOpenShell which uses unpickable swigPy objects. We just store the most
    important information which are guid, element_type, storeys, aggregated
    elements and the attributes from the attribute system."""

    def __init__(self, element):
        """Copy the picklable information of ``element``.

        Args:
            element: element to take guid, class name, attribute values and
                the guids of related objects from.
        """
        self.guid = element.guid
        self.element_type = element.__class__.__name__
        for attr_name, _ in element.attributes.items():
            # assign value directly to attribute without status
            # make sure to get the value
            value = getattr(element, attr_name)
            if self.is_picklable(value):
                setattr(self, attr_name, value)
            else:
                logger.info(
                    f"Attribute {attr_name} will not be serialized, as it's "
                    f"not pickleable")
        # related objects are stored by guid only
        if hasattr(element, "space_boundaries"):
            self.space_boundaries = [bound.guid for bound in
                                     element.space_boundaries]
        if hasattr(element, "storeys"):
            self.storeys = [storey.guid for storey in element.storeys]
        if issubclass(element.__class__, AggregationMixin):
            self.elements = [ele.guid for ele in element.elements]

    @staticmethod
    def is_picklable(value: Any) -> bool:
        """Determines if a given value is picklable.

        This method attempts to serialize the provided value using the
        `pickle` module. If the value can be successfully serialized, it is
        considered picklable.

        Args:
            value (Any): The value to be tested for picklability.

        Returns:
            bool: True if the value is picklable, False otherwise.
        """
        try:
            pickle.dumps(value)
        except (pickle.PicklingError, TypeError, AttributeError,
                RecursionError):
            # AttributeError: e.g. local functions/classes on many Python
            # versions; RecursionError: too deeply nested structures
            return False
        return True

    def __repr__(self):
        return "<serialized %s (guid: '%s')>" % (
            self.element_type, self.guid)