Coverage for bim2sim/elements/base_elements.py: 77%
442 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-12 17:09 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-12 17:09 +0000
1import logging
2import pickle
3import re
4from json import JSONEncoder
5from typing import Union, Iterable, Dict, List, Tuple, Type, Optional, Any
7import numpy as np
8import ifcopenshell.geom
9from ifcopenshell import guid
11from bim2sim.elements.aggregation import AggregationMixin
12from bim2sim.kernel.decision import Decision, DecisionBunch
13from bim2sim.kernel import IFCDomainError
14from bim2sim.elements.mapping import condition, attribute, ifc2python
15from bim2sim.elements.mapping.finder import TemplateFinder, SourceTool
16from bim2sim.elements.mapping.units import ureg
17from bim2sim.utilities.common_functions import angle_equivalent, vector_angle, \
18 remove_umlaut
19from bim2sim.utilities.pyocc_tools import PyOCCTools
20from bim2sim.utilities.types import IFCDomain, AttributeDataSource
# Module logger and the dedicated logger used for quality-report messages.
logger = logging.getLogger(__name__)
quality_logger = logging.getLogger('bim2sim.QualityReport')
# Geometry settings object shared by the shape computations in this module.
# NOTE(review): USE_PYTHON_OPENCASCADE presumably makes create_shape return
# pythonocc shapes (as consumed by PyOCCTools) - confirm against IfcOpenShell.
settings_products = ifcopenshell.geom.main.settings()
settings_products.set(settings_products.USE_PYTHON_OPENCASCADE, True)
class ElementError(Exception):
    """Base class for errors raised while working with an Element."""


class NoValueError(ElementError):
    """Raised when a requested value is not available."""


class ElementEncoder(JSONEncoder):
    """JSON encoder that renders Element instances as a placeholder string.

    An Element is written as ``"<Element(<guid>)>"``. This is a one-way
    representation: the element cannot be restored from it.
    """

    # TODO: make Elements serializable and deserializable.
    #  Ideas: guid to identify, (factory) method to (re)init by guid,
    #  maybe weakref to other elements (Ports, connections, ...)

    def default(self, o):
        """Return a JSON-serializable stand-in for ``o``.

        Args:
            o: object the standard encoder could not serialize.

        Returns:
            The string ``"<Element(guid)>"`` if ``o`` is an Element.

        Raises:
            TypeError: raised by the base encoder for every other type.
        """
        if isinstance(o, Element):
            return "<Element(%s)>" % o.guid
        # Delegate to the base implementation (bound, with the object) so
        # unsupported types yield the regular "not JSON serializable" error.
        return super().default(o)


