Coverage for bim2sim / elements / base_elements.py: 69%
499 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-18 09:34 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-18 09:34 +0000
1import logging
2import pickle
3import re
4from json import JSONEncoder
5from typing import Union, Iterable, Dict, List, Tuple, Type, Optional, Any
7import numpy as np
8import ifcopenshell.geom
9from ifcopenshell import guid
11from bim2sim.elements.aggregation import AggregationMixin
12from bim2sim.kernel.decision import Decision, DecisionBunch
13from bim2sim.kernel import IFCDomainError
14from bim2sim.elements.mapping import condition, attribute, ifc2python
15from bim2sim.elements.mapping.finder import TemplateFinder, SourceTool
16from bim2sim.elements.mapping.units import ureg
17from bim2sim.utilities.common_functions import angle_equivalent, vector_angle, \
18 remove_umlaut
19from bim2sim.utilities.pyocc_tools import PyOCCTools
20from bim2sim.utilities.types import IFCDomain, AttributeDataSource
# Module-level loggers: the regular module logger and the logger named
# 'bim2sim.QualityReport'.
logger = logging.getLogger(__name__)
quality_logger = logging.getLogger('bim2sim.QualityReport')

# Shared IfcOpenShell geometry settings used when shapes of products are
# created in this module.
settings_products = ifcopenshell.geom.settings()
for _option, _value in (
        (settings_products.USE_PYTHON_OPENCASCADE, True),
        (settings_products.USE_WORLD_COORDS, True),
        (settings_products.PRECISION, 1e-6),
):
    settings_products.set(_option, _value)
# do not leak the loop helpers into the module namespace
del _option, _value
class ElementError(Exception):
    """Base class for errors raised in connection with elements."""
class NoValueError(ElementError):
    """Raised when a requested value is not available."""
class ElementEncoder(JSONEncoder):
    """JSON encoder that knows how to write :class:`Element` instances.

    Elements are emitted as the placeholder string ``"<Element(<guid>)>"``.
    Everything else is delegated to :class:`json.JSONEncoder`.
    """

    # TODO: make Elements serializable and deserializable.
    #  Ideas: guid to identify, (factory) method to (re)init by guid,
    #  maybe weakref to other elements (Ports, connections, ...)

    def default(self, o):
        """Return a JSON-serializable stand-in for ``o``.

        Args:
            o: object the standard encoder could not serialize.

        Returns:
            str: ``"<Element(<guid>)>"`` if ``o`` is an Element.

        Raises:
            TypeError: raised by the base class for any other object.
        """
        if isinstance(o, Element):
            return "<Element(%s)>" % o.guid
        # Delegate with the actual arguments. Calling the unbound
        # ``JSONEncoder.default()`` without ``self`` and ``o`` failed with a
        # misleading "missing positional arguments" TypeError.
        return super().default(o)
class Element(metaclass=attribute.AutoAttributeNameMeta):
    """Most basic element class.

    Every instance carries a ``guid``, an ``AttributeManager`` bound to the
    instance (``attributes``) and the name of its concrete class
    (``element_type``).
    """
    guid_prefix = ''
    _id_counter = 0

    def __init__(self, guid=None, **kwargs):
        """Create the element.

        Args:
            guid: identifier of the element. If falsy, an id is generated
                with :meth:`get_id` using ``guid_prefix``.
            **kwargs: initial values; only names known to the attribute
                manager are accepted.

        Raises:
            AttributeError: if a keyword is not a known attribute.
        """
        self.guid = guid if guid else self.get_id(self.guid_prefix)
        self.attributes = attribute.AttributeManager(bind=self)
        self.element_type = type(self).__name__

        # only names managed by the attribute system may be passed in
        for key, value in kwargs.items():
            if key not in self.attributes:
                raise AttributeError(
                    f"Unused argument in kwargs: {key}: {value}")
            setattr(self, key, value)

    def __hash__(self):
        """Hash of the element, derived from its guid."""
        return hash(self.guid)

    def validate_creation(self) -> bool:
        """Check if the current instance is valid.

        Raises:
            NotImplementedError: always; subclasses have to implement this.
        """
        raise NotImplementedError

    def validate_attributes(self) -> dict:
        """Check if attributes are valid; the base class reports nothing."""
        return {}

    def _calc_position(self, name) -> np.array:
        """Return the position (calculation may be expensive).

        The base implementation has no position and returns None.
        """
        return None

    position = attribute.Attribute(
        description='Position of element',
        functions=[_calc_position]
    )

    @staticmethod
    def get_id(prefix=""):
        """Generate a new id from ``prefix`` and a running counter.

        The prefix is padded with ``0`` on the right to 8 characters and
        followed by the class-wide counter, zero-padded to 14 digits.

        Raises:
            AttributeError: if the prefix is longer than 10 characters.
        """
        if len(prefix) > 10:
            raise AttributeError("Max prefix length is 10!")
        Element._id_counter += 1
        return f"{prefix:0<8s}{Element._id_counter:0>14d}"

    @staticmethod
    def get_object(guid):
        """Obsolete lookup of an Element by guid.

        Raises:
            AssertionError: always, the method is obsolete.
        """
        raise AssertionError("Obsolete method. "
                             "Don't rely on global Element.objects. "
                             "Use e.g. elements from tasks/playground.")

    def request(self, name, external_decision: Decision = None) \
            -> Union[None, Decision]:
        """Request an attribute of the element.

        Args:
            name: name of the attribute
            external_decision: decision to use instead of the default one

        Returns:
            Whatever the attribute manager's ``request`` returns.
        """
        return self.attributes.request(name, external_decision)

    def reset(self, name, data_source=AttributeDataSource.manual_overwrite):
        """Reset an attribute of the element.

        Args:
            name: attribute name
            data_source (object): data source of the attribute

        Returns:
            Whatever the attribute manager's ``reset`` returns.
        """
        return self.attributes.reset(name, data_source)

    def source_info(self) -> str:
        """Get an informative string about the source of the element."""
        return ''

    @classmethod
    def get_pending_attribute_decisions(
            cls, elements: Iterable['Element']) -> DecisionBunch:
        """Collect the pending attribute decisions of the given elements.

        The decisions of all elements are gathered in one DecisionBunch,
        sorted by their ``global_key`` and yielded as a single bunch (this
        method is a generator).
        """
        pending = DecisionBunch()
        for element in elements:
            pending.extend(element.attributes.get_decisions())

        # sort decisions to preserve order
        pending.sort(key=lambda decision: decision.global_key)
        yield pending

    @classmethod
    def full_reset(cls):
        """Obsolete.

        Raises:
            AssertionError: always, the method is obsolete.
        """
        raise AssertionError("Obsolete method. not required any more.")
