Coverage for bim2sim/elements/base_elements.py: 69%
497 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 10:24 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-01 10:24 +0000
1import logging
2import pickle
3import re
4from json import JSONEncoder
5from typing import Union, Iterable, Dict, List, Tuple, Type, Optional, Any
7import numpy as np
8import ifcopenshell.geom
9from ifcopenshell import guid
11from bim2sim.elements.aggregation import AggregationMixin
12from bim2sim.kernel.decision import Decision, DecisionBunch
13from bim2sim.kernel import IFCDomainError
14from bim2sim.elements.mapping import condition, attribute, ifc2python
15from bim2sim.elements.mapping.finder import TemplateFinder, SourceTool
16from bim2sim.elements.mapping.units import ureg
17from bim2sim.utilities.common_functions import angle_equivalent, vector_angle, \
18 remove_umlaut
19from bim2sim.utilities.pyocc_tools import PyOCCTools
20from bim2sim.utilities.types import IFCDomain, AttributeDataSource
22logger = logging.getLogger(__name__)
23quality_logger = logging.getLogger('bim2sim.QualityReport')
24settings_products = ifcopenshell.geom.main.settings()
25settings_products.set(settings_products.USE_PYTHON_OPENCASCADE, True)
28class ElementError(Exception):
29 """Error in Element"""
32class NoValueError(ElementError):
33 """Value is not available"""
36class ElementEncoder(JSONEncoder):
37 """Encoder class for Element"""
39 # TODO: make Elements serializable and deserializable.
40 # Ideas: guid to identify, (factory) method to (re)init by guid
41 # mayby weakref to other elements (Ports, connections, ...)
43 def default(self, o):
44 if isinstance(o, Element):
45 return "<Element(%s)>" % o.guid
46 return JSONEncoder.default()
49class Element(metaclass=attribute.AutoAttributeNameMeta):
50 """Most basic class"""
51 guid_prefix = ''
52 _id_counter = 0
54 def __init__(self, guid=None, **kwargs):
55 self.guid = guid or self.get_id(self.guid_prefix)
56 # self.related_decisions: List[Decision] = []
57 self.attributes = attribute.AttributeManager(bind=self)
58 self.element_type = self.__class__.__name__
60 # set attributes based on kwargs
61 for kw, arg in kwargs.items():
62 if kw in self.attributes: # allow only attributes
63 setattr(self, kw, arg)
64 else:
65 raise AttributeError(f"Unused argument in kwargs: {kw}: {arg}")
67 def __hash__(self):
68 return hash(self.guid)
70 def validate_creation(self) -> bool:
71 """Check if current instance is valid"""
72 raise NotImplementedError
74 def validate_attributes(self) -> dict:
75 """Check if attributes are valid"""
76 return {}
78 def _calc_position(self, name) -> np.array:
79 """Returns position (calculation may be expensive)"""
80 return None
82 position = attribute.Attribute(
83 description='Position of element',
84 functions=[_calc_position]
85 )
87 @staticmethod
88 def get_id(prefix=""):
89 prefix_length = len(prefix)
90 if prefix_length > 10:
91 raise AttributeError("Max prefix length is 10!")
92 Element._id_counter += 1
93 return "{0:0<8s}{1:0>14d}".format(prefix, Element._id_counter)
95 @staticmethod
96 def get_object(guid):
97 """Get Element object instance with given guid
99 :returns: None if object with guid was not instanciated"""
100 raise AssertionError("Obsolete method. "
101 "Don't rely on global Element.objects. "
102 "Use e.g. elements from tasks/playground.")
104 def request(self, name, external_decision: Decision = None) \
105 -> Union[None, Decision]:
106 """Request the elements attribute.
108 Args:
109 name: Name of attribute
110 external_decision: Decision to use instead of default decision
111 """
112 return self.attributes.request(name, external_decision)
114 def reset(self, name, data_source=AttributeDataSource.manual_overwrite):
115 """Reset the attribute of the element.
117 Args:
118 name: attribute name
119 data_source (object): data source of the attribute
120 """
122 return self.attributes.reset(name, data_source)
124 def source_info(self) -> str:
125 """Get informative string about source of Element."""
126 return ''
128 @classmethod
129 def get_pending_attribute_decisions(
130 cls, elements: Iterable['Element']) -> DecisionBunch:
131 """Get all requested decisions of attributes and functions of attributes
132 to afterwards calculate said attribute.
134 all decisions related to given elements are yielded.
135 all attributes functions are used to calculate the remaining attributes
136 """
138 all_attr_decisions = DecisionBunch()
139 for inst in elements:
140 bunch = inst.attributes.get_decisions()
141 all_attr_decisions.extend(bunch)
143 # sort decisions to preserve order
144 all_attr_decisions.sort(key=lambda d: d.global_key)
145 yield all_attr_decisions
147 @classmethod
148 def full_reset(cls):
149 raise AssertionError("Obsolete method. not required any more.")
152class IFCBased(Element):
153 """Element with instantiation from ifc and related methods.
155 Attributes:
156 ifc: IfcOpenShell element instance
157 ifc_types: Dict with ifc_type as key and list of predifined types that
158 fit to the class as values.
159 Special values for predifined types:
160 '*' all which are not overwritten in other classes predfined types.
161 '-Something' start with minus to exclude
163 For example:
164 >>> {'IfcSlab': ['*', '-SomethingSpecialWeDontWant', 'BASESLAB']}
165 >>> {'IfcRoof': ['FLAT_ROOF', 'SHED_ROOF',...],
166 >>> 'IfcSlab': ['ROOF']}"""
168 ifc_types: Dict[str, List[str]] = None
169 pattern_ifc_type = []
171 def __init__(self, *args,
172 ifc=None,
173 finder: TemplateFinder = None,
174 ifc_units: dict = None,
175 ifc_domain: IFCDomain = None,
176 **kwargs):
177 super().__init__(*args, **kwargs)
179 self.ifc = ifc
180 self.predefined_type = ifc2python.get_predefined_type(ifc)
181 self.ifc_domain = ifc_domain
182 self.finder = finder
183 self.ifc_units = ifc_units
184 self.source_tool: SourceTool = None
186 # TBD
187 self.enrichment = {} # TODO: DJA
188 self._propertysets = None
189 self._type_propertysets = None
190 self._decision_results = {}
192 @classmethod
193 def ifc2args(cls, ifc) -> Tuple[tuple, dict]:
194 """Extract init args and kwargs from ifc"""
195 guid = getattr(ifc, 'GlobalId', None)
196 kwargs = {'guid': guid, 'ifc': ifc}
197 return (), kwargs
199 @classmethod
200 def from_ifc(cls, ifc, *args, **kwargs):
201 """Factory method to create instance from ifc"""
202 ifc_args, ifc_kwargs = cls.ifc2args(ifc)
203 kwargs.update(ifc_kwargs)
204 return cls(*(args + ifc_args), **kwargs)
206 @property
207 def ifc_type(self):
208 if self.ifc:
209 return self.ifc.is_a()
211 @classmethod
212 def pre_validate(cls, ifc) -> bool:
213 """Check if ifc meets conditions to create element from it"""
214 raise NotImplementedError
216 def _calc_position(self, name):
217 """returns absolute position"""
218 if hasattr(self.ifc, 'ObjectPlacement'):
219 absolute = np.array(self.ifc.ObjectPlacement.RelativePlacement.Location.Coordinates)
220 placementrel = self.ifc.ObjectPlacement.PlacementRelTo
221 while placementrel is not None:
222 absolute += np.array(placementrel.RelativePlacement.Location.Coordinates)
223 placementrel = placementrel.PlacementRelTo
224 else:
225 absolute = None
227 return absolute
229 def _get_name_from_ifc(self, name):
230 ifc_name = self.get_ifc_attribute('Name')
231 if ifc_name:
232 return remove_umlaut(ifc_name)
234 name = attribute.Attribute(
235 description="Name of element based on IFC attribute.",
236 functions=[_get_name_from_ifc]
237 )
239 def get_ifc_attribute(self, attribute):
240 """
241 Fetches non-empty attributes (if they exist).
242 """
243 return getattr(self.ifc, attribute, None)
245 def get_propertyset(self, propertysetname):
246 return ifc2python.get_property_set_by_name(
247 propertysetname, self.ifc, self.ifc_units)
249 def get_propertysets(self):
250 if self._propertysets is None:
251 self._propertysets = ifc2python.get_property_sets(
252 self.ifc, self.ifc_units)
253 return self._propertysets
255 def get_type_propertysets(self):
256 if self._type_propertysets is None:
257 self._type_propertysets = ifc2python.get_type_property_sets(
258 self.ifc, self.ifc_units)
259 return self._type_propertysets
261 def get_hierarchical_parent(self):
262 return ifc2python.getHierarchicalParent(self.ifc)
264 def get_hierarchical_children(self):
265 return ifc2python.getHierarchicalChildren(self.ifc)
267 def get_spartial_parent(self):
268 return ifc2python.getSpatialParent(self.ifc)
270 def get_spartial_children(self):
271 return ifc2python.getSpatialChildren(self.ifc)
273 def get_space(self):
274 return ifc2python.getSpace(self.ifc)
276 def get_storey(self):
277 return ifc2python.getStorey(self.ifc)
279 def get_building(self):
280 return ifc2python.getBuilding(self.ifc)
282 def get_site(self):
283 return ifc2python.getSite(self.ifc)
285 def get_project(self):
286 return ifc2python.getProject(self.ifc)
288 def get_true_north(self):
289 return ifc2python.get_true_north(self.ifc)
291 def summary(self):
292 return ifc2python.summary(self.ifc)
294 def search_property_hierarchy(self, propertyset_name):
295 """Search for property in all related properties in hierarchical order.
297 1. element's propertysets
298 2. element type's propertysets"""
300 p_set = None
301 p_sets = self.get_propertysets()
302 try:
303 p_set = p_sets[propertyset_name]
304 except KeyError:
305 pass
306 else:
307 return p_set
309 pt_sets = self.get_type_propertysets()
310 try:
311 p_set = pt_sets[propertyset_name]
312 except KeyError:
313 pass
314 else:
315 return p_set
316 return p_set
318 def inverse_properties(self):
319 """Generator yielding tuples of PropertySet name and Property name"""
320 for p_set_name, p_set in self.get_propertysets().items():
321 for p_name in p_set.keys():
322 yield (p_set_name, p_name)
324 def filter_properties(self, patterns):
325 """filter all properties by re pattern
327 :returns: list of tuple(propertyset_name, property_name, match_graph)"""
328 matches = []
329 for propertyset_name, property_name in self.inverse_properties():
330 for pattern in patterns:
331 match = re.match(pattern, property_name)
332 if match:
333 matches.append((propertyset_name, property_name, match))
334 return matches
336 @classmethod
337 def filter_for_text_fragments(cls, ifc_element, ifc_units: dict,
338 optional_locations: list = None):
339 """Find text fragments that match the class patterns in an IFC element.
341 Args:
342 ifc_element: The IFC element to check.
343 ifc_units: Dictionary containing IFC unit information.
344 optional_locations: Additional locations to check patterns beyond
345 name. Defaults to None.
347 Returns:
348 list: List of matched fragments, empty list if no matches found.
349 """
350 results = []
352 # Check name matches
353 name_hits = [p.search(ifc_element.Name) for p in cls.pattern_ifc_type]
354 name_hits = [hit for hit in name_hits if hit is not None]
355 if name_hits:
356 quality_logger.info(
357 f"Identified {cls.ifc_type} through text fragments in name. "
358 f"Criteria: {name_hits}")
359 results.append(name_hits[0][0])
361 # Check optional locations
362 if optional_locations:
363 for loc in optional_locations:
364 prop_value = ifc2python.get_property_set_by_name(
365 loc, ifc_element, ifc_units)
366 if not prop_value:
367 continue
369 loc_hits = [p.search(prop_value) for p in cls.pattern_ifc_type]
370 loc_hits = [hit for hit in loc_hits if hit is not None]
371 if loc_hits:
372 quality_logger.info(
373 f"Identified {cls.ifc_type} through text fragments "
374 f"in {loc}. Criteria: {loc_hits}")
375 results.append(loc_hits[0][0])
377 return results
379 def get_exact_property(self, propertyset_name: str, property_name: str):
380 """Returns value of property specified by propertyset name and property name
382 :Raises: AttributeError if property does not exist"""
383 try:
384 p_set = self.search_property_hierarchy(propertyset_name)
385 value = p_set[property_name]
386 except (AttributeError, KeyError, TypeError):
387 raise NoValueError("Property '%s.%s' does not exist" % (
388 propertyset_name, property_name))
389 return value
391 def select_from_potential_properties(self, patterns, name,
392 collect_decisions):
393 """Ask user to select from all properties matching patterns"""
395 matches = self.filter_properties(patterns)
396 if matches:
397 values = []
398 choices = []
399 for propertyset_name, property_name, match in matches:
400 value = self.get_exact_property(propertyset_name, property_name)
401 values.append(value)
402 choices.append((propertyset_name, property_name))
403 # print("%s.%s = %s"%(propertyset_name, property_name, value))
405 # TODO: Decision: save for all following elements of same class (
406 # dont ask again?)
407 # selected = (propertyset_name, property_name, value)
409 distinct_values = set(values)
410 if len(distinct_values) == 1:
411 # multiple sources but common value
412 return distinct_values.pop()
413 else:
414 quality_logger.warning('Found multiple values for attributes %s of instance %s' % (
415 ', '.join((str((m[0], m[1])) for m in matches)), self))
416 return distinct_values
418 return None
420 # # TODO: Decision with id, key, value
421 # decision = DictDecision("Multiple possibilities found",
422 # choices=dict(zip(choices, values)),
423 # output=self.attributes,
424 # key=name,
425 # global_key="%s_%s.%s" % (self.ifc_type,
426 # self.guid, name),
427 # allow_skip=True, allow_load=True,
428 # allow_save=True,
429 # collect=collect_decisions,
430 # quick_decide=not collect_decisions)
431 #
432 # if collect_decisions:
433 # raise PendingDecisionError()
434 #
435 # return decision.value
436 # raise NoValueError("No matching property for %s" % (patterns))
438 def source_info(self) -> str:
439 return f'{self.ifc_type}:{self.guid}'
442class RelationBased(IFCBased):
443 conditions = []
445 def __repr__(self):
446 return "<%s (guid: %s)>" % (self.__class__.__name__, self.guid)
448 def __str__(self):
449 return "%s" % self.__class__.__name__
452class RelationBased(IFCBased):
454 pass
457class ProductBased(IFCBased):
458 """Elements based on IFC products.
460 Args:
461 material: material of the element
462 material_set: dict of material and fraction [0, 1] if multiple materials
463 """
464 domain = 'GENERAL'
465 key: str = ''
466 key_map: Dict[str, 'Type[ProductBased]'] = {}
467 conditions = []
469 def __init__(self, *args, **kwargs):
470 super().__init__(*args, **kwargs)
471 self.aggregation = None
472 self.ports = self.get_ports()
473 self.material = None
474 self.material_set = {}
475 self.cost_group = self.calc_cost_group()
477 def __init_subclass__(cls, **kwargs):
478 # set key for each class
479 cls.key = f'{cls.domain}-{cls.__name__}'
480 cls.key_map[cls.key] = cls
482 def get_ports(self):
483 return []
485 def get_better_subclass(self) -> Union[None, Type['IFCBased']]:
486 """Returns alternative subclass of current object.
487 CAUTION: only use this if you can't know the result before instantiation
488 of base class
490 Returns:
491 object: subclass of ProductBased or None"""
492 return None
494 @property
495 def neighbors(self):
496 """Directly connected elements"""
497 neighbors = []
498 for port in self.ports:
499 if port.connection:
500 neighbors.append(port.connection.parent)
501 return neighbors
503 def validate_creation(self):
504 """"Validate the element creation in two steps.
505 1. Check if standard parameter are in valid range.
506 2. Check if number of ports are equal to number of expected ports
507 (only for HVAC).
508 """
509 for cond in self.conditions:
510 if cond.critical_for_creation:
511 value = getattr(self, cond.key)
512 # don't prevent creation if value is not existing
513 if value:
514 if not cond.check(self, value):
515 logger.warning("%s validation (%s) failed for %s", self.ifc_type, cond.name, self.guid)
516 return False
517 if not self.validate_ports():
518 logger.warning("%s has %d ports, but %s expected for %s", self.ifc_type, len(self.ports),
519 self.expected_hvac_ports, self.guid)
520 return False
521 return True
523 def validate_attributes(self) -> dict:
524 """Check if all attributes are valid, returns dict with key = attribute
525 and value = True or False"""
526 results = {}
527 for cond in self.conditions:
528 if not cond.critical_for_creation:
529 # todo
530 pass
531 # if not cond.check(self):
532 # logger.warning("%s validation (%s) failed for %s",
533 # self.ifc_type, cond.name, self.guid)
534 # return False
535 # return True
536 return results
538 def validate_ports(self):
539 return True
541 def __repr__(self):
542 return "<%s>" % self.__class__.__name__
544 def calc_cost_group(self) -> Optional[int]:
545 """Calculate the cost group according to DIN276"""
546 return None
548 def calc_product_shape(self):
549 """Calculate the product shape based on IfcProduct representation."""
550 if hasattr(self.ifc, 'Representation'):
551 try:
552 shape = ifcopenshell.geom.create_shape(
553 settings_products, self.ifc).geometry
554 return shape
555 except:
556 logger.warning(f"No calculation of product shape possible "
557 f"for {self.ifc}.")
559 def calc_volume_from_ifc_shape(self):
560 # todo use more efficient iterator to calc all shapes at once
561 # with multiple cores:
562 # https://wiki.osarch.org/index.php?title=IfcOpenShell_code_examples
563 if hasattr(self.ifc, 'Representation'):
564 try:
565 shape = ifcopenshell.geom.create_shape(
566 settings_products, self.ifc).geometry
567 vol = PyOCCTools.get_shape_volume(shape)
568 vol = vol * ureg.meter ** 3
569 return vol
570 except:
571 logger.warning(f"No calculation of geometric volume possible "
572 f"for {self.ifc}.")
574 def _get_volume(self, name):
575 if hasattr(self, "net_volume"):
576 if self.net_volume:
577 vol = self.net_volume
578 return vol
579 vol = self.calc_volume_from_ifc_shape()
580 return vol
582 volume = attribute.Attribute(
583 description="Volume of the attribute",
584 functions=[_get_volume],
585 )
587 def __str__(self):
588 return "<%s>" % (self.__class__.__name__)
591class Port(RelationBased):
592 """Basic port"""
594 def __init__(self, parent, *args, **kwargs):
595 super().__init__(*args, **kwargs)
596 self.parent: ProductBased = parent
597 self.connection = None
599 def connect(self, other):
600 """Connect this interface bidirectional to another interface"""
601 assert isinstance(other, Port), \
602 "Can't connect interfaces of different classes."
603 # if self.flow_direction == 'SOURCE' or \
604 # self.flow_direction == 'SOURCEANDSINK':
605 if self.connection and self.connection is not other:
606 raise AttributeError("Port is already connected!")
607 if other.connection and other.connection is not self:
608 raise AttributeError("Other port is already connected!")
609 self.connection = other
610 other.connection = self
612 def disconnect(self):
613 """remove connection between self and other port"""
614 other = self.connection
615 if other:
616 self.connection = None
617 other.disconnect()
619 def is_connected(self):
620 """Returns truth value of port's connection"""
621 return bool(self.connection)
623 def __repr__(self):
624 if self.parent:
625 try:
626 idx = self.parent.ports.index(self)
627 return "<%s #%d of %s)>" % (
628 self.__class__.__name__, idx, self.parent)
629 except ValueError:
630 return "<%s (broken parent: %s)>" % (
631 self.__class__.__name__, self.parent)
632 return "<%s (*abandoned*)>" % self.__class__.__name__
634 def __str__(self):
635 return self.__repr__()[1:-2]
638class Material(ProductBased):
639 guid_prefix = 'Material_'
640 key: str = 'Material'
641 ifc_types = {
642 'IfcMaterial': ["*"]
643 }
644 name = ''
646 def __init__(self, *args, **kwargs):
647 super().__init__(*args, **kwargs)
648 self.parents: List[ProductBased] = []
650 @staticmethod
651 def get_id(prefix=""):
652 prefix_length = len(prefix)
653 if prefix_length > 10:
654 raise AttributeError("Max prefix length is 10!")
655 ifcopenshell_guid = guid.new()[prefix_length + 1:]
656 return f"{prefix}{ifcopenshell_guid}"
658 conditions = [
659 condition.RangeCondition('spec_heat_capacity',
660 0 * ureg.kilojoule / (ureg.kg * ureg.K),
661 5 * ureg.kilojoule / (ureg.kg * ureg.K),
662 critical_for_creation=False),
663 condition.RangeCondition('density',
664 0 * ureg.kg / ureg.m ** 3,
665 50000 * ureg.kg / ureg.m ** 3,
666 critical_for_creation=False),
667 condition.RangeCondition('thermal_conduc',
668 0 * ureg.W / ureg.m / ureg.K,
669 100 * ureg.W / ureg.m / ureg.K,
670 critical_for_creation=False),
671 condition.RangeCondition('porosity',
672 0 * ureg.dimensionless,
673 1 * ureg.dimensionless, True,
674 critical_for_creation=False),
675 condition.RangeCondition('solar_absorp',
676 0 * ureg.percent,
677 1 * ureg.percent, True,
678 critical_for_creation=False),
679 ]
681 spec_heat_capacity = attribute.Attribute(
682 default_ps=("Pset_MaterialThermal", "SpecificHeatCapacity"),
683 # functions=[get_from_template],
684 unit=ureg.kilojoule / (ureg.kg * ureg.K)
685 )
687 density = attribute.Attribute(
688 default_ps=("Pset_MaterialCommon", "MassDensity"),
689 unit=ureg.kg / ureg.m ** 3
690 )
692 thermal_conduc = attribute.Attribute(
693 default_ps=("Pset_MaterialThermal", "ThermalConductivity"),
694 unit=ureg.W / (ureg.m * ureg.K)
695 )
697 porosity = attribute.Attribute(
698 default_ps=("Pset_MaterialCommon", "Porosity"),
699 unit=ureg.dimensionless
700 )
702 # todo is percent the correct unit? (0-1)
703 solar_absorp = attribute.Attribute(
704 # default_ps=('Pset_MaterialOptical', 'SolarTransmittance'),
705 default=0.7,
706 unit=ureg.percent
707 )
709 def __repr__(self):
710 if self.name:
711 return "<%s %s>" % (self.__class__.__name__, self.name)
712 else:
713 return "<%s>" % self.__class__.__name__
716class Dummy(ProductBased):
717 """Dummy for all unknown elements"""
719 ifc_types = {
720 "IfcElementProxy": ['*']
721 }
723 # def __init__(self, *args, **kwargs):
724 # super().__init__(*args, **kwargs)
725 #
726 # self._ifc_type = self.ifc.get_info()['type']
728 @property
729 def ifc_type(self):
730 return self._ifc_type
732 def __str__(self):
733 return "Dummy '%s'" % self.ifc_type
736class Factory:
737 """Element Factory for :class: `ProductBased`
739 To understand the concept of the factory class, we refer to this article:
740 https://refactoring.guru/design-patterns/factory-method/python/example
742 Example:
743 >>> factory = Factory([Pipe, Boiler], dummy)
744 >>> ele = factory(some_ifc_element)
745 """
747 def __init__(
748 self,
749 relevant_elements: set[ProductBased],
750 ifc_units: dict,
751 ifc_domain: IFCDomain,
752 finder: Union[TemplateFinder, None] = None,
753 dummy=Dummy):
754 self.mapping, self.blacklist, self.defaults = self.create_ifc_mapping(relevant_elements)
755 self.dummy_cls = dummy
756 self.ifc_domain = ifc_domain
757 self.finder = finder
758 self.ifc_units = ifc_units
760 def __call__(self, ifc_entity, *args, ifc_type: str = None, use_dummy=True,
761 **kwargs) -> ProductBased:
762 """Run factory to create element instance.
764 Calls self.create() function but before checks which element_cls is the
765 correct mapping for the given ifc_entity.
767 Args:
768 ifc_entity: IfcOpenShell entity
769 args: additional args passed to element
770 ifc_type: ify type to create element for.
771 defaults to ifc_entity.is_a()
772 use_dummy: use dummy class if nothing is found
773 kwargs: additional kwargs passed to element
774 Raises:
775 LookupError: if no element found and use_dummy = False
776 Returns:
777 element: created element instance
778 """
779 _ifc_type = ifc_type or ifc_entity.is_a()
780 predefined_type = ifc2python.get_predefined_type(ifc_entity)
781 element_cls = self.get_element(_ifc_type, predefined_type)
782 if not element_cls:
783 if use_dummy:
784 element_cls = self.dummy_cls
785 else:
786 raise LookupError(f"No element found for {ifc_entity}")
787 # TODO # 537 Put this to a point where it makes sense, return None is no
788 # solution
789 if hasattr(element_cls, 'from_ifc_domains'):
790 if self.ifc_domain not in element_cls.from_ifc_domains:
791 logger.warning(
792 f"Element has {self.ifc_domain} but f{element_cls.__name__}"
793 f" will only be created for IFC files of domain "
794 f"{element_cls.from_ifc_domains}.")
795 raise IFCDomainError(
796 f"Element has {self.ifc_domain} but f{element_cls.__name__}"
797 f" will only be created for IFC files of domain "
798 f"{element_cls.from_ifc_domains}")
800 element = self.create(element_cls, ifc_entity, *args, **kwargs)
801 return element
803 def create(self, element_cls, ifc_entity, *args, **kwargs):
804 """Create Element from class and ifc"""
805 # instantiate element
807 element = element_cls.from_ifc(
808 ifc_entity, ifc_domain=self.ifc_domain, finder=self.finder,
809 ifc_units=self.ifc_units, *args, **kwargs)
810 # check if it prefers to be sth else
811 better_cls = element.get_better_subclass()
812 if better_cls:
813 logger.info("Creating %s instead of %s", better_cls, element_cls)
814 element = better_cls.from_ifc(
815 ifc_entity,
816 ifc_domain=self.ifc_domain,
817 finder=self.finder,
818 ifc_units=self.ifc_units,
819 *args, **kwargs)
820 return element
822 def get_element(self, ifc_type: str, predefined_type: Union[str, None]) -> \
823 Union[ProductBased, None]:
824 """Get element class by ifc type and predefined type"""
825 if predefined_type:
826 key = (ifc_type.lower(), predefined_type.upper())
827 # 1. go over normal list, if found match_graph --> return
828 element = self.mapping.get(key)
829 if element:
830 return element
831 # 2. go over negative list, if found match_graph --> not existing
832 if key in self.blacklist:
833 return None
834 # 3. go over default list, if found match_graph --> return
835 return self.defaults.get(ifc_type.lower())
837 # def _get_by_guid(self, guid: str) -> Union[ProductBased, None]:
838 # """Get item from given guid created by this factory."""
839 # return self._objects.get(guid)
840 #
841 # def _get_by_cls(self, item_cls: Type[ProductBased]) -> List[ProductBased]:
842 # """Get list of child items from given class created by this factory."""
843 # return [item for item in self._objects.values()
844 # if isinstance(item, item_cls)]
846 @staticmethod
847 def create_ifc_mapping(elements: Iterable) -> Tuple[
848 Dict[Tuple[str, str], ProductBased],
849 List[Tuple[str, ProductBased]],
850 Dict[str, ProductBased]
851 ]:
852 """Create mapping dict, blacklist and default dict from elements
854 WARNING: ifc_type is always converted to lower case
855 and predefined types to upper case
857 Returns:
858 mapping: dict of ifc_type and predefined_type to element class
859 blacklist: list of ifc_type which will not be taken into account
860 default: dict of ifc_type to element class
861 """
862 # TODO: cover virtual elements e.g. Space Boundaries (not products)
864 mapping = {}
865 blacklist = []
866 default = {}
867 _all_ifc_types = set()
869 for ele in elements:
870 for ifc_type, tokens in ele.ifc_types.items():
871 _all_ifc_types.add(ifc_type.lower())
872 for token in tokens:
873 # create default dict where all stars are taken into account
874 # items 'IfcSlab': Slab
875 if token == '*':
876 if ifc_type in default:
877 raise NameError(f"Conflicting default ifc_types for {ifc_type}") # TBD
878 default[ifc_type.lower()] = ele
879 # create blacklist where all - are taken into account
880 # items: ('IfcRoof', 'WeiredStuff')
881 elif token.startswith('-'):
882 blacklist.append((ifc_type.lower(), token[1:].upper()))
883 # create mapping dict
884 # items ('IfcSlab', 'Roof'): Roof
885 else:
886 mapping[(ifc_type.lower(), token.upper())] = ele
888 # check ifc types without default
889 no_default = _all_ifc_types - set(default)
890 if no_default:
891 logger.warning("The following ifc types have no default "
892 "representing Element class. There will be no "
893 "match if predefined type is not provided.\n%s",
894 no_default)
895 return mapping, blacklist, default
898class SerializedElement:
899 """Serialized version of an element.
901 This is a workaround as we can't serialize elements due to the usage of
902 IfcOpenShell which uses unpickable swigPy objects. We just store the most
903 important information which are guid, element_type, storeys, aggregated
904 elements and the attributes from the attribute system."""
905 def __init__(self, element):
906 self.guid = element.guid
907 self.element_type = element.__class__.__name__
908 for attr_name, attr_val in element.attributes.items():
909 # assign value directly to attribute without status
910 # make sure to get the value
911 value = getattr(element, attr_name)
912 if self.is_picklable(value):
913 setattr(self, attr_name, value)
914 else:
915 try:
916 logger.info(
917 f"Attribute {attr_name} will not be serialized, as it's "
918 f"not pickleable, trying to add alternative "
919 f"information instead.")
920 if isinstance(value, (list, tuple)):
921 temp_list = []
922 for val in value:
923 if hasattr(val, 'guid'):
924 temp_list.append(val.guid)
925 setattr(self, attr_name, temp_list)
926 logger.info(f"Successfully linked a list of guids.")
927 elif isinstance(value, str):
928 setattr(self, attr_name, value)
929 logger.info(f"Successfully linked value as string.")
930 elif hasattr(value, 'guid'):
931 setattr(self, attr_name, value.guid)
932 logger.info(f"Successfully linked a single guid.")
933 elif hasattr(value, 'Coord'):
934 setattr(self, attr_name, value.Coord())
935 logger.info(f"Successfully linked a coordinate tuple.")
936 elif value is None:
937 setattr(self, attr_name, None)
938 logger.info(f"Successfully set attribute value to "
939 f"None.")
940 else:
941 logger.info("Linking alternative pickleable attributes "
942 "failed.")
943 except AttributeError:
944 logger.info(f"Linking attribute failed.")
945 for attr_name, attr_val in vars(element).items():
946 if hasattr(self, attr_name) or attr_name == 'attributes':
947 continue
948 else:
949 logger.info(f"Try to add attribute data for attribute "
950 f"'{attr_name}' that is not in AttributeManager.")
951 value = attr_val
952 if isinstance(value, (list, tuple)):
953 temp_list = []
954 for val in value:
955 if hasattr(val, 'guid'):
956 temp_list.append(val.guid)
957 setattr(self, attr_name, temp_list)
958 logger.info(
959 f"Successfully linked a list of guids.")
960 elif isinstance(value, str):
961 setattr(self, attr_name, value)
962 logger.info(
963 f"Successfully added {attr_name} as string.")
964 elif hasattr(value, 'guid'):
965 setattr(self, attr_name, value.guid)
966 logger.info(f"Successfully linked a single guid.")
967 elif hasattr(value, 'Coord'):
968 setattr(self, attr_name, value.Coord())
969 logger.info(f"Successfully linked a coordinate tuple.")
970 elif value is None:
971 setattr(self, attr_name, None)
972 logger.info(f"Successfully set attribute value to "
973 f"None.")
974 else:
975 logger.info("Linking alternative pickleable attributes "
976 "failed.")
977 if issubclass(element.__class__, AggregationMixin):
978 self.elements = [ele.guid for ele in element.elements]
980 @staticmethod
981 def is_picklable(value: Any) -> bool:
982 """Determines if a given value is picklable.
984 This method attempts to serialize the provided value using the `pickle` module.
985 If the value can be successfully serialized, it is considered picklable.
987 Args:
988 value (Any): The value to be tested for picklability.
990 Returns:
991 bool: True if the value is picklable, False otherwise.
992 """
993 try:
994 pickle.dumps(value)
995 return True
996 except (pickle.PicklingError, TypeError):
997 return False
999 def __repr__(self):
1000 return "<serialized %s (guid: '%s')>" % (
1001 self.element_type, self.guid)