class Element(metaclass=attribute.AutoAttributeNameMeta):
    """Root class of all elements.

    Every instance owns a guid and an AttributeManager bound to itself.

    Attributes:
        guid_prefix: prefix handed to get_id() when no guid is supplied.
    """
    guid_prefix = ''
    _id_counter = 0

    def __init__(self, guid=None, **kwargs):
        """Create the element and preset attributes from keyword arguments.

        Args:
            guid: identifier of the element; generated via get_id() if falsy.
            **kwargs: initial values, only names known to the attribute
                manager are accepted.

        Raises:
            AttributeError: if a keyword is not a known attribute.
        """
        self.guid = guid or self.get_id(self.guid_prefix)
        # self.related_decisions: List[Decision] = []
        self.attributes = attribute.AttributeManager(bind=self)

        # reject anything that is not a registered attribute
        for key, value in kwargs.items():
            if key not in self.attributes:
                raise AttributeError(
                    f"Unused argument in kwargs: {key}: {value}")
            setattr(self, key, value)

    def __hash__(self):
        """Hash of the element, derived from its guid."""
        return hash(self.guid)

    def validate_creation(self) -> bool:
        """Tell whether the current instance is valid (abstract here)."""
        raise NotImplementedError

    def validate_attributes(self) -> dict:
        """Validate the attributes; the base implementation reports nothing."""
        return {}

    def _calc_position(self, name) -> np.array:
        """Position of the element; the base implementation yields None."""
        return None

    position = attribute.Attribute(
        description='Position of element',
        functions=[_calc_position]
    )

    @staticmethod
    def get_id(prefix=""):
        """Build a new id from ``prefix`` and a running class-wide counter.

        Raises:
            AttributeError: if the prefix is longer than 10 characters.
        """
        if len(prefix) > 10:
            raise AttributeError("Max prefix length is 10!")
        Element._id_counter += 1
        counter = Element._id_counter
        return "{0:0<8s}{1:0>14d}".format(prefix, counter)

    @staticmethod
    def get_object(guid):
        """Obsolete lookup of an element by guid; always raises.

        Raises:
            AssertionError: unconditionally, the method is obsolete.
        """
        raise AssertionError("Obsolete method. "
                             "Don't rely on global Element.objects. "
                             "Use e.g. elements from tasks/playground.")

    def request(self, name, external_decision: Decision = None) \
            -> Union[None, Decision]:
        """Request an attribute of this element.

        Args:
            name: Name of attribute
            external_decision: Decision to use instead of default decision

        Returns:
            Whatever the attribute manager's request() returns.
        """
        return self.attributes.request(name, external_decision)

    def reset(self, name, data_source=AttributeDataSource.manual_overwrite):
        """Reset an attribute of this element.

        Args:
            name: attribute name
            data_source (object): data source of the attribute

        Returns:
            Whatever the attribute manager's reset() returns.
        """
        return self.attributes.reset(name, data_source)

    def source_info(self) -> str:
        """Informative string about the source of the element (empty here)."""
        return ''

    @classmethod
    def get_pending_attribute_decisions(
            cls, elements: Iterable['Element']) -> DecisionBunch:
        """Collect the attribute decisions of all given elements.

        This is a generator: it yields exactly one DecisionBunch holding the
        decisions of every element, sorted by their global_key.

        Args:
            elements: elements whose attribute managers are queried.
        """
        collected = DecisionBunch()
        for element in elements:
            collected.extend(element.attributes.get_decisions())

        # sort decisions to preserve order
        collected.sort(key=lambda decision: decision.global_key)
        yield collected

    @classmethod
    def full_reset(cls):
        """Obsolete; always raises AssertionError."""
        raise AssertionError("Obsolete method. not required any more.")


