Coverage for bim2sim/elements/base_elements.py: 76%
445 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-16 08:28 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-16 08:28 +0000
1import logging
2import pickle
3import re
4from json import JSONEncoder
5from typing import Union, Iterable, Dict, List, Tuple, Type, Optional, Any
7import numpy as np
8import ifcopenshell.geom
9from ifcopenshell import guid
11from bim2sim.elements.aggregation import AggregationMixin
12from bim2sim.kernel.decision import Decision, DecisionBunch
13from bim2sim.kernel import IFCDomainError
14from bim2sim.elements.mapping import condition, attribute, ifc2python
15from bim2sim.elements.mapping.finder import TemplateFinder, SourceTool
16from bim2sim.elements.mapping.units import ureg
17from bim2sim.utilities.common_functions import angle_equivalent, vector_angle, \
18 remove_umlaut
19from bim2sim.utilities.pyocc_tools import PyOCCTools
20from bim2sim.utilities.types import IFCDomain, AttributeDataSource
# Logger for general messages of this module.
logger = logging.getLogger(__name__)
# Separate logger whose records are meant for the bim2sim quality report.
quality_logger = logging.getLogger('bim2sim.QualityReport')
# Geometry settings shared by all shape creations in this module, with the
# USE_PYTHON_OPENCASCADE option switched on.
settings_products = ifcopenshell.geom.main.settings()
settings_products.set(settings_products.USE_PYTHON_OPENCASCADE, True)
class ElementError(Exception):
    """Base class of the errors raised in connection with an Element."""
class NoValueError(ElementError):
    """Raised when a requested value is not available."""
class ElementEncoder(JSONEncoder):
    """JSON encoder that is able to handle Element instances.

    Elements are written as the placeholder string ``<Element(guid)>``, every
    other object is handed over to the regular ``JSONEncoder`` handling.
    """

    # TODO: make Elements serializable and deserializable.
    #  Ideas: guid to identify, (factory) method to (re)init by guid
    #  maybe weakref to other elements (Ports, connections, ...)

    def default(self, o):
        """Return a JSON serializable stand-in for ``o``.

        Args:
            o: object the standard encoder was not able to serialize

        Returns:
            str: ``"<Element(guid)>"`` if ``o`` is an Element

        Raises:
            TypeError: raised by ``JSONEncoder.default`` for any other object
        """
        if isinstance(o, Element):
            return "<Element(%s)>" % o.guid
        # Delegate including the object so the base class raises its regular
        # "not JSON serializable" TypeError. The former unbound call without
        # arguments failed on the call signature instead.
        return super().default(o)
class Element(metaclass=attribute.AutoAttributeNameMeta):
    """Root class of all elements.

    Each instance owns a ``guid`` and an ``attributes`` manager that is bound
    to the instance.
    """
    guid_prefix = ''
    _id_counter = 0

    def __init__(self, guid=None, **kwargs):
        """Create the element and preset attributes from ``kwargs``.

        Args:
            guid: identifier of the element, created by ``get_id`` if falsy
            **kwargs: initial attribute values, only names known to the
                attribute manager are accepted

        Raises:
            AttributeError: if a keyword does not name an attribute
        """
        self.guid = guid or self.get_id(self.guid_prefix)
        # self.related_decisions: List[Decision] = []
        self.attributes = attribute.AttributeManager(bind=self)

        for key, value in kwargs.items():
            # everything that is not a managed attribute is rejected
            if key not in self.attributes:
                raise AttributeError(
                    f"Unused argument in kwargs: {key}: {value}")
            setattr(self, key, value)

    def __hash__(self):
        """Hash the element by its guid."""
        return hash(self.guid)

    def validate_creation(self) -> bool:
        """Tell if the instance is valid, to be implemented by subclasses."""
        raise NotImplementedError

    def validate_attributes(self) -> dict:
        """Validate the attributes, the base implementation reports nothing."""
        return {}

    def _calc_position(self, name) -> np.array:
        """Position of the element, unknown (None) on this level."""
        return None

    position = attribute.Attribute(
        description='Position of element',
        functions=[_calc_position]
    )

    @staticmethod
    def get_id(prefix=""):
        """Create a new id from ``prefix`` and a running counter.

        The prefix is filled up with zeros on the right to 8 characters, the
        class wide counter is written as a zero padded number of 14 digits.

        Raises:
            AttributeError: if ``prefix`` is longer than 10 characters
        """
        if len(prefix) > 10:
            raise AttributeError("Max prefix length is 10!")
        Element._id_counter += 1
        return "{0:0<8s}{1:0>14d}".format(prefix, Element._id_counter)

    @staticmethod
    def get_object(guid):
        """Obsolete lookup of an element by guid.

        Raises:
            AssertionError: always, the method must not be used any more
        """
        raise AssertionError("Obsolete method. "
                             "Don't rely on global Element.objects. "
                             "Use e.g. elements from tasks/playground.")

    def request(self, name, external_decision: Decision = None) \
            -> Union[None, Decision]:
        """Request an attribute of the element.

        Args:
            name: name of the attribute
            external_decision: decision to use instead of the default one

        Returns:
            whatever the attribute manager returns for the request
        """
        return self.attributes.request(name, external_decision)

    def reset(self, name, data_source=AttributeDataSource.manual_overwrite):
        """Reset an attribute of the element.

        Args:
            name: name of the attribute
            data_source (object): data source of the attribute
        """
        return self.attributes.reset(name, data_source)

    def source_info(self) -> str:
        """Informative text about the source of the element, empty here."""
        return ''

    @classmethod
    def get_pending_attribute_decisions(
            cls, elements: Iterable['Element']) -> DecisionBunch:
        """Collect the pending attribute decisions of ``elements``.

        This is a generator: the decisions of all given elements are gathered
        in one bunch, sorted by their global key and yielded once.
        """
        pending = DecisionBunch()
        for element in elements:
            pending.extend(element.attributes.get_decisions())

        # sort decisions to preserve order
        pending.sort(key=lambda decision: decision.global_key)
        yield pending

    @classmethod
    def full_reset(cls):
        """Obsolete, always raises AssertionError."""
        raise AssertionError("Obsolete method. not required any more.")