class IFCBased(Element):
    """Element with instantiation from ifc and related methods.

    Attributes:
        ifc: IfcOpenShell element instance
        ifc_types: Dict with ifc_type as key and list of predefined types
            that fit to the class as values.
            Special values for predefined types:
                '*' all which are not overwritten in other classes'
                    predefined types.
                '-Something' start with minus to exclude

            For example::

                {'IfcSlab': ['*', '-SomethingSpecialWeDontWant', 'BASESLAB']}
                {'IfcRoof': ['FLAT_ROOF', 'SHED_ROOF', ...],
                 'IfcSlab': ['ROOF']}
        pattern_ifc_type: patterns (objects with a ``search`` method) used
            by :meth:`filter_for_text_fragments`.
    """

    ifc_types: Dict[str, List[str]] = None
    pattern_ifc_type = []

    def __init__(self, *args,
                 ifc=None,
                 finder: TemplateFinder = None,
                 ifc_units: dict = None,
                 ifc_domain: IFCDomain = None,
                 **kwargs):
        """Create the element from an IFC entity.

        Args:
            *args: forwarded to the parent class
            ifc: IfcOpenShell entity this element is based on
            finder: template finder stored on the element
            ifc_units: dictionary with IFC unit information
            ifc_domain: domain of the IFC file
            **kwargs: forwarded to the parent class
        """
        super().__init__(*args, **kwargs)

        self.ifc = ifc
        self.predefined_type = ifc2python.get_predefined_type(ifc)
        self.ifc_domain = ifc_domain
        self.finder = finder
        self.ifc_units = ifc_units
        self.source_tool: SourceTool = None

        # TBD
        self.enrichment = {}  # TODO: DJA
        # lazily filled caches, see get_propertysets/get_type_propertysets
        self._propertysets = None
        self._type_propertysets = None
        self._decision_results = {}

    @classmethod
    def ifc2args(cls, ifc) -> Tuple[tuple, dict]:
        """Extract init args and kwargs from ifc.

        Returns:
            An empty tuple and a dict with ``guid`` (the entity's
            ``GlobalId`` or None) and ``ifc``.
        """
        guid = getattr(ifc, 'GlobalId', None)
        kwargs = {'guid': guid, 'ifc': ifc}
        return (), kwargs

    @classmethod
    def from_ifc(cls, ifc, *args, **kwargs):
        """Factory method to create an instance from ifc.

        Keyword arguments derived from the entity (see :meth:`ifc2args`)
        take precedence over the ones passed in.
        """
        ifc_args, ifc_kwargs = cls.ifc2args(ifc)
        kwargs.update(ifc_kwargs)
        return cls(*(args + ifc_args), **kwargs)

    @property
    def ifc_type(self):
        """IFC type of the underlying entity or None if there is none."""
        if self.ifc:
            return self.ifc.is_a()
        return None

    @classmethod
    def pre_validate(cls, ifc) -> bool:
        """Check if ifc meets the conditions to create an element from it.

        Raises:
            NotImplementedError: always; subclasses have to implement this.
        """
        raise NotImplementedError

    def _calc_position(self, name):
        """Return the absolute position as numpy array, or None.

        The location coordinates of the entity's placement and of all
        placements it is relative to are summed up.
        NOTE(review): only translations are summed, rotations of parent
        placements are not applied.
        """
        # ObjectPlacement is optional: the attribute may be missing or None
        placement = getattr(self.ifc, 'ObjectPlacement', None)
        if placement is None:
            return None

        # float dtype keeps the in-place addition below valid in any case
        absolute = np.array(
            placement.RelativePlacement.Location.Coordinates, dtype=float)
        parent = placement.PlacementRelTo
        while parent is not None:
            absolute += np.array(
                parent.RelativePlacement.Location.Coordinates, dtype=float)
            parent = parent.PlacementRelTo
        return absolute

    def _get_name_from_ifc(self, name):
        """Return the IFC ``Name`` passed through ``remove_umlaut``.

        Returns None if the entity has no (non-empty) name.
        """
        ifc_name = self.get_ifc_attribute('Name')
        if ifc_name:
            return remove_umlaut(ifc_name)
        return None

    name = attribute.Attribute(
        description="Name of element based on IFC attribute.",
        functions=[_get_name_from_ifc]
    )

    def get_ifc_attribute(self, attribute):
        """Return the attribute of the IFC entity, None if it is missing."""
        return getattr(self.ifc, attribute, None)

    def get_propertyset(self, propertysetname):
        """Look up a single property set of the entity by its name."""
        return ifc2python.get_property_set_by_name(
            propertysetname, self.ifc, self.ifc_units)

    def get_propertysets(self):
        """Return the property sets of the entity (cached after first use)."""
        if self._propertysets is None:
            self._propertysets = ifc2python.get_property_sets(
                self.ifc, self.ifc_units)
        return self._propertysets

    def get_type_propertysets(self):
        """Return the type property sets of the entity (cached)."""
        if self._type_propertysets is None:
            self._type_propertysets = ifc2python.get_type_property_sets(
                self.ifc, self.ifc_units)
        return self._type_propertysets

    def get_hierarchical_parent(self):
        """Delegate to ``ifc2python.getHierarchicalParent``."""
        return ifc2python.getHierarchicalParent(self.ifc)

    def get_hierarchical_children(self):
        """Delegate to ``ifc2python.getHierarchicalChildren``."""
        return ifc2python.getHierarchicalChildren(self.ifc)

    def get_spartial_parent(self):
        """Delegate to ``ifc2python.getSpatialParent``."""
        return ifc2python.getSpatialParent(self.ifc)

    def get_spartial_children(self):
        """Delegate to ``ifc2python.getSpatialChildren``."""
        return ifc2python.getSpatialChildren(self.ifc)

    def get_space(self):
        """Delegate to ``ifc2python.getSpace``."""
        return ifc2python.getSpace(self.ifc)

    def get_storey(self):
        """Delegate to ``ifc2python.getStorey``."""
        return ifc2python.getStorey(self.ifc)

    def get_building(self):
        """Delegate to ``ifc2python.getBuilding``."""
        return ifc2python.getBuilding(self.ifc)

    def get_site(self):
        """Delegate to ``ifc2python.getSite``."""
        return ifc2python.getSite(self.ifc)

    def get_project(self):
        """Delegate to ``ifc2python.getProject``."""
        return ifc2python.getProject(self.ifc)

    def get_true_north(self):
        """Delegate to ``ifc2python.get_true_north``."""
        return ifc2python.get_true_north(self.ifc)

    def summary(self):
        """Delegate to ``ifc2python.summary``."""
        return ifc2python.summary(self.ifc)

    def search_property_hierarchy(self, propertyset_name):
        """Search for a property set in hierarchical order.

        1. element's propertysets
        2. element type's propertysets

        Returns:
            The first property set found under ``propertyset_name`` or None.
        """
        # the type property sets are only fetched if the first lookup fails
        for getter in (self.get_propertysets, self.get_type_propertysets):
            p_sets = getter()
            try:
                return p_sets[propertyset_name]
            except KeyError:
                continue
        return None

    def inverse_properties(self):
        """Generator yielding tuples of PropertySet name and Property name"""
        for p_set_name, p_set in self.get_propertysets().items():
            for p_name in p_set.keys():
                yield (p_set_name, p_name)

    def filter_properties(self, patterns):
        """Filter all properties by re pattern.

        Args:
            patterns: iterable of regular expressions which are matched
                (``re.match``) against the property names.

        Returns:
            list of tuple(propertyset_name, property_name, match)
        """
        matches = []
        for propertyset_name, property_name in self.inverse_properties():
            for pattern in patterns:
                match = re.match(pattern, property_name)
                if match:
                    matches.append((propertyset_name, property_name, match))
        return matches

    @classmethod
    def filter_for_text_fragments(cls, ifc_element, ifc_units: dict,
                                  optional_locations: list = None):
        """Find text fragments that match the class patterns in an IFC element.

        Args:
            ifc_element: The IFC element to check.
            ifc_units: Dictionary containing IFC unit information.
            optional_locations: Additional locations to check patterns beyond
                name. Defaults to None.

        Returns:
            list: List of matched fragments, empty list if no matches found.
        """
        results = []

        # Check name matches. Name is optional in IFC, so skip the check
        # instead of passing None to the patterns.
        element_name = getattr(ifc_element, 'Name', None)
        if element_name is not None:
            name_hits = [
                hit for hit in (
                    p.search(element_name) for p in cls.pattern_ifc_type)
                if hit is not None]
            if name_hits:
                # ``cls.ifc_type`` is a property object on class level, so
                # the class name is logged instead.
                quality_logger.info(
                    f"Identified {cls.__name__} through text fragments in "
                    f"name. Criteria: {name_hits}")
                results.append(name_hits[0][0])

        # Check optional locations
        if optional_locations:
            for loc in optional_locations:
                # NOTE(review): the value is handed to ``pattern.search``
                # below, so it is expected to be a string -- confirm what
                # get_property_set_by_name returns for these locations.
                prop_value = ifc2python.get_property_set_by_name(
                    loc, ifc_element, ifc_units)
                if not prop_value:
                    continue

                loc_hits = [
                    hit for hit in (
                        p.search(prop_value) for p in cls.pattern_ifc_type)
                    if hit is not None]
                if loc_hits:
                    quality_logger.info(
                        f"Identified {cls.__name__} through text fragments "
                        f"in {loc}. Criteria: {loc_hits}")
                    results.append(loc_hits[0][0])

        return results

    def get_exact_property(self, propertyset_name: str, property_name: str):
        """Return the value of a property given by propertyset and name.

        Raises:
            NoValueError: if the property does not exist.
        """
        try:
            p_set = self.search_property_hierarchy(propertyset_name)
            value = p_set[property_name]
        except (AttributeError, KeyError, TypeError) as err:
            raise NoValueError("Property '%s.%s' does not exist" % (
                propertyset_name, property_name)) from err
        return value

    def select_from_potential_properties(self, patterns, name,
                                         collect_decisions):
        """Select a value from all properties matching the patterns.

        Args:
            patterns: regular expressions for :meth:`filter_properties`
            name: currently unused
            collect_decisions: currently unused

        Returns:
            None if nothing matches, the common value if all matching
            properties agree, otherwise the set of distinct values.
        """
        matches = self.filter_properties(patterns)
        if not matches:
            return None

        values = [
            self.get_exact_property(propertyset_name, property_name)
            for propertyset_name, property_name, _ in matches]

        # TODO: Decision: save for all following elements of same class
        #  (don't ask again?) instead of returning all distinct values.
        distinct_values = set(values)
        if len(distinct_values) == 1:
            # multiple sources but common value
            return distinct_values.pop()

        quality_logger.warning(
            'Found multiple values for attributes %s of instance %s',
            ', '.join(str((m[0], m[1])) for m in matches), self)
        return distinct_values

    def source_info(self) -> str:
        """Return ``'<ifc_type>:<guid>'`` of the element."""
        return f'{self.ifc_type}:{self.guid}'