class IFCBased(Element):
    """Element with instantiation from ifc and related methods.

    Attributes:
        ifc: IfcOpenShell element instance
        ifc_types: Dict with ifc_type as key and list of predefined types that
            fit to the class as values.
            Special values for predefined types:
                '*' all which are not overwritten in other classes predefined
                    types.
                '-Something' start with minus to exclude

            For example:
            {'IfcSlab': ['*', '-SomethingSpecialWeDontWant', 'BASESLAB']}
            {'IfcRoof': ['FLAT_ROOF', 'SHED_ROOF',...],
             'IfcSlab': ['ROOF']}
        pattern_ifc_type: objects offering ``search(text)`` that are used by
            filter_for_text_fragments() (presumably compiled regular
            expressions - confirm in subclasses).
    """

    ifc_types: Dict[str, List[str]] = None
    pattern_ifc_type = []

    def __init__(self, *args,
                 ifc=None,
                 finder: TemplateFinder = None,
                 ifc_units: dict = None,
                 ifc_domain: IFCDomain = None,
                 **kwargs):
        """Store the ifc entity and its context on the new element.

        Args:
            *args: positional arguments forwarded to Element.
            ifc: IfcOpenShell entity this element is based on.
            finder: template finder stored on the instance.
            ifc_units: unit mapping handed to the property set helpers.
            ifc_domain: domain of the IFC file the entity comes from.
            **kwargs: keyword arguments forwarded to Element.
        """
        super().__init__(*args, **kwargs)

        self.ifc = ifc
        self.predefined_type = ifc2python.get_predefined_type(ifc)
        self.ifc_domain = ifc_domain
        self.finder = finder
        self.ifc_units = ifc_units
        self.source_tool: SourceTool = None

        # TBD
        self.enrichment = {}  # TODO: DJA
        # lazily filled caches, see get_propertysets/get_type_propertysets
        self._propertysets = None
        self._type_propertysets = None
        self._decision_results = {}

    @classmethod
    def ifc2args(cls, ifc) -> Tuple[tuple, dict]:
        """Extract init args and kwargs from ifc.

        Returns:
            An empty args tuple and kwargs holding ``guid`` (the entity's
            GlobalId or None) and ``ifc``.
        """
        guid = getattr(ifc, 'GlobalId', None)
        kwargs = {'guid': guid, 'ifc': ifc}
        return (), kwargs

    @classmethod
    def from_ifc(cls, ifc, *args, **kwargs):
        """Factory method to create instance from ifc.

        Values derived from the entity (guid, ifc) override equally named
        entries in ``kwargs``.
        """
        ifc_args, ifc_kwargs = cls.ifc2args(ifc)
        kwargs.update(ifc_kwargs)
        return cls(*(args + ifc_args), **kwargs)

    @property
    def ifc_type(self):
        """IFC type name of the underlying entity, None without entity."""
        if self.ifc:
            return self.ifc.is_a()
        return None

    @classmethod
    def pre_validate(cls, ifc) -> bool:
        """Check if ifc meets conditions to create element from it."""
        raise NotImplementedError

    def _calc_position(self, name):
        """Return the position as sum of all relative placement locations.

        Walks the PlacementRelTo chain and adds up the location coordinates.
        NOTE: only the locations are summed, axis rotations of the
        placements are not applied.

        Returns:
            numpy array with the summed coordinates or None if the entity
            has no object placement.
        """
        # ObjectPlacement is optional: the attribute may exist but be None
        placement = getattr(self.ifc, 'ObjectPlacement', None)
        if placement is None:
            return None

        # force a float array so the in-place addition never fails on dtype
        absolute = np.array(
            placement.RelativePlacement.Location.Coordinates, dtype=float)
        parent = placement.PlacementRelTo
        while parent is not None:
            absolute += np.array(
                parent.RelativePlacement.Location.Coordinates, dtype=float)
            parent = parent.PlacementRelTo
        return absolute

    def _get_name_from_ifc(self, name):
        """Return the IFC Name with umlauts removed, None if it is empty."""
        ifc_name = self.get_ifc_attribute('Name')
        if ifc_name:
            return remove_umlaut(ifc_name)
        return None

    name = attribute.Attribute(
        description="Name of element based on IFC attribute.",
        functions=[_get_name_from_ifc]
    )

    def get_ifc_attribute(self, attribute):
        """Return the named attribute of the ifc entity, None if missing."""
        return getattr(self.ifc, attribute, None)

    def get_propertyset(self, propertysetname):
        """Look up a single property set of the entity by name."""
        return ifc2python.get_property_set_by_name(
            propertysetname, self.ifc, self.ifc_units)

    def get_propertysets(self):
        """Return the property sets of the entity (cached after first call)."""
        if self._propertysets is None:
            self._propertysets = ifc2python.get_property_sets(
                self.ifc, self.ifc_units)
        return self._propertysets

    def get_type_propertysets(self):
        """Return the type property sets (cached after first call)."""
        if self._type_propertysets is None:
            self._type_propertysets = ifc2python.get_type_property_sets(
                self.ifc, self.ifc_units)
        return self._type_propertysets

    def get_hierarchical_parent(self):
        return ifc2python.getHierarchicalParent(self.ifc)

    def get_hierarchical_children(self):
        return ifc2python.getHierarchicalChildren(self.ifc)

    def get_spartial_parent(self):
        return ifc2python.getSpatialParent(self.ifc)

    def get_spartial_children(self):
        return ifc2python.getSpatialChildren(self.ifc)

    def get_space(self):
        return ifc2python.getSpace(self.ifc)

    def get_storey(self):
        return ifc2python.getStorey(self.ifc)

    def get_building(self):
        return ifc2python.getBuilding(self.ifc)

    def get_site(self):
        return ifc2python.getSite(self.ifc)

    def get_project(self):
        return ifc2python.getProject(self.ifc)

    def get_true_north(self):
        return ifc2python.get_true_north(self.ifc)

    def summary(self):
        return ifc2python.summary(self.ifc)

    def search_property_hierarchy(self, propertyset_name):
        """Search for a property set in hierarchical order.

        1. element's propertysets
        2. element type's propertysets

        The type property sets are only fetched if the element itself does
        not provide the requested set.

        Returns:
            The first property set found under that name, else None.
        """
        for getter in (self.get_propertysets, self.get_type_propertysets):
            p_sets = getter()
            try:
                return p_sets[propertyset_name]
            except KeyError:
                pass
        return None

    def inverse_properties(self):
        """Generator yielding tuples of PropertySet name and Property name.

        Only the element's own property sets are visited.
        """
        for p_set_name, p_set in self.get_propertysets().items():
            for p_name in p_set.keys():
                yield (p_set_name, p_name)

    def filter_properties(self, patterns):
        """Filter all properties by re pattern.

        Patterns are applied with ``re.match``, i.e. anchored at the start
        of the property name. A property matching several patterns is listed
        once per matching pattern.

        :returns: list of tuple(propertyset_name, property_name, match)"""
        matches = []
        for propertyset_name, property_name in self.inverse_properties():
            for pattern in patterns:
                match = re.match(pattern, property_name)
                if match:
                    matches.append((propertyset_name, property_name, match))
        return matches

    @classmethod
    def filter_for_text_fragments(
            cls, ifc_element, ifc_units: dict, optional_locations: list = None):
        """Filter for text fragments in the ifc_element to identify it.

        Every pattern of ``cls.pattern_ifc_type`` is searched in the
        element's Name and, if given, in the value found for each entry of
        ``optional_locations``.

        Args:
            ifc_element: IfcOpenShell entity to inspect.
            ifc_units: unit mapping handed to the property set lookup.
            optional_locations: names passed to
                ifc2python.get_property_set_by_name as further text sources.

        Returns:
            List with the matched text of the first hit per location, or an
            empty string if nothing matched.
        """
        patterns = cls.pattern_ifc_type
        results = []

        # Name is optional in IFC, searching None would raise a TypeError
        element_name = ifc_element.Name or ''
        hits = [hit for hit in (p.search(element_name) for p in patterns)
                if hit is not None]
        if hits:
            # log the class name; cls.ifc_type is a property object here
            quality_logger.info(
                "Identified %s through text fracments in name. Criteria: %s",
                cls.__name__, hits)
            results.append(hits[0][0])

        for loc in optional_locations or ():
            # look the value up once instead of twice per pattern
            text = ifc2python.get_property_set_by_name(
                loc, ifc_element, ifc_units)
            if not text:
                continue
            hits = [hit for hit in (p.search(text) for p in patterns)
                    if hit is not None]
            if hits:
                quality_logger.info(
                    "Identified %s through text fracments in %s. "
                    "Criteria: %s", cls.__name__, loc, hits)
                results.append(hits[0][0])
        return results if results else ''

    def get_exact_property(self, propertyset_name: str, property_name: str):
        """Return value of property given by propertyset and property name.

        :Raises: NoValueError if property does not exist"""
        try:
            p_set = self.search_property_hierarchy(propertyset_name)
            value = p_set[property_name]
        except (AttributeError, KeyError, TypeError) as err:
            # TypeError covers the case that no property set was found (None)
            raise NoValueError("Property '%s.%s' does not exist" % (
                propertyset_name, property_name)) from err
        return value

    def select_from_potential_properties(self, patterns, name,
                                         collect_decisions):
        """Return the value(s) of all properties matching the patterns.

        Args:
            patterns: regular expressions applied via filter_properties().
            name: currently unused.
            collect_decisions: currently unused.

        Returns:
            None if nothing matches, the common value if all matching
            properties agree, otherwise the set of distinct values.
        """
        matches = self.filter_properties(patterns)
        if not matches:
            return None

        values = [self.get_exact_property(propertyset_name, property_name)
                  for propertyset_name, property_name, _ in matches]

        # TODO: Decision: save for all following elements of same class (
        #  dont ask again?). Instead of returning several values a decision
        #  (id, key, value) could let the user pick one of them.
        distinct_values = set(values)
        if len(distinct_values) == 1:
            # multiple sources but common value
            return distinct_values.pop()

        quality_logger.warning(
            'Found multiple values for attributes %s of instance %s',
            ', '.join(str((m[0], m[1])) for m in matches), self)
        return distinct_values

    def source_info(self) -> str:
        """Return ``<ifc_type>:<guid>`` as description of the source."""
        return f'{self.ifc_type}:{self.guid}'


