Coverage for bim2sim/elements/mapping/attribute.py: 77%
332 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-12 13:34 +0000
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-12 13:34 +0000
1import inspect
2import functools
3import logging
4from functools import partial
5from typing import Tuple, Iterable, Callable, Any, Union
7import pint
9from bim2sim.elements.mapping.units import ureg
10from bim2sim.kernel.decision import RealDecision, Decision, \
11 DecisionBunch, BoolDecision, StringDecision
12from bim2sim.utilities.types import AttributeDataSource
# Module logger for general messages of the attribute machinery.
logger = logging.getLogger(__name__)
# Separate logger channel; used in this module to report attributes for
# which no source provided a value.
quality_logger = logging.getLogger('bim2sim.QualityReport')
class AutoAttributeNameMeta(type):
    """Metaclass that names the Attribute descriptors declared on a class.

    Every Attribute instance found in the namespace of a newly created class
    gets the identifier it was assigned to stored as its ``name``.
    """

    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        declared = (
            (key, member) for key, member in namespace.items()
            if isinstance(member, Attribute)
        )
        for key, member in declared:
            member.name = key
class Attribute:
    """Descriptor of an element attribute that resolves its value lazily.

    Value, status and data source of an attribute are stored as a tuple
    ``(value, status, data_source)`` under the attribute's name in the
    ``attributes`` mapping of the bound instance.

    Possible statuses are:

    * UNKNOWN: initial status written by ``initialize``; no lookup was done.
    * REQUESTED: a Decision is stored in place of the value and has not
        delivered a valid value yet.
    * AVAILABLE: a value was found, set manually or taken from a valid
        decision.
    * NOT_AVAILABLE: the lookup ran, but no source provided a value.
    * RESET: the attribute was reset; the next access runs the lookup again.

    To find more about Descriptor objects follow the explanations on
    https://rszalski.github.io/magicmethods/#descriptor
    """
    STATUS_UNKNOWN = 'UNKNOWN'
    STATUS_REQUESTED = 'REQUESTED'
    STATUS_AVAILABLE = 'AVAILABLE'
    STATUS_NOT_AVAILABLE = 'NOT_AVAILABLE'
    STATUS_RESET = 'RESET'

    def __init__(self,
                 description: str = "",
                 unit: pint.Unit = None,
                 ifc_attr_name: str = "",
                 default_ps: Tuple[str, str] = None,
                 default_association: Tuple[str, str] = None,
                 patterns: Iterable = None,
                 ifc_postprocessing: Callable[[Any], Any] = None,
                 functions: Iterable[Callable[[object, str], Any]] = None,
                 default=None,
                 dependant_elements: str = None,
                 attr_type: type = float
                 ):
        """

        Args:
            description: Description of attribute
            unit: pint unit of attribute. If None, values are stored without
                a unit. Use SI units whenever possible.
            ifc_attr_name: Name of attribute in IFC schema.
            default_ps: tuple of propertyset name and property name. These
                follow the IFC schema specifications.
            default_association: tuple of association name and property name.
                These follow the IFC schema specifications.
            patterns: iterable of (compiled) re patterns to find not schema
                conform stored information
            ifc_postprocessing: callable to apply on initial value, returns
                final value
            functions: iterable of callable with signature func(bind, name) ->
                value. The first call that returns a value other than None
                without raising provides the value.
            default: default value which is used if no other source is
                successful. Use only for attributes which have valid
                defaults.
            dependant_elements: name of the attribute of the bound instance
                that holds the additional elements necessary to calculate
                the attribute
            attr_type: data type of attribute (bool, str, int or float), used
                to determine decision type if decision is needed, float is
                default
        """
        self.name = None  # auto set by AutoAttributeNameMeta
        self.description = description
        self.unit = unit

        self.ifc_attr_name = ifc_attr_name
        self.default_ps = default_ps
        self.default_association = default_association
        self.patterns = patterns
        self.functions = functions
        self.default_value = default
        self.dependant_elements = dependant_elements
        # data_source stores where the information was obtained from throughout
        # the bim2sim process
        self.data_source = None
        self.attr_type = attr_type

        # An instance attribute shadows the default static post-processing.
        if ifc_postprocessing is not None:
            self.ifc_post_processing = ifc_postprocessing

        # TODO argument for validation function

    def to_aggregation(self, calc=None, **kwargs):
        """Create new Attribute suited for aggregation.

        Description, unit and default value are taken over from this
        attribute and can be overridden through ``kwargs``.

        Args:
            calc: function used as the only entry of ``functions`` of the new
                attribute. If None, ``functions`` is left as given in
                ``kwargs`` (no functions by default) instead of storing a
                list that contains None.
            **kwargs: further keyword arguments for the new Attribute.

        Returns:
            The new Attribute instance.
        """
        options = {
            'description': self.description,
            'unit': self.unit,
            'default': self.default_value,
        }
        options.update(kwargs)
        if calc is not None:
            options['functions'] = [calc]
        return Attribute(**options)

    def _get_value(self, bind):
        """Look up the value of the attribute for the bound instance.

        If ``bind.ifc`` is truthy, the sources are tried in this order: IFC
        attribute, default property set, default association, finder and
        patterns. Afterwards the custom functions and finally the default
        value are used. The first source that yields a value other than None
        wins.

        Args:
            bind: bound instance of the attribute

        Returns:
            Tuple of the value (None if nothing was found) and the
            AttributeDataSource member naming the successful source (None if
            nothing was found).
        """
        value = None
        data_source = None
        if bind.ifc:  # don't bother if there is no ifc
            # default ifc attribute
            if self.ifc_attr_name and hasattr(bind.ifc, self.ifc_attr_name):
                raw_value = getattr(bind.ifc, self.ifc_attr_name)
                value = self.post_process_value(bind, raw_value)
                if value is not None:
                    data_source = AttributeDataSource.ifc_attr

            # default property set
            if value is None and self.default_ps:
                raw_value = self.get_from_default_propertyset(
                    bind, self.default_ps)
                value = self.post_process_value(bind, raw_value)
                if value is not None:
                    data_source = AttributeDataSource.default_ps

            # default association, read through the same accessor
            if value is None and self.default_association:
                raw_value = self.get_from_default_propertyset(
                    bind, self.default_association)
                value = self.post_process_value(bind, raw_value)
                if value is not None:
                    data_source = AttributeDataSource.default_association

            # tool specific properties (finder)
            if value is None:
                raw_value = self.get_from_finder(bind, self.name)
                value = self.post_process_value(bind, raw_value)
                if value is not None:
                    data_source = AttributeDataSource.finder

            # custom properties by patterns
            if value is None and self.patterns:
                raw_value = self.get_from_patterns(
                    bind, self.patterns, self.name)
                value = self.post_process_value(bind, raw_value)
                if value is not None:
                    data_source = AttributeDataSource.patterns

        # custom functions
        if value is None and self.functions:
            value = self.get_from_functions(bind, self.functions, self.name)
            if value is not None:
                data_source = AttributeDataSource.function

        # report missing value before falling back to the default
        if value is None:
            quality_logger.warning(
                "Attribute '%s' of %s %s was not found in default "
                "PropertySet, default Association, finder, patterns or "
                "functions",
                self.name, bind.ifc_type, bind.guid)

        # default value
        if value is None and self.default_value is not None:
            value = self.default_value
            if self.unit:
                value = value * self.unit
            data_source = AttributeDataSource.default

        value = self._ensure_unit(bind, value)
        # todo validation of attributes on creation time makes accept_valids
        #  function in base_tasks.py unusable as not valid attributes are
        #  never created. Once this is solved, values failing
        #  check_conditions(bind, value, self.name) could be dropped here.

        return value, data_source

    def _ensure_unit(self, bind, value):
        """Attach the attribute's unit to a value or to its items.

        Items or values that are None or already a quantity are kept as they
        are. A list or set is only replaced (by a list) if at least one item
        needed the unit.
        """
        if self.unit is None or value is None:
            return value

        def lacks_unit(item):
            return item is not None and not isinstance(item, ureg.Quantity)

        if isinstance(value, (list, set)):
            # case to calculate values that are list of quantities
            if not any(lacks_unit(item) for item in value):
                return value
            logger.warning(f"Unit not set for attribute {self} of {bind}")
            return [item * self.unit if lacks_unit(item) else item
                    for item in value]
        if lacks_unit(value):
            logger.warning(f"Unit not set for attribute {self} of {bind}")
            return value * self.unit
        return value

    @staticmethod
    def get_from_default_propertyset(bind, default):
        """Get value from default property set.

        Args:
            bind: bound instance of the attribute
            default: arguments forwarded to ``bind.get_exact_property``

        Returns:
            The property value, or None if the lookup raised any exception.
        """
        try:
            return bind.get_exact_property(*default)
        except Exception:
            # best effort: a failing lookup means "not found in this source"
            return None

    @staticmethod
    def get_from_finder(bind, name):
        """Get value through the finder of the bound instance.

        Returns:
            Result of ``finder.find(bind, name)``; None if the instance has
            no (truthy) finder or the finder raised AttributeError or
            TypeError.
        """
        finder = getattr(bind, 'finder', None)
        if finder:  # Aggregations have no finder
            try:
                return finder.find(bind, name)
            except (AttributeError, TypeError):
                pass
        return None

    @staticmethod
    def get_from_patterns(bind, patterns, name):
        """Get value from non default property sets matching patterns"""
        return bind.select_from_potential_properties(patterns, name, False)

    @staticmethod
    def get_from_functions(bind, functions: list, name: str):
        """Get value from functions.

        First successful function calls return value is used. As we want to
        allow to overwrite functions in inherited classes, we use
        getattr(bind, func.__name__) to get the function from the bind.
        A function that raises is logged and skipped.

        Args:
            bind: the bind object
            functions: a list of functions
            name: the name of the attribute

        Returns:
            The first value other than None returned by a function, else
            None.
        """
        value = None
        for func in functions:
            func_inherited = getattr(bind, func.__name__)
            try:
                value = func_inherited(name)
            except Exception as ex:
                logger.error("Function '%s' of %s.%s raised %s",
                             func.__name__, bind, name, ex)
            else:
                if value is not None:
                    break
        return value

    @staticmethod
    def get_conditions(bind, name):
        """Get conditions for attribute.

        Returns:
            List with the ``check`` callable of every condition of ``bind``
            whose key equals ``name``, each pre-bound to ``bind``.
        """
        return [partial(condition.check, bind)
                for condition in bind.conditions
                if condition.key == name]

    @staticmethod
    def check_conditions(bind, value, name):
        """Check conditions.

        Returns:
            True if every condition registered for ``name`` accepts
            ``value`` (also if there is no such condition), else False.
        """
        return all(condition_check(value)
                   for condition_check in Attribute.get_conditions(bind, name))

    def create_decision(self, bind):
        """Create the Decision used to ask for this Attribute.

        ``attr_type`` bool leads to a BoolDecision, str to a StringDecision,
        everything else to a RealDecision. Only the RealDecision receives the
        unit and the conditions as validation functions.
        """
        # TODO: set state in output dict -> attributemanager
        if bind.conditions:
            conditions = Attribute.get_conditions(bind, self.name)
        else:
            conditions = [lambda x: True]

        common = {
            'console_identifier': "Name: %s, GUID: %s" % (
                bind.name, bind.guid),
            'key': self.name,
            'global_key': "%s_%s.%s" % (
                bind.ifc_type, bind.guid, self.name),
            'allow_skip': False,
            'related': bind.guid,
        }
        if self.attr_type == bool:
            return BoolDecision(
                question=f"Is the attribute {self.name} of {bind} "
                         f"True/Active?",
                **common)
        question = "Enter value for %s of %s" % (self.name, bind)
        if self.attr_type == str:
            return StringDecision(question=question, **common)
        return RealDecision(
            question=question,
            validate_func=conditions,
            unit=self.unit,
            **common)

    def post_process_value(self, bind, raw_value):
        """Post-process the raw_value.

        If attribute is given an external ifc_postprocessing entry, this
        function will be used. Otherwise, the pre implemented
        ifc_post_processing of the attribute class will be used.
        If the bound instance has a member with the same name as the
        post-processing function and that member is not a staticmethod, the
        bind is forwarded as first argument. None is returned unchanged.
        """
        if raw_value is None:
            return raw_value
        post_process = self.ifc_post_processing
        # callables such as functools.partial carry no __name__
        func_name = getattr(post_process, '__name__', None)
        if func_name is not None and hasattr(bind, func_name):
            # check if the method is static or needs the bind
            is_static = isinstance(
                inspect.getattr_static(bind, func_name), staticmethod)
            if not is_static:
                return post_process(bind, raw_value)
        return post_process(raw_value)

    @staticmethod
    def ifc_post_processing(value):
        """Default post processing of ifc property values.

        Strings consisting only of decimal digits are converted to float,
        every other value is returned unchanged. ``isdecimal`` is used
        because ``isnumeric`` also accepts characters (e.g. fractions or
        superscripts) that ``float`` cannot parse.
        """
        if isinstance(value, str) and value.isdecimal():
            value = float(value)
        return value

    def request(self, bind, external_decision=None):
        """Request attribute via decision.

        Args:
            bind: bound instance of attribute
            external_decision: Decision to use instead of default decision

        Returns:
            A Decision if one is needed or already stored, else None.
        """
        # Read current value and status
        value, status, _ = self._inner_get(bind)

        # Case 1: no value was found by the lookup
        if value is None and status == Attribute.STATUS_NOT_AVAILABLE:
            return self.get_dependency_decisions(bind, external_decision)

        # Case 2: list with missing items. Only None counts as missing, so
        # that zero valued items do not trigger a decision.
        if isinstance(value, list) and any(item is None for item in value):
            return self.get_dependency_decisions(bind, external_decision)

        # Case 3: Value is already a Decision instance
        if isinstance(value, Decision):
            return value

        # Case 4: Value is available or already requested (no action needed)
        return None

    def reset(self, bind, data_source=AttributeDataSource.manual_overwrite):
        """Reset attribute: store None with STATUS_RESET and data_source."""
        self._inner_set(
            bind, None, Attribute.STATUS_RESET, data_source)

    def get_dependency_decisions(self, bind, external_decision=None):
        """Store and return the decision for this attribute.

        The external decision, or a newly created one if none is given, is
        stored in place of the value with STATUS_REQUESTED.
        """
        if self.functions is not None and self.dependant_elements:
            # requesting the attribute in the dependant elements (see
            # dependant_elements_decision) is currently not wired in
            logger.warning(
                'Attribute %s of element %s uses "dependent_elements" '
                'functionality, but this is currently not supported. '
                'Please take this into account.', self.name, bind)
        _decision = external_decision or self.create_decision(bind)
        self._inner_set(
            bind, _decision, Attribute.STATUS_REQUESTED, self.data_source)
        return _decision

    def dependant_elements_decision(self, bind) -> dict:
        """Function to request attributes in other elements different to bind,
        that are later on necessary to calculate an attribute in bind (case of
        aggregation)

        Returns:
            _decision: key: is the instance, value: is another dict composed of
                the attr name and the corresponding decision or function to
                calculate said attribute
        """
        _decision = {}
        for inst in getattr(bind, self.dependant_elements):
            # request instance attribute and keep only the decisions that
            # were added by this request
            pre_decisions = inst.attributes.get_decisions()
            inst.request(self.name)
            additional_decisions = inst.attributes.get_decisions()
            for decision in additional_decisions:
                if decision in pre_decisions or decision is None:
                    continue
                inst_decisions = _decision.setdefault(inst, {})
                if isinstance(decision, dict):
                    inst_decisions.update(decision)
                else:
                    inst_decisions[decision.key] = decision
        return _decision

    def initialize(self, manager):
        """Register the attribute in ``manager`` with STATUS_UNKNOWN.

        Raises:
            AttributeError: if the name of the attribute is not set.
        """
        if not self.name:
            logger.error("Attribute with description %r has no name",
                         self.description)
            raise AttributeError("Attribute.name not set!")

        manager[self.name] = (None, self.STATUS_UNKNOWN, None)

    def _inner_get(self, bind):
        """Return the stored (value, status, data_source) tuple of bind."""
        return bind.attributes[self.name]

    def _inner_set(self, bind, value, status, data_source):
        """Store value, status and data_source for bind."""
        # TODO: validate
        bind.attributes[self.name] = value, status, data_source

    def __get__(self, bind, owner):
        """This gets called if attribute is accessed via element.attribute.

        The descriptors get function handles the different underlying ways to
        get an attributes value. Accessed on the class, the descriptor itself
        is returned. A stored decision yields its value once it is valid and
        None before that.

        Raises:
            AssertionError: if a decision is stored but the status is not
                STATUS_REQUESTED.
        """
        if bind is None:
            return self

        # read current value and status
        value_or_decision, status, data_source = self._inner_get(bind)
        changed = False
        value = None

        if isinstance(value_or_decision, Decision):
            # decision
            if status != self.STATUS_REQUESTED:
                raise AssertionError("Inconsistent status")
            if value_or_decision.valid():
                value = value_or_decision.value
                status = self.STATUS_AVAILABLE
                data_source = AttributeDataSource.decision
                changed = True
        else:
            value = value_or_decision

        if (value is None and status
                in [self.STATUS_UNKNOWN, self.STATUS_RESET]):
            value, data_source = self._get_value(bind)
            status = self.STATUS_AVAILABLE if value is not None \
                else self.STATUS_NOT_AVAILABLE  # change for temperature
            changed = True

        if changed:
            # write back new value and status
            self._inner_set(bind, value, status, data_source)

        return value

    def __set__(self, bind, value):
        """Set the value of the attribute and mark it as available.

        A tuple of length two is interpreted as (value, data_source). For
        every other value the data source 'manual_overwrite' is stored. If
        the attribute has a unit, quantities are converted to it and other
        values (or list items) are multiplied with it.
        """
        if isinstance(value, tuple) and len(value) == 2:
            value, data_source = value
        else:
            # if not data_source is provided, 'manual_overwrite' will be set
            data_source = AttributeDataSource.manual_overwrite

        if not self.unit:
            _value = value
        elif isinstance(value, ureg.Quantity):
            # case for quantity
            _value = value.to(self.unit)
        elif isinstance(value, list):
            # case for list of quantities
            _value = [
                item.to(self.unit) if isinstance(item, ureg.Quantity)
                else item * self.unit
                for item in value
            ]
        else:
            _value = value * self.unit
        self._inner_set(bind, _value, self.STATUS_AVAILABLE, data_source)

    def __str__(self):
        return "Attribute %s" % self.name