class RelationBased(IFCBased):
    """Element based on an IFC relation.

    The class used to be defined twice in a row. The second, empty
    definition replaced the first one at import time, so ``conditions``,
    ``__repr__`` and ``__str__`` below were silently discarded. Both
    definitions are merged into this single one.
    """
    conditions = []

    def __repr__(self):
        """Return ``<ClassName (guid: ...)>``."""
        return "<%s (guid: %s)>" % (self.__class__.__name__, self.guid)

    def __str__(self):
        """Return the bare class name."""
        return "%s" % self.__class__.__name__
class ProductBased(IFCBased):
    """Elements based on IFC products.

    Args:
        material: material of the element
        material_set: dict of material and fraction [0, 1] if multiple
            materials

    Every subclass is registered in ``key_map`` under the key
    ``'<domain>-<ClassName>'`` (see :meth:`__init_subclass__`).
    """
    domain = 'GENERAL'
    key: str = ''
    key_map: Dict[str, 'Type[ProductBased]'] = {}
    conditions = []

    def __init__(self, *args, **kwargs):
        """Create the element; all arguments go to the parent class."""
        super().__init__(*args, **kwargs)
        self.aggregation = None
        self.ports = self.get_ports()
        self.material = None
        self.material_set = {}
        self.cost_group = self.calc_cost_group()

    def __init_subclass__(cls, **kwargs):
        """Set the key of each subclass and register it in ``key_map``."""
        cls.key = f'{cls.domain}-{cls.__name__}'
        cls.key_map[cls.key] = cls

    def get_ports(self):
        """Return the ports of the element; the base class has none."""
        return []

    def get_better_subclass(self) -> Union[None, Type['IFCBased']]:
        """Returns alternative subclass of current object.
        CAUTION: only use this if you can't know the result before
        instantiation of base class

        Returns:
            object: subclass of ProductBased or None"""
        return None

    @property
    def neighbors(self):
        """Directly connected elements (parents of connected ports)."""
        return [port.connection.parent
                for port in self.ports if port.connection]

    def validate_creation(self):
        """Validate the element creation in two steps.

        1. Check if standard parameters are in valid range.
        2. Check if number of ports is equal to number of expected ports
           (only for HVAC).

        Returns:
            bool: False if a critical condition or the port check fails.
        """
        for cond in self.conditions:
            if not cond.critical_for_creation:
                continue
            value = getattr(self, cond.key)
            # don't prevent creation if value is not existing
            if not value:
                continue
            if not cond.check(self, value):
                logger.warning("%s validation (%s) failed for %s",
                               self.ifc_type, cond.name, self.guid)
                return False
        if not self.validate_ports():
            # expected_hvac_ports is not defined in this class; it is only
            # read when a subclass' validate_ports() reports a mismatch
            logger.warning("%s has %d ports, but %s expected for %s",
                           self.ifc_type, len(self.ports),
                           self.expected_hvac_ports, self.guid)
            return False
        return True

    def validate_attributes(self) -> dict:
        """Check if all attributes are valid, returns dict with key =
        attribute and value = True or False.

        Not implemented yet: the result is always an empty dict.
        """
        results = {}
        for cond in self.conditions:
            if not cond.critical_for_creation:
                # TODO: check non-critical conditions and store the result
                pass
        return results

    def validate_ports(self):
        """Check the ports of the element; the base class accepts all."""
        return True

    def __repr__(self):
        return "<%s>" % self.__class__.__name__

    def calc_cost_group(self) -> Optional[int]:
        """Calculate the cost group according to DIN276"""
        return None

    def calc_product_shape(self):
        """Calculate the product shape based on IfcProduct representation.

        Returns:
            The geometry created by IfcOpenShell, or None if the entity has
            no ``Representation`` attribute or the creation failed.
        """
        if hasattr(self.ifc, 'Representation'):
            try:
                shape = ifcopenshell.geom.create_shape(
                    settings_products, self.ifc).geometry
                return shape
            # best effort: log and carry on, but don't swallow
            # KeyboardInterrupt/SystemExit as the bare except did
            except Exception:
                logger.warning(
                    "No calculation of product shape possible for %s.",
                    self.ifc)
        return None

    def calc_volume_from_ifc_shape(self):
        """Calculate the volume from the shape of the IFC entity.

        Returns:
            The shape volume multiplied with ``ureg.meter ** 3``, or None if
            the entity has no ``Representation`` attribute or the
            calculation failed.
        """
        # todo use more efficient iterator to calc all shapes at once
        #  with multiple cores:
        #  https://wiki.osarch.org/index.php?title=IfcOpenShell_code_examples
        if hasattr(self.ifc, 'Representation'):
            try:
                shape = ifcopenshell.geom.create_shape(
                    settings_products, self.ifc).geometry
                vol = PyOCCTools.get_shape_volume(shape)
                vol = vol * ureg.meter ** 3
                return vol
            # see calc_product_shape for the reason of ``except Exception``
            except Exception:
                logger.warning(
                    "No calculation of geometric volume possible for %s.",
                    self.ifc)
        return None

    def _get_volume(self, name):
        """Return ``net_volume`` if available and truthy, otherwise the
        volume calculated from the IFC shape."""
        if hasattr(self, "net_volume"):
            if self.net_volume:
                vol = self.net_volume
                return vol
        vol = self.calc_volume_from_ifc_shape()
        return vol

    volume = attribute.Attribute(
        description="Volume of the attribute",
        functions=[_get_volume],
    )

    def __str__(self):
        return "<%s>" % (self.__class__.__name__)