class RelationBased(IFCBased):
    """Element based on an IFC relation.

    The class used to be defined twice in a row; the second, empty
    definition replaced the first one and thereby dropped ``conditions``,
    ``__repr__`` and ``__str__``. Both definitions are merged here.
    """
    conditions = []

    def __repr__(self):
        return "<%s (guid: %s)>" % (self.__class__.__name__, self.guid)

    def __str__(self):
        return "%s" % self.__class__.__name__


class ProductBased(IFCBased):
    """Elements based on IFC products.

    Every subclass is registered in ``key_map`` under the key
    ``<domain>-<class name>``.

    Args:
        material: material of the element
        material_set: dict of material and fraction [0, 1] if multiple materials
    """
    domain = 'GENERAL'
    key: str = ''
    key_map: Dict[str, 'Type[ProductBased]'] = {}
    conditions = []

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.aggregation = None
        self.ports = self.get_ports()
        self.material = None
        self.material_set = {}
        self.cost_group = self.calc_cost_group()

    def __init_subclass__(cls, **kwargs):
        """Register the subclass in ``key_map``."""
        # keep cooperative inheritance intact for mixins defining the hook
        super().__init_subclass__(**kwargs)
        # set key for each class
        cls.key = f'{cls.domain}-{cls.__name__}'
        cls.key_map[cls.key] = cls

    def get_ports(self):
        """Return the ports of the element; none in the base class."""
        return []

    def get_better_subclass(self) -> Union[None, Type['IFCBased']]:
        """Returns alternative subclass of current object.
        CAUTION: only use this if you can't know the result before instantiation
        of base class

        Returns:
            object: subclass of ProductBased or None"""
        return None

    @property
    def neighbors(self):
        """Directly connected elements"""
        return [port.connection.parent
                for port in self.ports if port.connection]

    def validate_creation(self):
        """Validate the element creation in two steps.

        1. Check if standard parameter are in valid range.
        2. Check if number of ports are equal to number of expected ports
           (only for HVAC).

        Returns:
            False if a condition that is critical for creation fails or
            validate_ports() fails, True otherwise.
        """
        for cond in self.conditions:
            if not cond.critical_for_creation:
                continue
            value = getattr(self, cond.key)
            # don't prevent creation if value is not existing
            if value and not cond.check(self, value):
                logger.warning("%s validation (%s) failed for %s",
                               self.ifc_type, cond.name, self.guid)
                return False
        if not self.validate_ports():
            # expected_hvac_ports is not defined on this class, so do not
            # let the log call itself fail with an AttributeError
            logger.warning("%s has %d ports, but %s expected for %s",
                           self.ifc_type, len(self.ports),
                           getattr(self, 'expected_hvac_ports', None),
                           self.guid)
            return False
        return True

    def validate_attributes(self) -> dict:
        """Check if all attributes are valid, returns dict with key = attribute
        and value = True or False.

        Not implemented yet: the returned dict is always empty.
        """
        results = {}
        for cond in self.conditions:
            if not cond.critical_for_creation:
                # todo: run cond.check and store the outcome in results
                pass
        return results

    def validate_ports(self):
        """Check the ports of the element; always True in the base class."""
        return True

    def __repr__(self):
        return "<%s>" % self.__class__.__name__

    def calc_cost_group(self) -> Optional[int]:
        """Calculate the cost group according to DIN276"""
        return None

    def calc_volume_from_ifc_shape(self):
        """Calculate the volume from the geometric shape of the ifc entity.

        Returns:
            The volume in cubic meter (pint quantity), or None if the entity
            has no ``Representation`` attribute or the calculation fails.
        """
        # todo use more efficient iterator to calc all shapes at once
        #  with multiple cores:
        #  https://wiki.osarch.org/index.php?title=IfcOpenShell_code_examples
        if not hasattr(self.ifc, 'Representation'):
            return None
        try:
            shape = ifcopenshell.geom.create_shape(
                settings_products, self.ifc).geometry
            vol = PyOCCTools.get_shape_volume(shape)
            vol = vol * ureg.meter ** 3
            return vol
        except Exception:
            # best effort: a failing geometry must not abort processing, but
            # KeyboardInterrupt/SystemExit are no longer swallowed
            logger.warning("No calculation of geometric volume possible "
                           "for %s.", self.ifc)
            return None

    def _get_volume(self, name):
        """Return net_volume if available, else the volume of the shape."""
        if hasattr(self, "net_volume"):
            if self.net_volume:
                return self.net_volume
        return self.calc_volume_from_ifc_shape()

    volume = attribute.Attribute(
        description="Volume of the attribute",
        functions=[_get_volume],
    )

    def __str__(self):
        return "<%s>" % (self.__class__.__name__)


