Coverage for bim2sim/elements/mapping/attribute.py: 77%
332 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-12 17:09 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-12 17:09 +0000
1import inspect
2import functools
3import logging
4from functools import partial
5from typing import Tuple, Iterable, Callable, Any, Union
7import pint
9from bim2sim.elements.mapping.units import ureg
10from bim2sim.kernel.decision import RealDecision, Decision, \
11 DecisionBunch, BoolDecision, StringDecision
12from bim2sim.utilities.types import AttributeDataSource
14logger = logging.getLogger(__name__)
15quality_logger = logging.getLogger('bim2sim.QualityReport')
18class AutoAttributeNameMeta(type):
19 """Detect setting on Attributes on class level and set name as given"""
21 def __init__(cls, name, bases, namespace):
22 super(AutoAttributeNameMeta, cls).__init__(name, bases, namespace)
23 for name, obj in namespace.items():
24 if isinstance(obj, Attribute):
25 obj.name = name
27 # def __setattr__(cls, name, value):
28 # if isinstance(value, Attribute):
29 # value.name = name
30 # print(name)
31 # return super().__setattr__(name, value)
34class Attribute:
35 """Descriptor of element attribute to get its value from various sources.
37 value and status of attribute are stored in __dict__ of bound instance.
38 Possible statuses are:
39 UNKNOWN: default status at the beginning.
40 REQUESTED: Attribute was already requested via a decision??.
41 AVAILABLE: Attribute exists and is available.
42 NOT_AVAILABLE: No way was found to obtain the attributes value.
43 RESET: The Attribute was reset.
45 To find more about Descriptor objects follow the explanations on
46 https://rszalski.github.io/magicmethods/#descriptor
47 """
48 STATUS_UNKNOWN = 'UNKNOWN'
49 STATUS_REQUESTED = 'REQUESTED'
50 STATUS_AVAILABLE = 'AVAILABLE'
51 STATUS_NOT_AVAILABLE = 'NOT_AVAILABLE'
52 STATUS_RESET = 'RESET'
54 def __init__(self,
55 description: str = "",
56 unit: pint.Unit = None,
57 ifc_attr_name: str = "",
58 default_ps: Tuple[str, str] = None,
59 default_association: Tuple[str, str] = None,
60 patterns: Iterable = None,
61 ifc_postprocessing: Callable[[Any], Any] = None,
62 functions: Iterable[Callable[[object, str], Any]] = None,
63 default=None,
64 dependant_elements: str = None,
65 attr_type: Union[
66 type(bool), type(str), type(int), type(float)] = float
67 ):
68 """
70 Args:
71 description: Description of attribute
72 unit: pint unit of attribute, defaults to dimensionless. Use SI
73 units whenever possible.
74 ifc_attr_name: Name of attribute in IFC schema.
75 default_ps: tuple of propertyset name and property name. These
76 follow the IFC schema specifications.
77 default_association: tuple of association name and property name.
78 These follow the IFC schema specifications.
79 patterns: iterable of (compiled) re patterns to find not schema
80 conform stored information
81 ifc_postprocessing: callable to apply on initial value, returns
82 final value
83 functions: iterable of callable with signature func(bind, name) ->
84 value. First return with no error is used as value.
85 default: default value which is used if no other source is
86 successful. Use only for attributes which have valid
87 defaults.
88 dependant_elements: list of additional elements necessary to
89 calculate the attribute
90 attr_type: data type of attribute, used to determine decision type
91 if decision is needed, float is default
92 """
93 self.name = None # auto set by AutoAttributeNameMeta
94 self.description = description
95 self.unit = unit
97 self.ifc_attr_name = ifc_attr_name
98 self.default_ps = default_ps
99 self.default_association = default_association
100 self.patterns = patterns
101 self.functions = functions
102 self.default_value = default
103 self.dependant_elements = dependant_elements
104 # data_source stores where the information was obtained from throughout
105 # the bim2sim process
106 self.data_source = None
107 self.attr_type = attr_type
109 if ifc_postprocessing is not None:
110 self.ifc_post_processing = ifc_postprocessing
112 # TODO argument for validation function
114 def to_aggregation(self, calc=None, **kwargs):
115 """Create new Attribute suited for aggregation."""
116 options = {
117 'description': self.description,
118 'unit': self.unit,
119 'default': self.default_value
120 }
121 options.update(kwargs)
122 options['functions'] = [calc]
123 return Attribute(**options)
125 def _get_value(self, bind):
126 """"""
127 value = None
128 data_source = None
129 if bind.ifc: # don't bother if there is no ifc
130 # default ifc attribute
131 if value is None and self.ifc_attr_name:
132 if hasattr(bind.ifc, self.ifc_attr_name):
133 raw_value = getattr(bind.ifc, self.ifc_attr_name)
134 value = self.post_process_value(bind, raw_value)
135 if value is not None:
136 data_source = AttributeDataSource.ifc_attr
137 # default property set
138 if value is None and self.default_ps:
139 raw_value = self.get_from_default_propertyset(bind,
140 self.default_ps)
141 value = self.post_process_value(bind, raw_value)
142 if value is not None:
143 data_source = AttributeDataSource.default_ps
145 if value is None and self.default_association:
146 raw_value = self.get_from_default_propertyset(
147 bind, self.default_association)
148 value = self.post_process_value(bind, raw_value)
149 if value is not None:
150 data_source = AttributeDataSource.default_association
152 # tool specific properties (finder)
153 if value is None:
154 raw_value = self.get_from_finder(bind, self.name)
155 value = self.post_process_value(bind, raw_value)
156 if value is not None:
157 data_source = AttributeDataSource.finder
159 # custom properties by patterns
160 if value is None and self.patterns:
161 raw_value = self.get_from_patterns(bind, self.patterns,
162 self.name)
163 value = self.post_process_value(bind, raw_value)
164 if value is not None:
165 data_source = AttributeDataSource.patterns
167 # custom functions
168 if value is None and self.functions:
169 value = self.get_from_functions(bind, self.functions, self.name)
170 if value is not None:
171 data_source = AttributeDataSource.function
173 # logger value none
174 if value is None:
175 quality_logger.warning(
176 "Attribute '%s' of %s %s was not found in default "
177 "PropertySet, default Association, finder, patterns or "
178 "functions",
179 self.name, bind.ifc_type, bind.guid)
181 # default value
182 if value is None and self.default_value is not None:
183 value = self.default_value
184 if value is not None and self.unit:
185 value = value * self.unit
186 data_source = AttributeDataSource.default
188 # check unit
189 if isinstance(value, (list, set)):
190 # case to calculate values that are list of quantities
191 new_value = []
192 for item in value:
193 if self.unit is not None and item is not None and not \
194 isinstance(item, ureg.Quantity):
195 logger.warning(
196 f"Unit not set for attribute {self} of {bind}")
197 new_value.append(item * self.unit)
198 value = new_value if len(new_value) == len(value) else value
199 else:
200 if self.unit is not None and value is not None and not isinstance(
201 value, ureg.Quantity):
202 logger.warning(f"Unit not set for attribute {self} of {bind}")
203 value = value * self.unit
204 # todo validation of attributes on creation time makes accept_valids
205 # function in base_tasks.py unusable as not valid attributes are never
206 # created
207 # if value is not None and bind.conditions:
208 # if not self.check_conditions(bind, value, self.name):
209 # value = None
211 return value, data_source
213 @staticmethod
214 def get_from_default_propertyset(bind, default):
215 """Get value from default property set"""
216 try:
217 value = bind.get_exact_property(*default)
218 except Exception:
219 value = None
220 return value
222 @staticmethod
223 def get_from_finder(bind, name):
224 finder = getattr(bind, 'finder', None)
225 if finder: # Aggregations have no finder
226 try:
227 return bind.finder.find(bind, name)
228 except (AttributeError, TypeError):
229 pass
230 return None
232 @staticmethod
233 def get_from_patterns(bind, patterns, name):
234 """Get value from non default property sets matching patterns"""
235 value = bind.select_from_potential_properties(patterns, name, False)
236 return value
238 @staticmethod
239 def get_from_functions(bind, functions: list, name: str):
240 """Get value from functions.
242 First successful function calls return value is used. As we want to
243 allow to overwrite functions in inherited classes, we use
244 getattr(bind, func.__name__) to get the function from the bind.
246 Args:
247 bind: the bind object
248 functions: a list of functions
249 name: the name of the attribute
250 """
251 value = None
252 for func in functions:
253 func_inherited = getattr(bind, func.__name__)
254 try:
255 value = func_inherited(name)
256 except Exception as ex:
257 logger.error("Function '%s' of %s.%s raised %s",
258 func.__name__, bind, name, ex)
259 pass
260 else:
261 if value is not None:
262 break
263 return value
265 @staticmethod
266 def get_conditions(bind, name):
267 """Get conditions for attribute"""
268 conditions = []
269 for condition in bind.conditions:
270 if condition.key == name:
271 conditions.append(partial(condition.check, bind))
272 return conditions
274 @staticmethod
275 def check_conditions(bind, value, name):
276 """Check conditions"""
277 conditions = Attribute.get_conditions(bind, name)
278 for condition_check in conditions:
279 if not condition_check(value):
280 return False
281 return True
283 def create_decision(self, bind):
284 """Created Decision for this Attribute"""
285 # TODO: set state in output dict -> attributemanager
286 conditions = [lambda x: True] if not bind.conditions else \
287 Attribute.get_conditions(bind, self.name)
289 console_identifier = "Name: %s, GUID: %s" % (bind.name, bind.guid)
290 related = bind.guid
291 key = self.name
292 global_key = "%s_%s.%s" % (bind.ifc_type, bind.guid, self.name)
293 if self.attr_type == bool:
294 question = f"Is the attribute {self.name} of {bind} True/Active?"
295 decision = BoolDecision(
296 question=question,
297 console_identifier=console_identifier,
298 key=key,
299 global_key=global_key,
300 allow_skip=False,
301 related=related
302 )
303 elif self.attr_type == str:
304 question = "Enter value for %s of %s" % (self.name, bind)
305 decision = StringDecision(
306 question=question,
307 console_identifier=console_identifier,
308 key=key,
309 global_key=global_key,
310 allow_skip=False,
311 related=related
312 )
313 else:
314 question = "Enter value for %s of %s" % (self.name, bind)
315 decision = RealDecision(
316 question=question,
317 console_identifier=console_identifier,
318 key=key,
319 global_key=global_key,
320 allow_skip=False,
321 validate_func=conditions,
322 unit=self.unit,
323 related=related
324 )
325 return decision
327 def post_process_value(self, bind, raw_value):
328 """Post-process the raw_value.
330 If attribute is given an external ifc_postprocessing entry, this
331 function will be used. Otherwise, the pre implemented
332 ifc_post_processing of the attribute class will be used.
333 If an external ifc_postprocessing is give, this is checked for being
334 static or not, because if not static, the bind needs to be forwarded to
335 the method.
336 """
337 if raw_value is not None:
338 ifc_post_process_func_name = self.ifc_post_processing.__name__
339 # check if external ifc_post_processing method exists:
340 if hasattr(bind, ifc_post_process_func_name):
341 # check of the method is static or needs the bind
342 is_static = isinstance(inspect.getattr_static(
343 bind, ifc_post_process_func_name), staticmethod)
344 if is_static:
345 value = self.ifc_post_processing(raw_value)
346 else:
347 value = self.ifc_post_processing(bind, raw_value)
348 else:
349 value = self.ifc_post_processing(raw_value)
350 else:
351 value = raw_value
352 return value
354 @staticmethod
355 def ifc_post_processing(value):
356 """Function for post processing of ifc property values (e.g. diameter
357 list -> diameter)by default this function does nothing"""
358 if isinstance(value, str) and value.isnumeric():
359 value = float(value)
360 return value
362 def request(self, bind, external_decision=None):
363 """Request attribute via decision.
365 Args:
366 bind: bound instance of attribute
367 external_decision: Decision to use instead of default decision
368 """
370 # Read current value, status, and data source
371 value, status, _ = self._inner_get(bind)
373 # Case 1: Value is None and status is STATUS_NOT_AVAILABLE
374 if value is None and status == Attribute.STATUS_NOT_AVAILABLE:
375 return self.get_dependency_decisions(bind, external_decision)
377 # Case 2: Value is a list and not all elements are truthy
378 if isinstance(value, list) and not all(value):
379 return self.get_dependency_decisions(bind, external_decision)
381 # Case 3: Value is already a Decision instance
382 if isinstance(value, Decision):
383 return value
385 # Case 4: Value is available or already requested (no action needed)
386 return
388 def reset(self, bind, data_source=AttributeDataSource.manual_overwrite):
389 """Reset attribute, set to None and STATUS_NOT_AVAILABLE."""
390 self._inner_set(
391 bind, None, Attribute.STATUS_RESET, data_source)
393 def get_dependency_decisions(self, bind, external_decision=None):
394 """Get dependency decisions"""
395 status = Attribute.STATUS_REQUESTED
396 if self.functions is not None:
397 if self.dependant_elements:
398 logger.warning(f'Attribute {self.name} of element {bind} uses '
399 f'"dependent_elements" functionality, but this '
400 f'is currently not supported. Please take this'
401 f' into account.')
402 # _decision = {}
403 # # raise NotImplementedError(
404 # # "The implementation of dependant elements needs to be"
405 # # " revised.")
406 # # case for attributes that depend on the same
407 # # attribute in other elements
408 # _decision_inst = self.dependant_elements_decision(
409 # bind)
410 # for inst in _decision_inst:
411 # if inst not in _decision:
412 # _decision[inst] = _decision_inst[inst]
413 # else:
414 # _decision[inst].update(_decision_inst[inst])
415 # for dec_inst, dec in _decision.items():
416 # self._inner_set(
417 # dec_inst, dec, status, self.data_source)
418 # else:
419 # _decision = external_decision or self.create_decision(
420 # bind)
421 # else:
422 # # actual request
423 # _decision = external_decision or self.create_decision(bind)
424 _decision = external_decision or self.create_decision(bind)
425 self._inner_set(bind, _decision, status, self.data_source)
427 return _decision
429 # def get_attribute_dependency(self, instance):
430 # """Get attribute dependency.
431 #
432 # When an attribute depends on other attributes in the same instance or
433 # the same attribute in other elements, this function gets the
434 # dependencies when they are not stored on the respective dictionaries.
435 # """
436 # if not self.dependant_attributes and not self.ConsoleDecisionHandler:
437 # dependant = []
438 # for func in self.functions:
439 # for attr in func.__code__.co_names:
440 # if hasattr(instance, attr):
441 # dependant.append(attr)
442 #
443 # for dependant_item in dependant:
444 # # case for attributes that depend on the same attribute in
445 # # other elements -> dependant_elements
446 # logger.warning("Attribute \"%s\" from class \"%s\" has no: "
447 # % (self.name, type(instance).__name__))
448 # if 'elements' in dependant_item:
449 # self.dependant_elements = dependant_item
450 # logger.warning("- dependant elements: \"%s\"" %
451 # dependant_item)
452 # # case for attributes that depend on the other attributes in
453 # # the same instance -> dependant_attributes
454 # else:
455 # if self.dependant_attributes is None:
456 # self.dependant_attributes = []
457 # self.dependant_attributes.append(dependant_item)
458 # logger.warning("- dependant attributes: \"%s\"" %
459 # dependant_item)
461 def dependant_elements_decision(self, bind) -> dict:
462 """Function to request attributes in other elements different to bind,
463 that are later on necessary to calculate an attribute in bind (case of
464 aggregation)
466 Returns:
467 _decision: key: is the instance, value: is another dict composed of the
468 attr name and the corresponding decision or function to
469 calculate said attribute
470 """
471 _decision = {}
472 for inst in getattr(bind, self.dependant_elements):
473 # request instance attribute
474 pre_decisions = inst.attributes.get_decisions()
475 inst.request(self.name)
476 additional_decisions = inst.attributes.get_decisions()
477 inst_decisions = [dec for dec in additional_decisions
478 if dec not in pre_decisions]
479 for decision in inst_decisions:
480 if decision is not None:
481 if inst not in _decision:
482 _decision[inst] = {}
483 if isinstance(decision, dict):
484 _decision[inst].update(decision)
485 else:
486 _decision[inst][decision.key] = decision
487 # if self.dependant_attributes:
488 # for d_attr in self.dependant_attributes:
489 # requested_decisions = bind.request(d_attr)
490 # if requested_decisions is not None:
491 # for inst, attr in requested_decisions.items():
492 # if not isinstance(inst, str):
493 # if inst not in _decision:
494 # _decision[inst] = {}
495 # _decision[inst].update(attr)
496 return _decision
498 def initialize(self, manager):
499 if not self.name:
500 print(self)
501 raise AttributeError("Attribute.name not set!")
503 manager[self.name] = (None, self.STATUS_UNKNOWN, None)
505 def _inner_get(self, bind):
506 return bind.attributes[self.name]
508 def _inner_set(self, bind, value, status, data_source):
509 # TODO: validate
510 bind.attributes[self.name] = value, status, data_source
512 def __get__(self, bind, owner):
513 """This gets called if attribute is accessed via element.attribute.
515 The descriptors get function handles the different underlying ways to
516 get an attributes value"""
517 if bind is None:
518 return self
520 # read current value and status
521 value_or_decision, status, data_source = self._inner_get(bind)
522 changed = False
523 value = None
525 if isinstance(value_or_decision, Decision):
526 # decision
527 if status != self.STATUS_REQUESTED:
528 raise AssertionError("Inconsistent status")
529 if value_or_decision.valid():
530 value = value_or_decision.value
531 status = self.STATUS_AVAILABLE
532 data_source = AttributeDataSource.decision
533 changed = True
534 else:
535 value = value_or_decision
537 if (value is None and status
538 in [self.STATUS_UNKNOWN, self.STATUS_RESET]):
539 value, data_source = self._get_value(bind)
540 status = self.STATUS_AVAILABLE if value is not None \
541 else self.STATUS_NOT_AVAILABLE # change for temperature
542 changed = True
544 if changed:
545 # write back new value and status
546 self._inner_set(bind, value, status, data_source)
548 return value
550 def __set__(self, bind, value):
551 if isinstance(value, tuple) and len(value) == 2:
552 data_source = value[1]
553 value = value[0]
554 else:
555 # if not data_source is provided, 'manual_overwrite' will be set
556 data_source = AttributeDataSource.manual_overwrite
557 if self.unit:
558 if isinstance(value, ureg.Quantity):
559 # case for quantity
560 _value = value.to(self.unit)
561 elif isinstance(value, list):
562 # case for list of quantities
563 _value = []
564 for item in value:
565 if isinstance(item, ureg.Quantity):
566 _value.append(item.to(self.unit))
567 else:
568 _value.append(item * self.unit)
569 else:
570 _value = value * self.unit
571 else:
572 _value = value
573 self._inner_set(bind, _value, self.STATUS_AVAILABLE, data_source)
575 def __str__(self):
576 return "Attribute %s" % self.name
579class AttributeManager(dict):
580 """Manages the attributes.
582 Every bim2sim element owns an instance of the AttributeManager class which
583 manages the corresponding attributes of this element. It as an dict with
584 key: name of attribute as string
585 value: tuple with (value of attribute, Status of attribute).
586 """
588 def __init__(self, bind):
589 super().__init__()
590 self.bind = bind
592 for name in self.names:
593 attr = self.get_attribute(name)
594 attr.initialize(self)
596 def __setitem__(self, name, value):
597 if name not in self.names:
598 raise AttributeError("Invalid Attribute '%s'. Choices are %s" % (
599 name, list(self.names)))
600 if isinstance(value, tuple) and len(value) == 3:
601 if not (isinstance(value[-1], AttributeDataSource)
602 or value[-1] is None):
603 try:
604 getattr(AttributeDataSource, value[-1])
605 except AttributeError:
606 raise ValueError(
607 f"Non valid DataSource provided for attribute {name} "
608 f"of element {self.bind}")
609 super().__setitem__(name, value)
610 else:
611 if not isinstance(value, tuple):
612 super().__setitem__(name, (
613 value,
614 Attribute.STATUS_AVAILABLE,
615 AttributeDataSource.manual_overwrite))
616 elif isinstance(value[-1], AttributeDataSource) or value[-1] is None:
617 super().__setitem__(name, (value, Attribute.STATUS_AVAILABLE))
618 else:
619 raise ValueError("datasource")
621 def update(self, other):
622 # dict.update does not invoke __setitem__
623 for k, v in other.items():
624 self.__setitem__(k, v)
626 def reset(self, name, data_source=AttributeDataSource.manual_overwrite):
627 """Reset attribute, set to None and STATUS_NOT_AVAILABLE."""
628 # TODO this has limitations when the corresponding attribute uses
629 # functions to calculate the value, see #760 for more information
630 try:
631 attr = self.get_attribute(name)
632 except KeyError:
633 raise KeyError("%s has no Attribute '%s'" % (self.bind, name))
634 attr.reset(self.bind, data_source)
636 def request(self, name: str, external_decision: Decision = None) \
637 -> Union[None, Decision]:
638 """Request attribute by name.
640 Checks the status of the requested attribute and returns
642 Args:
643 name: name of requested attribute
644 external_decision: custom decision to get attribute from
646 Returns:
647 A Decision to get the requested attributes value.
648 """
649 try:
650 attr = self.get_attribute(name)
651 except KeyError:
652 raise KeyError("%s has no Attribute '%s'" % (self.bind, name))
653 value, status, data_source = self[name]
654 if status in [Attribute.STATUS_UNKNOWN, Attribute.STATUS_RESET]:
655 # make sure default methods are tried
656 getattr(self.bind, name)
657 value, status, data_source = self[name]
658 if value is None:
659 if status == Attribute.STATUS_NOT_AVAILABLE:
660 decision = attr.request(self.bind, external_decision)
661 return decision
662 if isinstance(value, list):
663 # case for list of quantities
664 if not all(v is not None for v in value):
665 decision = attr.request(self.bind, external_decision)
666 return decision
667 elif isinstance(value, Decision):
668 if external_decision and value is not external_decision:
669 raise AttributeError("Can't set external decision for an "
670 "already requested attribute.")
671 return value
672 else:
673 # already requested or available
674 return
676 def get_attribute(self, name):
677 return getattr(type(self.bind), name)
679 def get_unit(self, name):
680 attr = self.get_attribute(name)
681 return attr.unit
683 @property
684 def names(self):
685 """Returns a generator object with all attributes that the corresponding
686 bind owns."""
687 return (name for name in dir(type(self.bind))
688 if isinstance(getattr(type(self.bind), name), Attribute))
690 def get_decisions(self) -> DecisionBunch:
691 """Return all decision of attributes with status REQUESTED."""
692 decisions = DecisionBunch()
693 for dec, status, data_source in self.values():
694 if status == Attribute.STATUS_REQUESTED:
695 decisions.append(dec)
696 return decisions
699def multi_calc(func):
700 """Decorator for calculation of multiple Attribute values.
702 Decorator functools.wraps is needed to return the real function name
703 for get_from_functions method.
704 """
706 @functools.wraps(func)
707 def wrapper(bind, name):
708 # inner function call
709 result = func(bind)
710 value = result.pop(name)
711 # send all other result values to AttributeManager instance
712 bind.attributes.update(result)
713 return value
715 return wrapper