class Port(RelationBased):
    """Basic port.

    A port belongs to a ``parent`` element and can be connected to exactly
    one other port; connections are always stored on both ports.
    """

    def __init__(self, parent, *args, **kwargs):
        """Create the port.

        Args:
            parent: element the port belongs to
            *args: forwarded to the parent class
            **kwargs: forwarded to the parent class
        """
        super().__init__(*args, **kwargs)
        self.parent: ProductBased = parent
        self.connection = None

    def connect(self, other):
        """Connect this interface bidirectional to another interface.

        Raises:
            AssertionError: if ``other`` is not a Port.
            AttributeError: if one of the ports is already connected to a
                different port.
        """
        # Raised explicitly (same exception type as the former ``assert``)
        # so the check is not stripped when Python runs with -O.
        if not isinstance(other, Port):
            raise AssertionError(
                "Can't connect interfaces of different classes.")
        if self.connection and self.connection is not other:
            raise AttributeError("Port is already connected!")
        if other.connection and other.connection is not self:
            raise AttributeError("Other port is already connected!")
        self.connection = other
        other.connection = self

    def disconnect(self):
        """Remove the connection between self and the other port."""
        other = self.connection
        if other:
            # clear own side first so the mutual call below terminates
            self.connection = None
            other.disconnect()

    def is_connected(self):
        """Returns truth value of port's connection"""
        return bool(self.connection)

    def __repr__(self):
        if self.parent:
            try:
                idx = self.parent.ports.index(self)
                return "<%s #%d of %s)>" % (
                    self.__class__.__name__, idx, self.parent)
            except ValueError:
                # parent does not list this port
                return "<%s (broken parent: %s)>" % (
                    self.__class__.__name__, self.parent)
        return "<%s (*abandoned*)>" % self.__class__.__name__

    def __str__(self):
        # repr without the leading '<' and the trailing two characters
        return self.__repr__()[1:-2]