class Port(RelationBased):
    """Basic port: a connection point owned by a parent element."""

    def __init__(self, parent, *args, **kwargs):
        """Create an unconnected port belonging to ``parent``."""
        super().__init__(*args, **kwargs)
        self.parent: ProductBased = parent
        self.connection = None

    def connect(self, other):
        """Connect this port bidirectionally to another port.

        Raises:
            AssertionError: if ``other`` is not a Port.
            AttributeError: if either port is already connected to a
                different port.
        """
        assert isinstance(other, Port), \
            "Can't connect interfaces of different classes."
        # if self.flow_direction == 'SOURCE' or \
        #         self.flow_direction == 'SOURCEANDSINK':
        own_taken = self.connection and self.connection is not other
        if own_taken:
            raise AttributeError("Port is already connected!")
        other_taken = other.connection and other.connection is not self
        if other_taken:
            raise AttributeError("Other port is already connected!")
        self.connection = other
        other.connection = self

    def disconnect(self):
        """Remove the connection between this port and its counterpart."""
        counterpart = self.connection
        if not counterpart:
            return
        # clear own side first so the mutual call below terminates
        self.connection = None
        counterpart.disconnect()

    def is_connected(self):
        """Returns truth value of port's connection"""
        return bool(self.connection)

    def __repr__(self):
        cls_name = self.__class__.__name__
        if not self.parent:
            return "<%s (*abandoned*)>" % cls_name
        try:
            idx = self.parent.ports.index(self)
        except ValueError:
            # the parent does not list this port
            return "<%s (broken parent: %s)>" % (cls_name, self.parent)
        return "<%s #%d of %s)>" % (cls_name, idx, self.parent)

    def __str__(self):
        # representation without the leading '<' and the trailing ')>'
        return self.__repr__()[1:-2]