class IFCBased(Element):
    """Element with instantiation from ifc and related methods.

    Attributes:
        ifc: IfcOpenShell element instance
        ifc_types: Dict with ifc_type as key and list of predefined types that
            fit to the class as values.
            Special values for predefined types:
                '*' all which are not overwritten in other classes predefined
                    types.
                '-Something' start with minus to exclude

            For example:
            {'IfcSlab': ['*', '-SomethingSpecialWeDontWant', 'BASESLAB']}
            {'IfcRoof': ['FLAT_ROOF', 'SHED_ROOF',...],
             'IfcSlab': ['ROOF']}
        pattern_ifc_type: patterns offering ``search()``, used by
            ``filter_for_text_fragments``
    """

    ifc_types: Dict[str, List[str]] = None
    pattern_ifc_type = []

    def __init__(self, *args,
                 ifc=None,
                 finder: TemplateFinder = None,
                 ifc_units: dict = None,
                 ifc_domain: IFCDomain = None,
                 **kwargs):
        """Create the element around an IFC entity.

        Args:
            *args: positional arguments passed on to ``Element``
            ifc: IfcOpenShell entity the element is based on
            finder: template finder stored on the element
            ifc_units: unit information handed to the property set lookups
            ifc_domain: domain of the IFC file the entity comes from
            **kwargs: keyword arguments passed on to ``Element``
        """
        super().__init__(*args, **kwargs)

        self.ifc = ifc
        self.predefined_type = ifc2python.get_predefined_type(ifc)
        self.ifc_domain = ifc_domain
        self.finder = finder
        self.ifc_units = ifc_units
        self.source_tool: SourceTool = None

        # TBD
        self.enrichment = {}  # TODO: DJA
        # caches filled lazily by get_propertysets / get_type_propertysets
        self._propertysets = None
        self._type_propertysets = None
        self._decision_results = {}

    @classmethod
    def ifc2args(cls, ifc) -> Tuple[tuple, dict]:
        """Extract init args and kwargs from ifc.

        Returns:
            empty args tuple and kwargs holding ``guid`` (the GlobalId of the
            entity or None) and ``ifc``
        """
        global_id = getattr(ifc, 'GlobalId', None)
        return (), {'guid': global_id, 'ifc': ifc}

    @classmethod
    def from_ifc(cls, ifc, *args, **kwargs):
        """Factory method to create instance from ifc.

        Values derived from the entity by ``ifc2args`` take precedence over
        equally named entries in ``kwargs``.
        """
        ifc_args, ifc_kwargs = cls.ifc2args(ifc)
        kwargs.update(ifc_kwargs)
        return cls(*(args + ifc_args), **kwargs)

    @property
    def ifc_type(self):
        """IFC type name of the entity, None if there is no entity."""
        if self.ifc:
            return self.ifc.is_a()
        return None

    @classmethod
    def pre_validate(cls, ifc) -> bool:
        """Check if ifc meets conditions to create element from it"""
        raise NotImplementedError

    def _calc_position(self, name):
        """Return the absolute position of the element.

        The location of the own placement and the locations of all placements
        it is relative to are summed up.
        NOTE(review): only ``Location.Coordinates`` are added, rotations of
        the parent placements are not applied - confirm this is intended.

        Returns:
            numpy array with the summed coordinates, None if the entity has
            no object placement
        """
        # The attribute may exist but be unset (None); the former hasattr
        # check let that case run into an AttributeError.
        placement = getattr(self.ifc, 'ObjectPlacement', None)
        if placement is None:
            return None

        absolute = np.array(placement.RelativePlacement.Location.Coordinates)
        parent = placement.PlacementRelTo
        while parent is not None:
            absolute = absolute + np.array(
                parent.RelativePlacement.Location.Coordinates)
            parent = parent.PlacementRelTo
        return absolute

    def _get_name_from_ifc(self, name):
        """Return the IFC name without umlauts, None if no name is set."""
        ifc_name = self.get_ifc_attribute('Name')
        if ifc_name:
            return remove_umlaut(ifc_name)
        return None

    name = attribute.Attribute(
        description="Name of element based on IFC attribute.",
        functions=[_get_name_from_ifc]
    )

    def get_ifc_attribute(self, attribute):
        """Return the IFC attribute ``attribute``, None if it does not exist.
        """
        return getattr(self.ifc, attribute, None)

    def get_propertyset(self, propertysetname):
        """Look up a single property set of the entity by its name."""
        return ifc2python.get_property_set_by_name(
            propertysetname, self.ifc, self.ifc_units)

    def get_propertysets(self):
        """Return the property sets of the entity (looked up once)."""
        if self._propertysets is None:
            self._propertysets = ifc2python.get_property_sets(
                self.ifc, self.ifc_units)
        return self._propertysets

    def get_type_propertysets(self):
        """Return the type property sets of the entity (looked up once)."""
        if self._type_propertysets is None:
            self._type_propertysets = ifc2python.get_type_property_sets(
                self.ifc, self.ifc_units)
        return self._type_propertysets

    def get_hierarchical_parent(self):
        return ifc2python.getHierarchicalParent(self.ifc)

    def get_hierarchical_children(self):
        return ifc2python.getHierarchicalChildren(self.ifc)

    def get_spartial_parent(self):
        return ifc2python.getSpatialParent(self.ifc)

    def get_spartial_children(self):
        return ifc2python.getSpatialChildren(self.ifc)

    def get_space(self):
        return ifc2python.getSpace(self.ifc)

    def get_storey(self):
        return ifc2python.getStorey(self.ifc)

    def get_building(self):
        return ifc2python.getBuilding(self.ifc)

    def get_site(self):
        return ifc2python.getSite(self.ifc)

    def get_project(self):
        return ifc2python.getProject(self.ifc)

    def get_true_north(self):
        return ifc2python.get_true_north(self.ifc)

    def summary(self):
        return ifc2python.summary(self.ifc)

    def search_property_hierarchy(self, propertyset_name):
        """Search for a property set in hierarchical order.

        1. element's propertysets
        2. element type's propertysets

        Returns:
            the first property set found under ``propertyset_name``, None if
            neither source contains it
        """
        p_sets = self.get_propertysets()
        try:
            return p_sets[propertyset_name]
        except KeyError:
            pass

        # the type property sets are only consulted if really needed
        pt_sets = self.get_type_propertysets()
        try:
            return pt_sets[propertyset_name]
        except KeyError:
            return None

    def inverse_properties(self):
        """Generator yielding tuples of PropertySet name and Property name"""
        for p_set_name, p_set in self.get_propertysets().items():
            for p_name in p_set.keys():
                yield (p_set_name, p_name)

    def filter_properties(self, patterns):
        """Filter all properties by re pattern.

        Each property name is matched (``re.match``) against each pattern.

        :returns: list of tuple(propertyset_name, property_name, match)
        """
        matches = []
        for propertyset_name, property_name in self.inverse_properties():
            for pattern in patterns:
                match = re.match(pattern, property_name)
                if match:
                    matches.append((propertyset_name, property_name, match))
        return matches

    @classmethod
    def filter_for_text_fragments(cls, ifc_element, ifc_units: dict,
                                  optional_locations: list = None):
        """Find text fragments that match the class patterns in an IFC element.

        Args:
            ifc_element: The IFC element to check.
            ifc_units: Dictionary containing IFC unit information.
            optional_locations: Additional locations to check patterns beyond
                name. Defaults to None.

        Returns:
            list: List of matched fragments, empty list if no matches found.
        """
        results = []

        # Check name matches. The name may be unset (None), searching None
        # would raise a TypeError.
        element_name = ifc_element.Name
        if element_name is not None:
            name_hits = [p.search(element_name) for p in cls.pattern_ifc_type]
            name_hits = [hit for hit in name_hits if hit is not None]
            if name_hits:
                # cls.ifc_type is a property object on class level, so the
                # class name is logged instead
                quality_logger.info(
                    f"Identified {cls.__name__} through text fragments in "
                    f"name. Criteria: {name_hits}")
                results.append(name_hits[0][0])

        # Check optional locations
        for loc in optional_locations or []:
            prop_value = ifc2python.get_property_set_by_name(
                loc, ifc_element, ifc_units)
            if not prop_value:
                continue

            # NOTE(review): the patterns are searched in prop_value directly,
            # which presumably requires it to be a string - confirm against
            # ifc2python.get_property_set_by_name.
            loc_hits = [p.search(prop_value) for p in cls.pattern_ifc_type]
            loc_hits = [hit for hit in loc_hits if hit is not None]
            if loc_hits:
                quality_logger.info(
                    f"Identified {cls.__name__} through text fragments "
                    f"in {loc}. Criteria: {loc_hits}")
                results.append(loc_hits[0][0])

        return results

    def get_exact_property(self, propertyset_name: str, property_name: str):
        """Return the value of the property given by set and property name.

        Args:
            propertyset_name: name of the property set
            property_name: name of the property inside the set

        Raises:
            NoValueError: if the property set or the property does not exist
        """
        try:
            p_set = self.search_property_hierarchy(propertyset_name)
            # p_set is None if the set is unknown -> TypeError on subscript
            value = p_set[property_name]
        except (AttributeError, KeyError, TypeError) as err:
            raise NoValueError("Property '%s.%s' does not exist" % (
                propertyset_name, property_name)) from err
        return value

    def select_from_potential_properties(self, patterns, name,
                                         collect_decisions):
        """Get the value of the properties matching ``patterns``.

        Args:
            patterns: re patterns the property names are matched against
            name: unused at the moment
            collect_decisions: unused at the moment

        Returns:
            None if no property matches, the common value if all matching
            properties agree, otherwise the set of distinct values (a warning
            is written to the quality log in this case)
        """
        matches = self.filter_properties(patterns)
        if not matches:
            return None

        values = [
            self.get_exact_property(propertyset_name, property_name)
            for propertyset_name, property_name, _ in matches]

        # TODO: Decision: save for all following elements of same class
        #  (don't ask again?). A decision offering the found values for
        #  selection was planned here but is not implemented.
        distinct_values = set(values)
        if len(distinct_values) == 1:
            # multiple sources but common value
            return distinct_values.pop()

        quality_logger.warning(
            'Found multiple values for attributes %s of instance %s',
            ', '.join(str((m[0], m[1])) for m in matches), self)
        return distinct_values

    def source_info(self) -> str:
        """Return ``<ifc type>:<guid>`` of the element."""
        return f'{self.ifc_type}:{self.guid}'