class Material(ProductBased):
    """Material element based on ``IfcMaterial``."""
    guid_prefix = 'Material_'
    # NOTE(review): ProductBased.__init_subclass__ assigns ``cls.key`` after
    # the class body has run, so this value does not seem to survive class
    # creation -- confirm which key is intended.
    key: str = 'Material'
    ifc_types = {
        'IfcMaterial': ["*"]
    }
    name = ''

    def __init__(self, *args, **kwargs):
        """Create the material; ``parents`` starts as an empty list."""
        super().__init__(*args, **kwargs)
        self.parents: List[ProductBased] = []

    @staticmethod
    def get_id(prefix=""):
        """Generate an id from ``prefix`` and a shortened new IFC guid.

        The first ``len(prefix) + 1`` characters of the new guid are cut
        off before it is appended to the prefix.

        Raises:
            AttributeError: if the prefix is longer than 10 characters.
        """
        cut = len(prefix)
        if cut > 10:
            raise AttributeError("Max prefix length is 10!")
        shortened_guid = guid.new()[cut + 1:]
        return f"{prefix}{shortened_guid}"

    # Plausibility ranges of the attributes below; none of them prevents
    # the creation of the element.
    conditions = [
        condition.RangeCondition(
            'spec_heat_capacity',
            0 * ureg.kilojoule / (ureg.kg * ureg.K),
            5 * ureg.kilojoule / (ureg.kg * ureg.K),
            critical_for_creation=False),
        condition.RangeCondition(
            'density',
            0 * ureg.kg / ureg.m ** 3,
            50000 * ureg.kg / ureg.m ** 3,
            critical_for_creation=False),
        condition.RangeCondition(
            'thermal_conduc',
            0 * ureg.W / ureg.m / ureg.K,
            100 * ureg.W / ureg.m / ureg.K,
            critical_for_creation=False),
        condition.RangeCondition(
            'porosity',
            0 * ureg.dimensionless,
            1 * ureg.dimensionless, True,
            critical_for_creation=False),
        condition.RangeCondition(
            'solar_absorp',
            0 * ureg.percent,
            1 * ureg.percent, True,
            critical_for_creation=False),
    ]

    spec_heat_capacity = attribute.Attribute(
        default_ps=("Pset_MaterialThermal", "SpecificHeatCapacity"),
        unit=ureg.kilojoule / (ureg.kg * ureg.K)
    )

    density = attribute.Attribute(
        default_ps=("Pset_MaterialCommon", "MassDensity"),
        unit=ureg.kg / ureg.m ** 3
    )

    thermal_conduc = attribute.Attribute(
        default_ps=("Pset_MaterialThermal", "ThermalConductivity"),
        unit=ureg.W / (ureg.m * ureg.K)
    )

    porosity = attribute.Attribute(
        default_ps=("Pset_MaterialCommon", "Porosity"),
        unit=ureg.dimensionless
    )

    # todo is percent the correct unit? (0-1)
    solar_absorp = attribute.Attribute(
        default=0.7,
        unit=ureg.percent
    )

    def __repr__(self):
        cls_name = self.__class__.__name__
        if not self.name:
            return "<%s>" % cls_name
        return "<%s %s>" % (cls_name, self.name)