class Material(ProductBased):
    """Material created from an IfcMaterial, with thermal attributes."""
    guid_prefix = 'Material_'
    # NOTE(review): ProductBased.__init_subclass__ assigns cls.key after the
    # class body was evaluated, so this value looks overwritten - confirm.
    key: str = 'Material'
    ifc_types = {
        'IfcMaterial': ["*"]
    }
    name = ''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # elements that use this material
        self.parents: List[ProductBased] = []

    @staticmethod
    def get_id(prefix=""):
        """Build an id from ``prefix`` and a shortened new ifcopenshell guid.

        Raises:
            AttributeError: if the prefix is longer than 10 characters.
        """
        n_prefix = len(prefix)
        if n_prefix > 10:
            raise AttributeError("Max prefix length is 10!")
        # drop the leading part of the fresh guid to make room for the prefix
        ifcopenshell_guid = guid.new()[n_prefix + 1:]
        return f"{prefix}{ifcopenshell_guid}"

    # value ranges of the attributes below, none is critical for creation
    conditions = [
        condition.RangeCondition('spec_heat_capacity',
                                 0 * ureg.kilojoule / (ureg.kg * ureg.K),
                                 5 * ureg.kilojoule / (ureg.kg * ureg.K),
                                 critical_for_creation=False),
        condition.RangeCondition('density',
                                 0 * ureg.kg / ureg.m ** 3,
                                 50000 * ureg.kg / ureg.m ** 3,
                                 critical_for_creation=False),
        condition.RangeCondition('thermal_conduc',
                                 0 * ureg.W / ureg.m / ureg.K,
                                 100 * ureg.W / ureg.m / ureg.K,
                                 critical_for_creation=False),
        condition.RangeCondition('porosity',
                                 0 * ureg.dimensionless,
                                 1 * ureg.dimensionless, True,
                                 critical_for_creation=False),
        condition.RangeCondition('solar_absorp',
                                 0 * ureg.percent,
                                 1 * ureg.percent, True,
                                 critical_for_creation=False),
    ]

    spec_heat_capacity = attribute.Attribute(
        default_ps=("Pset_MaterialThermal", "SpecificHeatCapacity"),
        # functions=[get_from_template],
        unit=ureg.kilojoule / (ureg.kg * ureg.K)
    )

    density = attribute.Attribute(
        default_ps=("Pset_MaterialCommon", "MassDensity"),
        unit=ureg.kg / ureg.m ** 3
    )

    thermal_conduc = attribute.Attribute(
        default_ps=("Pset_MaterialThermal", "ThermalConductivity"),
        unit=ureg.W / (ureg.m * ureg.K)
    )

    porosity = attribute.Attribute(
        default_ps=("Pset_MaterialCommon", "Porosity"),
        unit=ureg.dimensionless
    )

    # todo is percent the correct unit? (0-1)
    solar_absorp = attribute.Attribute(
        # default_ps=('Pset_MaterialOptical', 'SolarTransmittance'),
        default=0.7,
        unit=ureg.percent
    )

    def __repr__(self):
        cls_name = self.__class__.__name__
        if not self.name:
            return "<%s>" % cls_name
        return "<%s %s>" % (cls_name, self.name)


