Coverage for bim2sim/kernel/decision/console.py: 63%
218 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-12 17:09 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-12 17:09 +0000
1"""Decision handling via console."""
3import re
5from bim2sim.kernel.decision import Decision, BoolDecision, ListDecision, \
6 DecisionBunch
7from bim2sim.kernel.decision import DecisionCancel, DecisionSkip, DecisionSkipAll
8from bim2sim.kernel.decision.decisionhandler import DecisionHandler
11class ConsoleDecisionHandler(DecisionHandler):
12 """DecisionHandler to user with an interactive console."""
14 @staticmethod
15 def get_input_txt(decision):
16 txt = 'Enter value: '
17 if isinstance(decision, ListDecision):
18 if not decision.live_search:
19 txt = 'Enter key: '
20 else:
21 txt = 'Enter key or search words: '
23 return txt
25 @staticmethod
26 def get_default_txt(decision):
27 if decision.default is not None:
28 return f"default={decision.default} (leave blank to use default)"
29 else:
30 return ''
32 @staticmethod
33 def get_options_txt(options):
34 return "Additional commands: %s" % (", ".join(options))
36 @staticmethod
37 def get_body_txt(body):
38 len_labels = max(len(str(item[2])) for item in body)
39 header_str = " {key:3s} {label:%ds} {value:s}" % (len_labels)
40 format_str = "\n {key:3s} {label:%ds} {value:s}" % (len_labels)
41 body_txt = header_str.format(key="key", label="label", value="value")
43 for key, value, label in body:
44 body_txt += format_str.format(key=str(key), label=str(label),
45 value=str(value))
47 return body_txt
49 @staticmethod
50 def collection_progress(collection):
51 total = len(collection)
52 for i, decision in enumerate(collection):
53 yield decision, "[Decision {}/{}]".format(i + 1, total)
55 def get_answers_for_bunch(self, bunch: DecisionBunch) -> list:
56 answers = []
57 if not bunch:
58 return answers
60 skip_all = False
61 extra_options = []
62 if all([d.allow_skip for d in bunch]):
63 extra_options.append(Decision.SKIPALL)
65 for decision, progress in self.collection_progress(bunch):
66 answer = None
67 if skip_all and decision.allow_skip:
68 # decision.skip()
69 pass
70 else:
71 if skip_all:
72 self.logger.info("Decision can not be skipped")
73 try:
74 answer = self.user_input(decision,
75 extra_options=extra_options,
76 progress=progress)
77 except DecisionSkip:
78 # decision.skip()
79 pass
80 except DecisionSkipAll:
81 skip_all = True
82 self.logger.info("Skipping remaining decisions")
83 except DecisionCancel as ex:
84 self.logger.info("Canceling decisions")
85 raise
86 answers.append(answer)
87 return answers
89 # TODO: based on decision type
90 # TODO: merge from element_filter_by_text
91 def user_input(self, decision, extra_options=None, progress=''):
93 question = self.get_question(decision)
94 identifier = decision.console_identifier
95 options = self.get_options(decision)
96 if extra_options:
97 options = options + extra_options
98 options_txt = self.get_options_txt(options)
99 default = self.get_default_txt(decision)
100 body = self.get_body(decision) if isinstance(decision, ListDecision) \
101 else None
102 input_txt = self.get_input_txt(decision)
103 if progress:
104 progress += ' '
106 print(progress, end='')
107 print(question)
108 if identifier:
109 print(identifier)
110 if isinstance(decision, ListDecision) and decision.live_search:
111 print("enter 'reset' to start search again")
112 print("enter 'back' to return to last search")
113 print(options_txt + ' ' + default)
114 if body:
115 print(self.get_body_txt(body))
117 max_attempts = 10
118 attempt = 0
120 if isinstance(decision, ListDecision) and decision.live_search:
121 value = self.user_input_live(decision, input_txt, options)
122 else:
123 while True:
124 raw_value = input(input_txt)
125 if raw_value.lower() == Decision.SKIP.lower() and Decision.SKIP in options:
126 raise DecisionSkip
127 # decision.skip()
128 # return None
129 if raw_value.lower() == Decision.SKIPALL.lower() and Decision.SKIPALL in options:
130 # decision.skip()
131 raise DecisionSkipAll
132 if raw_value.lower() == Decision.CANCEL.lower() and Decision.CANCEL in options:
133 raise DecisionCancel
135 if not raw_value and decision.default is not None:
136 return decision.default
138 value = self.parse(decision, raw_value)
139 if self.validate(decision, value):
140 break
141 else:
142 if attempt <= max_attempts:
143 if attempt == max_attempts:
144 print("Last try before auto Cancel!")
145 print(
146 f"'{raw_value}' (interpreted as {value}) is no valid input! Try again.")
147 else:
148 raise DecisionCancel(
149 "Too many invalid attempts. Canceling input.")
150 attempt += 1
152 return value
154 def user_input_live(self, decision, input_txt, options):
155 last_searches = []
157 max_attempts = 10
158 attempt = 0
160 original_options = list(decision.items)
161 new_options = decision.items
162 while True:
163 searches = ' + '.join([i[0] for i in last_searches]) + ' +' \
164 if len(last_searches) > 0 else ''
165 raw_value = input(input_txt + searches)
167 if raw_value.lower() == Decision.SKIP.lower() and Decision.SKIP in options:
168 raise DecisionSkip
169 if raw_value.lower() == Decision.SKIPALL.lower() and Decision.SKIPALL in options:
170 raise DecisionSkipAll
171 if raw_value.lower() == Decision.CANCEL.lower() and Decision.CANCEL in options:
172 raise DecisionCancel
174 value = self.parse(decision, raw_value)
175 if value:
176 if self.validate(decision, value):
177 break
178 else:
179 if attempt <= max_attempts:
180 if attempt == max_attempts:
181 print("Last try before auto Cancel!")
182 print(
183 f"'{raw_value}' (interpreted as {value}) is no valid input! Try again.")
184 else:
185 raise DecisionCancel(
186 "Too many invalid attempts. Canceling input.")
187 attempt += 1
188 else:
189 if raw_value.lower() == 'none': # cancel search option
190 break
191 elif raw_value.lower() == 'back' and len(last_searches) > 1:
192 del last_searches[-1]
193 new_options = last_searches[-1][1]
194 elif raw_value.lower() == 'reset' or \
195 (raw_value.lower() == 'back' and len(
196 last_searches) <= 1):
197 last_searches = []
198 new_options = original_options
199 else:
200 new_options = self.get_matches_list(raw_value, new_options)
201 if len(new_options) == 1:
202 value = new_options[0]
203 break
204 elif len(new_options) == 0:
205 print('No options found for %s' % raw_value)
206 if len(last_searches) == 0:
207 new_options = original_options
208 else:
209 new_options = last_searches[-1][1]
210 else:
211 last_searches.append([raw_value, new_options])
212 decision.items = new_options
213 options_txt = self.get_options_txt(options)
214 default = self.get_default_txt(decision)
215 body = decision.get_body()
216 body_txt = self.get_body_txt(body)
217 print(decision.question)
218 print(options_txt + ' ' + default)
219 print(body_txt)
221 return value
223 @staticmethod
224 def get_matches_list(search_words: str, search_list: list) -> list:
225 """get patterns for a search name, and get afterwards the related
226 elements from list that matches the search"""
228 search_ref = []
229 if search_words in search_list:
230 return [search_words]
232 if type(search_words) is str:
233 pattern_search = search_words.split()
235 for i in pattern_search:
236 search_ref.append(re.compile('(.*?)%s' % i,
237 flags=re.IGNORECASE))
239 search_options = []
240 for ref in search_ref:
241 for mat in search_list:
242 if ref.match(mat):
243 if mat not in search_options:
244 search_options.append(mat)
246 return search_options
248 @staticmethod
249 def parse_real_input(raw_input, unit=None):
250 """Convert input to float"""
252 try:
253 if unit:
254 value = float(raw_input) * unit
255 else:
256 value = float(raw_input)
257 except:
258 value = None
259 return value
261 @staticmethod
262 def parse_bool_input(raw_input):
263 """Convert input to bool"""
265 inp = raw_input.lower()
266 if inp in BoolDecision.POSITIVES:
267 return True
268 if inp in BoolDecision.NEGATIVES:
269 return False
270 return None
272 @staticmethod
273 def parse_list_input(raw_input, items):
274 raw_value = None
275 try:
276 index = int(raw_input)
277 raw_value = items[index]
278 except Exception:
279 pass
281 return raw_value
283 @staticmethod
284 def parse_string_input(raw_input):
285 raw_value = None
286 try:
287 raw_value = str(raw_input)
288 except Exception:
289 pass
290 return raw_value
292 @staticmethod
293 def parse_guid_input(raw_input):
294 raw_value = None
295 try:
296 parts = str(raw_input).replace(',', ' ').split(' ')
297 raw_value = {guid for guid in parts if guid}
298 except Exception:
299 pass
300 return raw_value