class Dummy(ProductBased):
    """Dummy for all unknown elements"""

    ifc_types = {
        "IfcElementProxy": ['*']
    }

    @property
    def ifc_type(self):
        """IFC type of the dummy.

        ``_ifc_type`` is not assigned anywhere in this class (the
        ``__init__`` that did so is disabled), which made this property and
        ``__str__`` raise AttributeError. An explicitly set ``_ifc_type`` is
        still honoured, otherwise the type of the IFC entity is used.
        """
        stored = getattr(self, '_ifc_type', None)
        if stored is not None:
            return stored
        return super().ifc_type

    def __str__(self):
        return "Dummy '%s'" % self.ifc_type
class Factory:
    """Element Factory for :class: `ProductBased`

    To understand the concept of the factory class, we refer to this article:
    https://refactoring.guru/design-patterns/factory-method/python/example

    Example::

        factory = Factory({Pipe, Boiler}, ifc_units, ifc_domain)
        ele = factory(some_ifc_element)
    """

    def __init__(
            self,
            relevant_elements: 'set[Type[ProductBased]]',
            ifc_units: dict,
            ifc_domain: IFCDomain,
            finder: Union[TemplateFinder, None] = None,
            dummy=Dummy):
        """Create the factory.

        Args:
            relevant_elements: element classes the factory may create
            ifc_units: dictionary with IFC unit information, passed to every
                created element
            ifc_domain: domain of the IFC file, passed to every element
            finder: template finder, passed to every created element
            dummy: class used if no element class matches an entity
        """
        self.mapping, self.blacklist, self.defaults = \
            self.create_ifc_mapping(relevant_elements)
        self.dummy_cls = dummy
        self.ifc_domain = ifc_domain
        self.finder = finder
        self.ifc_units = ifc_units

    def __call__(self, ifc_entity, *args, ifc_type: str = None,
                 use_dummy=True, **kwargs) -> ProductBased:
        """Run factory to create element instance.

        Calls self.create() function but before checks which element_cls is
        the correct mapping for the given ifc_entity.

        Args:
            ifc_entity: IfcOpenShell entity
            args: additional args passed to element
            ifc_type: ifc type to create element for.
                defaults to ifc_entity.is_a()
            use_dummy: use dummy class if nothing is found
            kwargs: additional kwargs passed to element
        Raises:
            LookupError: if no element found and use_dummy = False
            IFCDomainError: if the element class restricts its IFC domains
                (``from_ifc_domains``) and the factory's domain is not
                among them
        Returns:
            element: created element instance
        """
        _ifc_type = ifc_type or ifc_entity.is_a()
        predefined_type = ifc2python.get_predefined_type(ifc_entity)
        element_cls = self.get_element(_ifc_type, predefined_type)
        if not element_cls:
            if not use_dummy:
                raise LookupError(f"No element found for {ifc_entity}")
            element_cls = self.dummy_cls
        # TODO # 537 Put this to a point where it makes sense, return None is
        #  no solution
        if hasattr(element_cls, 'from_ifc_domains'):
            if self.ifc_domain not in element_cls.from_ifc_domains:
                # the message used to contain a stray "f" in front of the
                # class name (f-string prefix typed inside the string)
                message = (
                    f"Element has {self.ifc_domain} but "
                    f"{element_cls.__name__}"
                    f" will only be created for IFC files of domain "
                    f"{element_cls.from_ifc_domains}")
                logger.warning("%s.", message)
                raise IFCDomainError(message)

        element = self.create(element_cls, ifc_entity, *args, **kwargs)
        return element

    def create(self, element_cls, ifc_entity, *args, **kwargs):
        """Create Element from class and ifc.

        After instantiation the element is asked for a better subclass
        (``get_better_subclass``); if it names one, the element is created
        again from that class.
        """
        # instantiate element
        element = element_cls.from_ifc(
            ifc_entity, *args, ifc_domain=self.ifc_domain,
            finder=self.finder, ifc_units=self.ifc_units, **kwargs)
        # check if it prefers to be sth else
        better_cls = element.get_better_subclass()
        if better_cls:
            logger.info("Creating %s instead of %s", better_cls, element_cls)
            element = better_cls.from_ifc(
                ifc_entity, *args, ifc_domain=self.ifc_domain,
                finder=self.finder, ifc_units=self.ifc_units, **kwargs)
        return element

    def get_element(self, ifc_type: str, predefined_type: Union[str, None]) \
            -> Optional[Type[ProductBased]]:
        """Get element class by ifc type and predefined type.

        Returns:
            The class mapped to (ifc_type, predefined_type), None if that
            combination is blacklisted, otherwise the default class of the
            ifc type (None if there is no default).
        """
        if predefined_type:
            key = (ifc_type.lower(), predefined_type.upper())
            # 1. go over normal list, if found match --> return
            element = self.mapping.get(key)
            if element:
                return element
            # 2. go over negative list, if found match --> not existing
            if key in self.blacklist:
                return None
        # 3. go over default list, if found match --> return
        return self.defaults.get(ifc_type.lower())

    @staticmethod
    def create_ifc_mapping(elements: Iterable) -> Tuple[
        Dict[Tuple[str, str], Type[ProductBased]],
        List[Tuple[str, str]],
        Dict[str, Type[ProductBased]]
    ]:
        """Create mapping dict, blacklist and default dict from elements

        WARNING: ifc_type is always converted to lower case
        and predefined types to upper case

        Returns:
            mapping: dict of ifc_type and predefined_type to element class
            blacklist: list of (ifc_type, predefined_type) which will not be
                taken into account
            default: dict of ifc_type to element class
        """
        # TODO: cover virtual elements e.g. Space Boundaries (not products)

        mapping = {}
        blacklist = []
        default = {}
        _all_ifc_types = set()

        for ele in elements:
            for ifc_type, tokens in ele.ifc_types.items():
                type_key = ifc_type.lower()
                _all_ifc_types.add(type_key)
                for token in tokens:
                    # create default dict where all stars are taken into
                    # account
                    # items 'IfcSlab': Slab
                    if token == '*':
                        # The former check compared the original spelling
                        # against the lower-case keys and therefore never
                        # fired; the last class always won silently. That
                        # precedence is kept, but the conflict is reported.
                        previous = default.get(type_key)
                        if previous is not None and previous is not ele:
                            logger.warning(
                                "Conflicting default ifc_types for %s: "
                                "%s replaces %s", ifc_type, ele, previous)
                        default[type_key] = ele
                    # create blacklist where all - are taken into account
                    # items: ('IfcRoof', 'WeiredStuff')
                    elif token.startswith('-'):
                        blacklist.append((type_key, token[1:].upper()))
                    # create mapping dict
                    # items ('IfcSlab', 'Roof'): Roof
                    else:
                        mapping[(type_key, token.upper())] = ele

        # check ifc types without default
        no_default = _all_ifc_types - set(default)
        if no_default:
            logger.warning("The following ifc types have no default "
                           "representing Element class. There will be no "
                           "match if predefined type is not provided.\n%s",
                           no_default)
        return mapping, blacklist, default