class RelationBased(IFCBased):
    """Common base of IFC based elements that are not ProductBased, e.g.
    ``Port``.

    The class used to be declared twice in a row. The second, empty
    declaration replaced the first one, so ``conditions`` and the string
    representations below never took effect. Both are merged here.
    """
    conditions = []

    def __repr__(self):
        """Return class name and guid, e.g. ``<Port (guid: ...)>``."""
        return "<%s (guid: %s)>" % (self.__class__.__name__, self.guid)

    def __str__(self):
        """Return the bare class name."""
        return "%s" % self.__class__.__name__
class ProductBased(IFCBased):
    """Elements based on IFC products.

    Every subclass is registered in ``key_map`` under the key
    ``<domain>-<class name>``.

    Args:
        material: material of the element
        material_set: dict of material and fraction [0, 1] if multiple materials
    """
    domain = 'GENERAL'
    key: str = ''
    key_map: Dict[str, 'Type[ProductBased]'] = {}
    conditions = []

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.aggregation = None
        self.ports = self.get_ports()
        self.material = None
        self.material_set = {}
        self.cost_group = self.calc_cost_group()

    def __init_subclass__(cls, **kwargs):
        """Set the key of each subclass and register it in ``key_map``."""
        # keep the hook cooperative for other classes in the MRO
        super().__init_subclass__(**kwargs)
        cls.key = f'{cls.domain}-{cls.__name__}'
        cls.key_map[cls.key] = cls

    def get_ports(self):
        """Return the ports of the element, none on this level."""
        return []

    def get_better_subclass(self) -> Union[None, Type['IFCBased']]:
        """Returns alternative subclass of current object.
        CAUTION: only use this if you can't know the result before instantiation
        of base class

        Returns:
            object: subclass of ProductBased or None"""
        return None

    @property
    def neighbors(self):
        """Directly connected elements"""
        return [port.connection.parent
                for port in self.ports if port.connection]

    def validate_creation(self):
        """Validate the element creation in two steps.

        1. Check if standard parameter are in valid range.
        2. Check if number of ports are equal to number of expected ports
        (only for HVAC).

        Returns:
            bool: False if a condition that is critical for creation or the
            port validation fails, True otherwise
        """
        for cond in self.conditions:
            if not cond.critical_for_creation:
                continue
            value = getattr(self, cond.key)
            # don't prevent creation if value is not existing
            # NOTE(review): a value of 0 is falsy as well and therefore not
            # checked - confirm this is intended.
            if value and not cond.check(self, value):
                logger.warning("%s validation (%s) failed for %s",
                               self.ifc_type, cond.name, self.guid)
                return False
        if not self.validate_ports():
            # expected_hvac_ports is not defined on this level, a subclass
            # overriding validate_ports may not provide it either
            logger.warning("%s has %d ports, but %s expected for %s",
                           self.ifc_type, len(self.ports),
                           getattr(self, 'expected_hvac_ports', None),
                           self.guid)
            return False
        return True

    def validate_attributes(self) -> dict:
        """Check if all attributes are valid, returns dict with key = attribute
        and value = True or False.

        The check of the non critical conditions is not implemented yet, the
        result is always empty.
        """
        results = {}
        for cond in self.conditions:
            if not cond.critical_for_creation:
                # todo: check the condition and store the outcome in results
                pass
        return results

    def validate_ports(self):
        """Tell if the ports are valid, always True on this level."""
        return True

    def __repr__(self):
        return "<%s>" % self.__class__.__name__

    def calc_cost_group(self) -> Optional[int]:
        """Calculate the cost group according to DIN276"""
        return None

    def calc_volume_from_ifc_shape(self):
        """Calculate the volume from the geometric shape of the IFC entity.

        Returns:
            volume in cubic meter (pint quantity), None if the entity has no
            representation or the calculation fails
        """
        # todo use more efficient iterator to calc all shapes at once
        #  with multiple cores:
        #  https://wiki.osarch.org/index.php?title=IfcOpenShell_code_examples
        if not hasattr(self.ifc, 'Representation'):
            return None
        try:
            shape = ifcopenshell.geom.create_shape(
                settings_products, self.ifc).geometry
            vol = PyOCCTools.get_shape_volume(shape)
            return vol * ureg.meter ** 3
        except Exception:
            # Best effort: any failure of the geometry kernel just results
            # in a missing volume. KeyboardInterrupt and SystemExit are no
            # longer swallowed as with the former bare except.
            logger.warning(f"No calculation of geometric volume possible "
                           f"for {self.ifc}.")
            return None

    def _get_volume(self, name):
        """Return ``net_volume`` if available and truthy, otherwise the
        volume calculated from the IFC shape."""
        net_volume = getattr(self, "net_volume", None)
        if net_volume:
            return net_volume
        return self.calc_volume_from_ifc_shape()

    volume = attribute.Attribute(
        description="Volume of the attribute",
        functions=[_get_volume],
    )

    def __str__(self):
        return "<%s>" % (self.__class__.__name__)
