Coverage for bim2sim/kernel/decision/console.py: 63%

218 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-12 17:09 +0000

1"""Decision handling via console.""" 

2 

3import re 

4 

5from bim2sim.kernel.decision import Decision, BoolDecision, ListDecision, \ 

6 DecisionBunch 

7from bim2sim.kernel.decision import DecisionCancel, DecisionSkip, DecisionSkipAll 

8from bim2sim.kernel.decision.decisionhandler import DecisionHandler 

9 

10 

11class ConsoleDecisionHandler(DecisionHandler): 

12 """DecisionHandler to user with an interactive console.""" 

13 

14 @staticmethod 

15 def get_input_txt(decision): 

16 txt = 'Enter value: ' 

17 if isinstance(decision, ListDecision): 

18 if not decision.live_search: 

19 txt = 'Enter key: ' 

20 else: 

21 txt = 'Enter key or search words: ' 

22 

23 return txt 

24 

25 @staticmethod 

26 def get_default_txt(decision): 

27 if decision.default is not None: 

28 return f"default={decision.default} (leave blank to use default)" 

29 else: 

30 return '' 

31 

32 @staticmethod 

33 def get_options_txt(options): 

34 return "Additional commands: %s" % (", ".join(options)) 

35 

36 @staticmethod 

37 def get_body_txt(body): 

38 len_labels = max(len(str(item[2])) for item in body) 

39 header_str = " {key:3s} {label:%ds} {value:s}" % (len_labels) 

40 format_str = "\n {key:3s} {label:%ds} {value:s}" % (len_labels) 

41 body_txt = header_str.format(key="key", label="label", value="value") 

42 

43 for key, value, label in body: 

44 body_txt += format_str.format(key=str(key), label=str(label), 

45 value=str(value)) 

46 

47 return body_txt 

48 

49 @staticmethod 

50 def collection_progress(collection): 

51 total = len(collection) 

52 for i, decision in enumerate(collection): 

53 yield decision, "[Decision {}/{}]".format(i + 1, total) 

54 

55 def get_answers_for_bunch(self, bunch: DecisionBunch) -> list: 

56 answers = [] 

57 if not bunch: 

58 return answers 

59 

60 skip_all = False 

61 extra_options = [] 

62 if all([d.allow_skip for d in bunch]): 

63 extra_options.append(Decision.SKIPALL) 

64 

65 for decision, progress in self.collection_progress(bunch): 

66 answer = None 

67 if skip_all and decision.allow_skip: 

68 # decision.skip() 

69 pass 

70 else: 

71 if skip_all: 

72 self.logger.info("Decision can not be skipped") 

73 try: 

74 answer = self.user_input(decision, 

75 extra_options=extra_options, 

76 progress=progress) 

77 except DecisionSkip: 

78 # decision.skip() 

79 pass 

80 except DecisionSkipAll: 

81 skip_all = True 

82 self.logger.info("Skipping remaining decisions") 

83 except DecisionCancel as ex: 

84 self.logger.info("Canceling decisions") 

85 raise 

86 answers.append(answer) 

87 return answers 

88 

89 # TODO: based on decision type 

90 # TODO: merge from element_filter_by_text 

91 def user_input(self, decision, extra_options=None, progress=''): 

92 

93 question = self.get_question(decision) 

94 identifier = decision.console_identifier 

95 options = self.get_options(decision) 

96 if extra_options: 

97 options = options + extra_options 

98 options_txt = self.get_options_txt(options) 

99 default = self.get_default_txt(decision) 

100 body = self.get_body(decision) if isinstance(decision, ListDecision) \ 

101 else None 

102 input_txt = self.get_input_txt(decision) 

103 if progress: 

104 progress += ' ' 

105 

106 print(progress, end='') 

107 print(question) 

108 if identifier: 

109 print(identifier) 

110 if isinstance(decision, ListDecision) and decision.live_search: 

111 print("enter 'reset' to start search again") 

112 print("enter 'back' to return to last search") 

113 print(options_txt + ' ' + default) 

114 if body: 

115 print(self.get_body_txt(body)) 

116 

117 max_attempts = 10 

118 attempt = 0 

119 

120 if isinstance(decision, ListDecision) and decision.live_search: 

121 value = self.user_input_live(decision, input_txt, options) 

122 else: 

123 while True: 

124 raw_value = input(input_txt) 

125 if raw_value.lower() == Decision.SKIP.lower() and Decision.SKIP in options: 

126 raise DecisionSkip 

127 # decision.skip() 

128 # return None 

129 if raw_value.lower() == Decision.SKIPALL.lower() and Decision.SKIPALL in options: 

130 # decision.skip() 

131 raise DecisionSkipAll 

132 if raw_value.lower() == Decision.CANCEL.lower() and Decision.CANCEL in options: 

133 raise DecisionCancel 

134 

135 if not raw_value and decision.default is not None: 

136 return decision.default 

137 

138 value = self.parse(decision, raw_value) 

139 if self.validate(decision, value): 

140 break 

141 else: 

142 if attempt <= max_attempts: 