class Dummy(ProductBased):
    """Dummy for all unknown elements"""

    ifc_types = {
        "IfcElementProxy": ['*']
    }

    @property
    def ifc_type(self):
        """IFC type of the represented entity.

        Returns an explicitly stored ``_ifc_type`` if there is one. Nothing
        in this class assigns it (the former ``__init__`` doing so is
        disabled), so the type reported by the ifc entity is used as
        fallback instead of failing with an AttributeError.
        """
        explicit = getattr(self, '_ifc_type', None)
        if explicit is not None:
            return explicit
        return super().ifc_type

    def __str__(self):
        return "Dummy '%s'" % self.ifc_type


class Factory:
    """Element Factory for :class: `ProductBased`

    To understand the concept of the factory class, we refer to this article:
    https://refactoring.guru/design-patterns/factory-method/python/example

    Example:
        factory = Factory({Pipe, Boiler}, ifc_units, ifc_domain)
        ele = factory(some_ifc_element)
    """

    def __init__(
            self,
            relevant_elements: set[Type[ProductBased]],
            ifc_units: dict,
            ifc_domain: IFCDomain,
            finder: Union[TemplateFinder, None] = None,
            dummy=Dummy):
        """Prepare the lookup tables for the given element classes.

        Args:
            relevant_elements: element classes the factory may create.
            ifc_units: unit mapping passed on to every created element.
            ifc_domain: domain of the IFC file, passed on to the elements.
            finder: template finder passed on to the elements.
            dummy: class used if no element class matches.
        """
        self.mapping, self.blacklist, self.defaults = \
            self.create_ifc_mapping(relevant_elements)
        self.dummy_cls = dummy
        self.ifc_domain = ifc_domain
        self.finder = finder
        self.ifc_units = ifc_units

    def __call__(self, ifc_entity, *args, ifc_type: str = None, use_dummy=True,
                 **kwargs) -> ProductBased:
        """Run factory to create element instance.

        Calls self.create() function but before checks which element_cls is the
        correct mapping for the given ifc_entity.

        Args:
            ifc_entity: IfcOpenShell entity
            args: additional args passed to element
            ifc_type: ifc type to create element for.
                defaults to ifc_entity.is_a()
            use_dummy: use dummy class if nothing is found
            kwargs: additional kwargs passed to element
        Raises:
            LookupError: if no element found and use_dummy = False
            IFCDomainError: if the element class restricts its IFC domains
                (``from_ifc_domains``) and the factory's domain is not one
                of them
        Returns:
            element: created element instance
        """
        _ifc_type = ifc_type or ifc_entity.is_a()
        predefined_type = ifc2python.get_predefined_type(ifc_entity)
        element_cls = self.get_element(_ifc_type, predefined_type)
        if not element_cls:
            if not use_dummy:
                raise LookupError(f"No element found for {ifc_entity}")
            element_cls = self.dummy_cls
        # TODO # 537 Put this to a point where it makes sense, return None is no
        #  solution
        if hasattr(element_cls, 'from_ifc_domains'):
            if self.ifc_domain not in element_cls.from_ifc_domains:
                # the class name used to be printed with a stray leading "f"
                message = (
                    f"Element has {self.ifc_domain} but {element_cls.__name__}"
                    f" will only be created for IFC files of domain "
                    f"{element_cls.from_ifc_domains}")
                logger.warning("%s.", message)
                raise IFCDomainError(message)

        return self.create(element_cls, ifc_entity, *args, **kwargs)

    def create(self, element_cls, ifc_entity, *args, **kwargs):
        """Create Element from class and ifc.

        If the created element names a better subclass via
        get_better_subclass(), a new element of that class is created from
        the same entity and returned instead.
        """
        context = {
            'ifc_domain': self.ifc_domain,
            'finder': self.finder,
            'ifc_units': self.ifc_units,
        }
        # instantiate element
        element = element_cls.from_ifc(ifc_entity, *args, **context, **kwargs)
        # check if it prefers to be sth else
        better_cls = element.get_better_subclass()
        if better_cls:
            logger.info("Creating %s instead of %s", better_cls, element_cls)
            element = better_cls.from_ifc(
                ifc_entity, *args, **context, **kwargs)
        return element

    def get_element(self, ifc_type: str, predefined_type: Union[str, None]) -> \
            Union[Type[ProductBased], None]:
        """Get element class by ifc type and predefined type.

        Returns:
            The class mapped to (ifc_type, predefined_type), None if that
            combination is blacklisted, otherwise the default class of the
            ifc type (None if there is no default).
        """
        type_key = ifc_type.lower()
        if predefined_type:
            key = (type_key, predefined_type.upper())
            # 1. go over normal list, if found match --> return
            element = self.mapping.get(key)
            if element:
                return element
            # 2. go over negative list, if found match --> not existing
            if key in self.blacklist:
                return None
        # 3. go over default list, if found match --> return
        return self.defaults.get(type_key)

    @staticmethod
    def create_ifc_mapping(elements: Iterable) -> Tuple[
        Dict[Tuple[str, str], Type[ProductBased]],
        List[Tuple[str, str]],
        Dict[str, Type[ProductBased]]
    ]:
        """Create mapping dict, blacklist and default dict from elements

        WARNING: ifc_type is always converted to lower case
        and predefined types to upper case

        Raises:
            NameError: if two different classes claim the default ('*') of
                the same ifc type.

        Returns:
            mapping: dict of (ifc_type, predefined_type) to element class
            blacklist: list of excluded (ifc_type, predefined_type) tuples
            default: dict of ifc_type to element class
        """
        # TODO: cover virtual elements e.g. Space Boundaries (not products)

        mapping = {}
        blacklist = []
        default = {}
        _all_ifc_types = set()

        for ele in elements:
            for ifc_type, tokens in ele.ifc_types.items():
                type_key = ifc_type.lower()
                _all_ifc_types.add(type_key)
                for token in tokens:
                    # create default dict where all stars are taken into account
                    # items 'ifcslab': Slab
                    if token == '*':
                        # Compare with the lower-cased key: the previous check
                        # used the raw name and therefore never triggered, so
                        # the winner depended on the iteration order.
                        if type_key in default \
                                and default[type_key] is not ele:
                            raise NameError(
                                f"Conflicting default ifc_types for "
                                f"{ifc_type}")  # TBD
                        default[type_key] = ele
                    # create blacklist where all - are taken into account
                    # items: ('ifcroof', 'WEIREDSTUFF')
                    elif token.startswith('-'):
                        blacklist.append((type_key, token[1:].upper()))
                    # create mapping dict
                    # items ('ifcslab', 'ROOF'): Roof
                    else:
                        mapping[(type_key, token.upper())] = ele

        # check ifc types without default
        no_default = _all_ifc_types - set(default)
        if no_default:
            logger.warning("The following ifc types have no default "
                           "representing Element class. There will be no "
                           "match if predefined type is not provided.\n%s",
                           no_default)
        return mapping, blacklist, default