class SerializedElement:
    """Serialized version of an element.

    This is a workaround as we can't serialize elements due to the usage of
    IfcOpenShell which uses unpickable swigPy objects. We just store the most
    important information which are guid, element_type, storeys, aggregated
    elements and the attributes from the attribute system."""

    def __init__(self, element):
        """Copy the serializable information of ``element``.

        Attributes of the attribute system are copied directly if they are
        picklable; otherwise a substitute is stored where possible (see
        :meth:`_link_alternative`). The remaining instance variables of the
        element are only stored via such a substitute.
        """
        self.guid = element.guid
        self.element_type = element.__class__.__name__
        for attr_name, _ in element.attributes.items():
            # assign value directly to attribute without status
            # make sure to get the value
            value = getattr(element, attr_name)
            if self.is_picklable(value):
                setattr(self, attr_name, value)
                continue
            try:
                logger.info(
                    "Attribute %s will not be serialized, as it's not "
                    "pickleable, trying to add alternative information "
                    "instead.", attr_name)
                self._link_alternative(
                    attr_name, value,
                    "Successfully linked value as string.")
            except AttributeError:
                logger.info("Linking attribute failed.")

        for attr_name, attr_val in vars(element).items():
            # skip what was stored above and the attribute manager itself
            if hasattr(self, attr_name) or attr_name == 'attributes':
                continue
            logger.info(
                "Try to add attribute data for attribute '%s' that is not "
                "in AttributeManager.", attr_name)
            self._link_alternative(
                attr_name, attr_val,
                f"Successfully added {attr_name} as string.")

        if issubclass(element.__class__, AggregationMixin):
            self.elements = [ele.guid for ele in element.elements]

    def _link_alternative(self, attr_name, value, string_message):
        """Store a picklable substitute for ``value`` under ``attr_name``.

        * list/tuple: list of the guids of all items that have a guid
        * str: the string itself (``string_message`` is logged)
        * object with ``guid``: its guid
        * object with ``Coord``: result of calling ``Coord()``
        * None: None

        Anything else is not stored, only logged.
        """
        if isinstance(value, (list, tuple)):
            guids = [item.guid for item in value if hasattr(item, 'guid')]
            setattr(self, attr_name, guids)
            logger.info("Successfully linked a list of guids.")
        elif isinstance(value, str):
            setattr(self, attr_name, value)
            logger.info(string_message)
        elif hasattr(value, 'guid'):
            setattr(self, attr_name, value.guid)
            logger.info("Successfully linked a single guid.")
        elif hasattr(value, 'Coord'):
            setattr(self, attr_name, value.Coord())
            logger.info("Successfully linked a coordinate tuple.")
        elif value is None:
            setattr(self, attr_name, None)
            logger.info("Successfully set attribute value to None.")
        else:
            logger.info("Linking alternative pickleable attributes failed.")

    @staticmethod
    def is_picklable(value: Any) -> bool:
        """Determines if a given value is picklable.

        This method attempts to serialize the provided value using the
        `pickle` module. If the value can be successfully serialized, it is
        considered picklable.

        Args:
            value (Any): The value to be tested for picklability.

        Returns:
            bool: True if the value is picklable, False otherwise.
        """
        try:
            pickle.dumps(value)
        # Depending on the Python version, pickle reports local objects with
        # AttributeError instead of PicklingError.
        except (pickle.PicklingError, TypeError, AttributeError):
            return False
        return True

    def __repr__(self):
        return "<serialized %s (guid: '%s')>" % (
            self.element_type, self.guid)