class Port(RelationBased):
    """Basic port.

    A port belongs to a ``parent`` element and can be connected to exactly
    one other port.
    """

    def __init__(self, parent, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parent: ProductBased = parent
        self.connection = None

    def connect(self, other):
        """Connect this interface bidirectional to another interface.

        Raises:
            AttributeError: if one of both ports is already connected to a
                different port
        """
        assert isinstance(other, Port), \
            "Can't connect interfaces of different classes."
        # if self.flow_direction == 'SOURCE' or \
        #         self.flow_direction == 'SOURCEANDSINK':
        if self.connection and self.connection is not other:
            raise AttributeError("Port is already connected!")
        if other.connection and other.connection is not self:
            raise AttributeError("Other port is already connected!")
        self.connection = other
        other.connection = self

    def disconnect(self):
        """Remove the connection between self and the other port."""
        other = self.connection
        if other:
            # clear the own side first, this ends the mutual recursion
            self.connection = None
            other.disconnect()

    def is_connected(self):
        """Returns truth value of port's connection"""
        return bool(self.connection)

    def _describe(self):
        """Return the text shared by ``__repr__`` and ``__str__``."""
        cls_name = self.__class__.__name__
        if not self.parent:
            return "%s (*abandoned*)" % cls_name
        try:
            idx = self.parent.ports.index(self)
        except ValueError:
            # the parent does not list this port
            return "%s (broken parent: %s)" % (cls_name, self.parent)
        return "%s #%d of %s" % (cls_name, idx, self.parent)

    def __repr__(self):
        # The connected form used to end with a stray ")" before ">".
        return "<%s>" % self._describe()

    def __str__(self):
        # Formerly built by slicing repr()[1:-2], which cut the closing
        # parenthesis of the abandoned / broken parent forms.
        return self._describe()
class Material(ProductBased):
    """Material created from ``IfcMaterial`` entities.

    ``parents`` holds the elements the material is used by.
    """
    guid_prefix = 'Material_'
    key: str = 'Material'
    ifc_types = {
        'IfcMaterial': ["*"]
    }
    name = ''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.parents: List[ProductBased] = []

    @staticmethod
    def get_id(prefix=""):
        """Create an id from ``prefix`` and the tail of a new IFC guid.

        Raises:
            AttributeError: if ``prefix`` is longer than 10 characters
        """
        if len(prefix) > 10:
            raise AttributeError("Max prefix length is 10!")
        # drop the leading part of the fresh guid in favour of the prefix
        guid_tail = guid.new()[len(prefix) + 1:]
        return f"{prefix}{guid_tail}"

    # valid ranges of the material properties, none of them is critical for
    # the creation of the material
    conditions = [
        condition.RangeCondition(
            'spec_heat_capacity',
            0 * ureg.kilojoule / (ureg.kg * ureg.K),
            5 * ureg.kilojoule / (ureg.kg * ureg.K),
            critical_for_creation=False),
        condition.RangeCondition(
            'density',
            0 * ureg.kg / ureg.m ** 3,
            50000 * ureg.kg / ureg.m ** 3,
            critical_for_creation=False),
        condition.RangeCondition(
            'thermal_conduc',
            0 * ureg.W / ureg.m / ureg.K,
            100 * ureg.W / ureg.m / ureg.K,
            critical_for_creation=False),
        condition.RangeCondition(
            'porosity',
            0 * ureg.dimensionless,
            1 * ureg.dimensionless, True,
            critical_for_creation=False),
        condition.RangeCondition(
            'solar_absorp',
            0 * ureg.percent,
            1 * ureg.percent, True,
            critical_for_creation=False),
    ]

    spec_heat_capacity = attribute.Attribute(
        default_ps=("Pset_MaterialThermal", "SpecificHeatCapacity"),
        # functions=[get_from_template],
        unit=ureg.kilojoule / (ureg.kg * ureg.K)
    )

    density = attribute.Attribute(
        default_ps=("Pset_MaterialCommon", "MassDensity"),
        unit=ureg.kg / ureg.m ** 3
    )

    thermal_conduc = attribute.Attribute(
        default_ps=("Pset_MaterialThermal", "ThermalConductivity"),
        unit=ureg.W / (ureg.m * ureg.K)
    )

    porosity = attribute.Attribute(
        default_ps=("Pset_MaterialCommon", "Porosity"),
        unit=ureg.dimensionless
    )

    # todo is percent the correct unit? (0-1)
    solar_absorp = attribute.Attribute(
        # default_ps=('Pset_MaterialOptical', 'SolarTransmittance'),
        default=0.7,
        unit=ureg.percent
    )

    def __repr__(self):
        """Return the class name, followed by the name if one is set."""
        cls_name = self.__class__.__name__
        if not self.name:
            return "<%s>" % cls_name
        return "<%s %s>" % (cls_name, self.name)
class Dummy(ProductBased):
    """Dummy for all unknown elements"""

    ifc_types = {
        "IfcElementProxy": ['*']
    }

    @property
    def ifc_type(self):
        """IFC type of the entity the dummy stands for.

        ``_ifc_type`` is honoured if it was assigned on the instance. The
        constructor that used to assign it is disabled, so the former plain
        ``return self._ifc_type`` raised an AttributeError; the type is now
        taken from the inherited property in that case.
        """
        explicit_type = getattr(self, '_ifc_type', None)
        if explicit_type is not None:
            return explicit_type
        return super().ifc_type

    def __str__(self):
        return "Dummy '%s'" % self.ifc_type
class Factory:
    """Element Factory for :class: `ProductBased`

    To understand the concept of the factory class, we refer to this article:
    https://refactoring.guru/design-patterns/factory-method/python/example

    Example:
        factory = Factory({Pipe, Boiler}, ifc_units, ifc_domain)
        ele = factory(some_ifc_element)
    """

    def __init__(
            self,
            relevant_elements: set[Type[ProductBased]],
            ifc_units: dict,
            ifc_domain: IFCDomain,
            finder: Union[TemplateFinder, None] = None,
            dummy=Dummy):
        """Prepare the lookup tables for the given element classes.

        Args:
            relevant_elements: element classes the factory may create
            ifc_units: unit information passed to every created element
            ifc_domain: domain of the IFC file the entities come from
            finder: template finder passed to every created element
            dummy: class used if no element class fits
        """
        self.mapping, self.blacklist, self.defaults = \
            self.create_ifc_mapping(relevant_elements)
        self.dummy_cls = dummy
        self.ifc_domain = ifc_domain
        self.finder = finder
        self.ifc_units = ifc_units

    def __call__(self, ifc_entity, *args, ifc_type: str = None, use_dummy=True,
                 **kwargs) -> ProductBased:
        """Run factory to create element instance.

        Calls self.create() function but before checks which element_cls is the
        correct mapping for the given ifc_entity.

        Args:
            ifc_entity: IfcOpenShell entity
            args: additional args passed to element
            ifc_type: ifc type to create element for.
                defaults to ifc_entity.is_a()
            use_dummy: use dummy class if nothing is found
            kwargs: additional kwargs passed to element
        Raises:
            LookupError: if no element found and use_dummy = False
            IFCDomainError: if the element class declares
                ``from_ifc_domains`` and the domain of the factory is not
                part of it
        Returns:
            element: created element instance
        """
        _ifc_type = ifc_type or ifc_entity.is_a()
        predefined_type = ifc2python.get_predefined_type(ifc_entity)
        element_cls = self.get_element(_ifc_type, predefined_type)
        if not element_cls:
            if not use_dummy:
                raise LookupError(f"No element found for {ifc_entity}")
            element_cls = self.dummy_cls
        # TODO # 537 Put this to a point where it makes sense, return None is no
        #  solution
        if hasattr(element_cls, 'from_ifc_domains'):
            if self.ifc_domain not in element_cls.from_ifc_domains:
                # the message used to contain a stray literal "f" in front
                # of the class name
                message = (
                    f"Element has {self.ifc_domain} but {element_cls.__name__}"
                    f" will only be created for IFC files of domain "
                    f"{element_cls.from_ifc_domains}")
                logger.warning("%s.", message)
                raise IFCDomainError(message)

        return self.create(element_cls, ifc_entity, *args, **kwargs)

    def create(self, element_cls, ifc_entity, *args, **kwargs):
        """Create Element from class and ifc.

        The element is instantiated from ``element_cls`` first. If it names a
        better subclass via ``get_better_subclass``, the element is created
        again from that class.
        """
        factory_kwargs = {
            'ifc_domain': self.ifc_domain,
            'finder': self.finder,
            'ifc_units': self.ifc_units,
        }
        # instantiate element
        element = element_cls.from_ifc(
            ifc_entity, *args, **factory_kwargs, **kwargs)
        # check if it prefers to be sth else
        better_cls = element.get_better_subclass()
        if better_cls:
            logger.info("Creating %s instead of %s", better_cls, element_cls)
            element = better_cls.from_ifc(
                ifc_entity, *args, **factory_kwargs, **kwargs)
        return element

    def get_element(self, ifc_type: str, predefined_type: Union[str, None]) -> \
            Optional[Type[ProductBased]]:
        """Get element class by ifc type and predefined type.

        Returns:
            the mapped class, None if the combination is blacklisted or
            nothing fits
        """
        if predefined_type:
            key = (ifc_type.lower(), predefined_type.upper())
            # 1. go over normal list, if found match --> return
            element = self.mapping.get(key)
            if element:
                return element
            # 2. go over negative list, if found match --> not existing
            if key in self.blacklist:
                return None
        # 3. go over default list, if found match --> return
        return self.defaults.get(ifc_type.lower())

    @staticmethod
    def create_ifc_mapping(elements: Iterable) -> Tuple[
        Dict[Tuple[str, str], Type[ProductBased]],
        List[Tuple[str, str]],
        Dict[str, Type[ProductBased]]
    ]:
        """Create mapping dict, blacklist and default dict from elements

        WARNING: ifc_type is always converted to lower case
        and predefined types to upper case

        Raises:
            NameError: if two classes claim the default ('*') of the same
                ifc type

        Returns:
            mapping: dict of (ifc_type, predefined_type) to element class
            blacklist: list of (ifc_type, predefined_type) which will not be
                taken into account
            default: dict of ifc_type to element class
        """
        # TODO: cover virtual elements e.g. Space Boundaries (not products)

        mapping = {}
        blacklist = []
        default = {}
        _all_ifc_types = set()

        for ele in elements:
            for ifc_type, tokens in ele.ifc_types.items():
                type_key = ifc_type.lower()
                _all_ifc_types.add(type_key)
                for token in tokens:
                    # create default dict where all stars are taken into account
                    # items 'IfcSlab': Slab
                    if token == '*':
                        # The keys of default are lower case. The former
                        # check used the original spelling and therefore
                        # never detected a conflict.
                        if type_key in default:
                            raise NameError(f"Conflicting default ifc_types for {ifc_type}")  # TBD
                        default[type_key] = ele
                    # create blacklist where all - are taken into account
                    # items: ('IfcRoof', 'WeiredStuff')
                    elif token.startswith('-'):
                        blacklist.append((type_key, token[1:].upper()))
                    # create mapping dict
                    # items ('IfcSlab', 'Roof'): Roof
                    else:
                        mapping[(type_key, token.upper())] = ele

        # check ifc types without default
        no_default = _all_ifc_types - set(default)
        if no_default:
            logger.warning("The following ifc types have no default "
                           "representing Element class. There will be no "
                           "match if predefined type is not provided.\n%s",
                           no_default)
        return mapping, blacklist, default
886class SerializedElement:
887 """Serialized version of an element.
889 This is a workaround as we can't serialize elements due to the usage of
890 IfcOpenShell which uses unpickable swigPy objects. We just store the most
891 important information which are guid, element_type, storeys, aggregated
892 elements and the attributes from the attribute system."""
893 def __init__(self, element):
894 self.guid = element.guid
895 self.element_type = element.__class__.__name__
896 for attr_name, attr_val in element.attributes.items():
897 # assign value directly to attribute without status
898 # make sure to get the value
899 value = getattr(element, attr_name)
900 if self.is_picklable(value):
901 setattr(self, attr_name, value)
902 else:
903 logger.info(
904 f"Attribute {attr_name} will not be serialized, as it's "
905 f"not pickleable")
906 if hasattr(element, "space_boundaries"):
907 self.space_boundaries = [bound.guid for bound in
908 element.space_boundaries]
909 if hasattr(element, "storeys"):
910 self.storeys = [storey.guid for storey in element.storeys]
911 if issubclass(element.__class__, AggregationMixin):
912 self.elements = [ele.guid for ele in element.elements]
914 @staticmethod
915 def is_picklable(value: Any) -> bool:
916 """Determines if a given value is picklable.
918 This method attempts to serialize the provided value using the `pickle` module.
919 If the value can be successfully serialized, it is considered picklable.
921 Args:
922 value (Any): The value to be tested for picklability.
924 Returns:
925 bool: True if the value is picklable, False otherwise.
926 """
927 try:
928 pickle.dumps(value)
929 return True
930 except (pickle.PicklingError, TypeError):
931 return False
933 def __repr__(self):
934 return "<serialized %s (guid: '%s')>" % (
935 self.element_type, self.guid)