class SerializedElement:
    """Serialized version of an element.

    This is a workaround as we can't serialize elements due to the usage of
    IfcOpenShell which uses unpickable swigPy objects. We just store the most
    important information which are guid, element_type, storeys, aggregated
    elements and the attributes from the attribute system."""

    def __init__(self, element):
        """Copy the picklable information of ``element``.

        Args:
            element: element to serialize. Related elements (space
                boundaries, storeys, aggregated elements) are stored by guid.
        """
        self.guid = element.guid
        self.element_type = element.__class__.__name__
        for attr_name, _ in element.attributes.items():
            # assign value directly to attribute without status
            # make sure to get the value
            value = getattr(element, attr_name)
            if self.is_picklable(value):
                setattr(self, attr_name, value)
            else:
                logger.info(
                    "Attribute %s will not be serialized, as it's "
                    "not pickleable", attr_name)
        if hasattr(element, "space_boundaries"):
            self.space_boundaries = [bound.guid for bound in
                                     element.space_boundaries]
        if hasattr(element, "storeys"):
            self.storeys = [storey.guid for storey in element.storeys]
        if issubclass(element.__class__, AggregationMixin):
            self.elements = [ele.guid for ele in element.elements]

    @staticmethod
    def is_picklable(value: Any) -> bool:
        """Determines if a given value is picklable.

        This method attempts to serialize the provided value using the
        `pickle` module. If the value can be successfully serialized, it is
        considered picklable.

        Args:
            value (Any): The value to be tested for picklability.

        Returns:
            bool: True if the value is picklable, False otherwise.
        """
        try:
            pickle.dumps(value)
        except Exception:
            # Pickling is not limited to PicklingError/TypeError, e.g. local
            # objects may fail with an AttributeError. Any failure simply
            # means "not picklable" and must not abort the serialization.
            return False
        return True

    def __repr__(self):
        return "<serialized %s (guid: '%s')>" % (
            self.element_type, self.guid)