143 if attempt == max_attempts: 

144 print("Last try before auto Cancel!") 

145 print( 

146 f"'{raw_value}' (interpreted as {value}) is no valid input! Try again.") 

147 else: 

148 raise DecisionCancel( 

149 "Too many invalid attempts. Canceling input.") 

150 attempt += 1 

151 

152 return value 

153 

154 def user_input_live(self, decision, input_txt, options): 

155 last_searches = [] 

156 

157 max_attempts = 10 

158 attempt = 0 

159 

160 original_options = list(decision.items) 

161 new_options = decision.items 

162 while True: 

163 searches = ' + '.join([i[0] for i in last_searches]) + ' +' \ 

164 if len(last_searches) > 0 else '' 

165 raw_value = input(input_txt + searches) 

166 

167 if raw_value.lower() == Decision.SKIP.lower() and Decision.SKIP in options: 

168 raise DecisionSkip 

169 if raw_value.lower() == Decision.SKIPALL.lower() and Decision.SKIPALL in options: 

170 raise DecisionSkipAll 

171 if raw_value.lower() == Decision.CANCEL.lower() and Decision.CANCEL in options: 

172 raise DecisionCancel 

173 

174 value = self.parse(decision, raw_value) 

175 if value: 

176 if self.validate(decision, value): 

177 break 

178 else: 

179 if attempt <= max_attempts: 

180 if attempt == max_attempts: 

181 print("Last try before auto Cancel!") 

182 print( 

183 f"'{raw_value}' (interpreted as {value}) is no valid input! Try again.") 

184 else: 

185 raise DecisionCancel( 

186 "Too many invalid attempts. Canceling input.") 

187 attempt += 1 

188 else: 

189 if raw_value.lower() == 'none': # cancel search option 

190 break 

191 elif raw_value.lower() == 'back' and len(last_searches) > 1: 

192 del last_searches[-1] 

193 new_options = last_searches[-1][1] 

194 elif raw_value.lower() == 'reset' or \ 

195 (raw_value.lower() == 'back' and len( 

196 last_searches) <= 1): 

197 last_searches = [] 

198 new_options = original_options 

199 else: 

200 new_options = self.get_matches_list(raw_value, new_options) 

201 if len(new_options) == 1: 

202 value = new_options[0] 

203 break 

204 elif len(new_options) == 0: 

205 print('No options found for %s' % raw_value) 

206 if len(last_searches) == 0: 

207 new_options = original_options 

208 else: 

209 new_options = last_searches[-1][1] 

210 else: 

211 last_searches.append([raw_value, new_options]) 

212 decision.items = new_options 

213 options_txt = self.get_options_txt(options) 

214 default = self.get_default_txt(decision) 

215 body = decision.get_body() 

216 body_txt = self.get_body_txt(body) 

217 print(decision.question) 

218 print(options_txt + ' ' + default) 

219 print(body_txt) 

220 

221 return value 

222 

223 @staticmethod 

224 def get_matches_list(search_words: str, search_list: list) -> list: 

225 """get patterns for a search name, and get afterwards the related 

226 elements from list that matches the search""" 

227 

228 search_ref = [] 

229 if search_words in search_list: 

230 return [search_words] 

231 

232 if type(search_words) is str: 

233 pattern_search = search_words.split() 

234 

235 for i in pattern_search: 

236 search_ref.append(re.compile('(.*?)%s' % i, 

237 flags=re.IGNORECASE)) 

238 

239 search_options = [] 

240 for ref in search_ref: 

241 for mat in search_list: 

242 if ref.match(mat): 

243 if mat not in search_options: 

244 search_options.append(mat) 

245 

246 return search_options 

247 

248 @staticmethod 

249 def parse_real_input(raw_input, unit=None): 

250 """Convert input to float""" 

251 

252 try: 

253 if unit: 

254 value = float(raw_input) * unit 

255 else: 

256 value = float(raw_input) 

257 except: 

258 value = None 

259 return value 

260 

261 @staticmethod 

262 def parse_bool_input(raw_input): 

263 """Convert input to bool""" 

264 

265 inp = raw_input.lower() 

266 if inp in BoolDecision.POSITIVES: 

267 return True 

268 if inp in BoolDecision.NEGATIVES: 

269 return False 

270 return None 

271 

272 @staticmethod 

273 def parse_list_input(raw_input, items): 

274 raw_value = None 

275 try: 

276 index = int(raw_input) 

277 raw_value = items[index] 

278 except Exception: 

279 pass 

280 

281 return raw_value 

282 

283 @staticmethod 

284 def parse_string_input(raw_input): 

285 raw_value = None 

286 try: 

287 raw_value = str(raw_input) 

288 except Exception: 

289 pass 

290 return raw_value 

291 

292 @staticmethod 

293 def parse_guid_input(raw_input): 

294 raw_value = None 

295 try: 

296 parts = str(raw_input).replace(',', ' ').split(' ') 

297 raw_value = {guid for guid in parts if guid} 

298 except Exception: 

299 pass 

300 return raw_value