class AttributeManager(dict):
    """Manages the attributes.

    Every bim2sim element owns an instance of the AttributeManager class which
    manages the corresponding attributes of this element. It is a dict with
        key: name of attribute as string
        value: tuple with (value of attribute, status of attribute, data
            source of attribute).
    """

    def __init__(self, bind):
        """Register every Attribute of the class of ``bind``."""
        super().__init__()
        self.bind = bind

        for name in self.names:
            attr = self.get_attribute(name)
            attr.initialize(self)

    def __setitem__(self, name, value):
        """Store the entry of an attribute.

        Accepted values are:

        * a tuple (value, status, data_source), stored as given,
        * a tuple (value, data_source), stored with STATUS_AVAILABLE,
        * any non-tuple value, stored with STATUS_AVAILABLE and the data
          source 'manual_overwrite'.

        A data source must be None, an AttributeDataSource member or, in the
        three element form, the name of such a member.

        Raises:
            AttributeError: if ``name`` is not an attribute of the element.
            ValueError: if the data source or the tuple form is not valid.
        """
        if name not in self.names:
            raise AttributeError("Invalid Attribute '%s'. Choices are %s" % (
                name, list(self.names)))

        if not isinstance(value, tuple):
            entry = (value,
                     Attribute.STATUS_AVAILABLE,
                     AttributeDataSource.manual_overwrite)
        elif len(value) == 3:
            data_source = value[-1]
            if not (isinstance(data_source, AttributeDataSource)
                    or data_source is None):
                try:
                    getattr(AttributeDataSource, data_source)
                except (AttributeError, TypeError):
                    # TypeError: data source is not even a string
                    raise ValueError(
                        f"Non valid DataSource provided for attribute {name} "
                        f"of element {self.bind}") from None
            entry = value
        elif len(value) == 2 and (
                isinstance(value[-1], AttributeDataSource)
                or value[-1] is None):
            # (value, data_source): complete it to the three element entry
            # that every reader of the manager unpacks
            entry = (value[0], Attribute.STATUS_AVAILABLE, value[1])
        else:
            raise ValueError(
                f"Non valid value tuple provided for attribute {name} of "
                f"element {self.bind}: expected (value, status, datasource) "
                f"or (value, datasource)")
        super().__setitem__(name, entry)

    def update(self, other):
        """Set all items of the mapping ``other`` through ``__setitem__``."""
        # dict.update does not invoke __setitem__
        for k, v in other.items():
            self[k] = v

    def _get_known_attribute(self, name):
        """Return the Attribute ``name`` or raise with a helpful message.

        The type of the exception raised by ``get_attribute`` is kept, only
        the message is replaced.
        """
        try:
            return self.get_attribute(name)
        except AttributeError as err:
            raise AttributeError(
                "%s has no Attribute '%s'" % (self.bind, name)) from err
        except KeyError as err:
            raise KeyError(
                "%s has no Attribute '%s'" % (self.bind, name)) from err

    def reset(self, name, data_source=AttributeDataSource.manual_overwrite):
        """Reset attribute: its value is set to None with STATUS_RESET.

        Raises:
            AttributeError: if the element has no attribute ``name``.
        """
        # TODO this has limitations when the corresponding attribute uses
        #  functions to calculate the value, see #760 for more information
        attr = self._get_known_attribute(name)
        attr.reset(self.bind, data_source)

    def request(self, name: str, external_decision: Decision = None) \
            -> Union[None, Decision]:
        """Request attribute by name.

        If the attribute was not looked up yet (or was reset), the lookup is
        triggered first. A decision is only created if afterwards no value is
        available or a list value contains None.

        Args:
            name: name of requested attribute
            external_decision: custom decision to get attribute from

        Returns:
            A Decision to get the requested attributes value, or None if the
            value is already available.

        Raises:
            AttributeError: if the element has no attribute ``name`` or if an
                external decision is given for an attribute that already
                holds a different decision.
        """
        attr = self._get_known_attribute(name)
        value, status, data_source = self[name]
        if status in [Attribute.STATUS_UNKNOWN, Attribute.STATUS_RESET]:
            # make sure default methods are tried
            getattr(self.bind, name)
            value, status, data_source = self[name]

        if value is None and status == Attribute.STATUS_NOT_AVAILABLE:
            return attr.request(self.bind, external_decision)
        if isinstance(value, list):
            # case for list of quantities
            if any(v is None for v in value):
                return attr.request(self.bind, external_decision)
            return None
        if isinstance(value, Decision):
            if external_decision and value is not external_decision:
                raise AttributeError("Can't set external decision for an "
                                     "already requested attribute.")
            return value
        # already requested or available
        return None

    def get_attribute(self, name):
        """Return the member ``name`` of the class of the bound element."""
        return getattr(type(self.bind), name)

    def get_unit(self, name):
        """Return the unit of the attribute ``name``."""
        return self.get_attribute(name).unit

    @property
    def names(self):
        """Returns a generator object with all attributes that the corresponding
        bind owns."""
        owner = type(self.bind)
        return (name for name in dir(owner)
                if isinstance(getattr(owner, name), Attribute))

    def get_decisions(self) -> DecisionBunch:
        """Return all decision of attributes with status REQUESTED."""
        decisions = DecisionBunch()
        for dec, status, _data_source in self.values():
            if status == Attribute.STATUS_REQUESTED:
                decisions.append(dec)
        return decisions
def multi_calc(func):
    """Decorator for functions that calculate several attributes at once.

    ``func`` is called with the bound instance only and has to return a dict
    of attribute name -> value. The requested entry is removed from that dict
    and returned, all remaining entries are passed to
    ``bind.attributes.update``.

    functools.wraps keeps the real function name, which the
    get_from_functions method relies on.
    """

    @functools.wraps(func)
    def wrapper(bind, name):
        results = func(bind)
        requested = results.pop(name)
        # hand the by-products over to the AttributeManager instance
        bind.attributes.update(results)
        return requested

    return wrapper