Coverage for bim2sim/utilities/common_functions.py: 73%

270 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-12 17:09 +0000

1import collections 

2import json 

3import logging 

4import math 

5import re 

6import shutil 

7import zipfile 

8from urllib.request import urlopen 

9from pathlib import Path 

10from typing import Union 

11from time import sleep 

12import git 

13 

14import bim2sim 

15from bim2sim.utilities.types import IFCDomain 

16 

# Root directory of the asset files bundled with the bim2sim package
# (enrichment templates, usage JSON files, etc.).
assets = Path(bim2sim.__file__).parent / 'assets'
# Module-level logger, standard `logging.getLogger(__name__)` idiom.
logger = logging.getLogger(__name__)

19 

20 

def angle_equivalent(angle):
    """Normalize an angle in degrees to the interval [0, 360).

    Args:
        angle: angle in degrees (int or float), any sign or magnitude.

    Returns:
        The equivalent angle in [0, 360).
    """
    # Modulo replaces the original repeated +/-360 loop: O(1) instead of
    # O(|angle| / 360) iterations, with identical results for ints and
    # floats (Python's % always returns a value with the divisor's sign).
    return angle % 360

28 

29 

def vector_angle(vector):
    """Return the angle in degrees between the y-axis and *vector*.

    Args:
        vector: sequence with at least two components, (x, y, ...).

    Returns:
        Angle in degrees in [0, 360). The zero vector maps to 0.
    """
    x = vector[0]
    y = vector[1]
    # atan2 resolves the quadrant and the x == 0 / y == 0 cases that the
    # original handled with an explicit atan(x / y) + offset dissection
    # and a ZeroDivisionError branch. Arguments are swapped (x first)
    # because the angle is measured from the y-axis, not the x-axis.
    return math.degrees(math.atan2(x, y)) % 360

59 

60 

def validateJSON(json_data: Union[str, Path,]):
    """Return True if the file at *json_data* contains parseable JSON.

    Args:
        json_data: path (str or Path) of the file to check.

    Returns:
        bool: True when the file parses as JSON, False on a parse error.
    """
    path = json_data if isinstance(json_data, Path) else Path(str(json_data))
    try:
        with open(path, 'rb') as handle:
            json.load(handle)
    except ValueError:
        # json.JSONDecodeError subclasses ValueError.
        return False
    return True

70 

71 

def get_use_conditions_dict(custom_use_cond_path: Path) -> dict:
    """Load use conditions from a custom JSON file or the bundled default.

    Args:
        custom_use_cond_path: optional path to a custom UseConditions JSON
            file; if falsy or not an existing file, the bim2sim asset
            'enrichment/usage/UseConditions.json' is used instead.

    Returns:
        dict with the use conditions ('version' entry removed).

    Raises:
        ValueError: if the selected file is not valid JSON.
    """
    # Assign the default first: the original only set `use_cond_path`
    # inside the `if custom_use_cond_path:` branch, so passing None raised
    # UnboundLocalError instead of falling back to the bundled asset.
    use_cond_path = assets / 'enrichment/usage/UseConditions.json'
    if custom_use_cond_path and custom_use_cond_path.is_file():
        use_cond_path = custom_use_cond_path
    if not validateJSON(use_cond_path):
        raise ValueError(f"Invalid JSON file {use_cond_path}")
    # 'r' instead of 'r+': the file is only read, and 'r+' fails on
    # read-only installations of the assets directory.
    with open(use_cond_path, 'r', encoding='utf-8') as file:
        use_cond_dict = json.load(file)
    del use_cond_dict['version']
    return use_cond_dict

85 

86 

def get_common_pattern_usage() -> dict:
    """Load the common usage patterns shipped with the bim2sim assets.

    Returns:
        dict of common usage patterns.

    Raises:
        ValueError: if the asset file is not valid JSON.
    """
    common_pattern_path = assets / 'enrichment/usage/commonUsages.json'
    # Guard clause instead of if/else nesting.
    if not validateJSON(common_pattern_path):
        raise ValueError(f"Invalid JSON file {common_pattern_path}")
    with open(common_pattern_path, 'r+', encoding='utf-8') as handle:
        return json.load(handle)

95 

96 

def get_custom_pattern_usage(custom_usages_path: Path) -> dict:
    """gets custom usages based on given json file.

    Args:
        custom_usages_path: path to a custom usages JSON file; may be falsy.

    Returns:
        dict of usage definitions, or {} when no file is given or the
        file's settings disable its use.

    Raises:
        ValueError: if the given file exists but is not valid JSON.
    """
    if not (custom_usages_path and custom_usages_path.is_file()):
        return {}
    if not validateJSON(custom_usages_path):
        raise ValueError(f"Invalid JSON file {custom_usages_path}")
    with open(custom_usages_path, 'r+', encoding='utf-8') as handle:
        loaded = json.load(handle)
    # Definitions only apply when the file opts in via settings.use.
    if loaded["settings"]["use"]:
        return loaded["usage_definitions"]
    return {}

109 

110 

def get_pattern_usage(use_conditions: dict, custom_usages_path: Path):
    """get usage patterns to use it on the thermal zones get_usage

    Args:
        use_conditions: dict of use conditions (keys are usage names).
        custom_usages_path: optional path to a custom usages JSON file.

    Returns:
        defaultdict mapping each usage name to {"common": [compiled
        regexes], "custom": [literal strings]}.
    """
    common_usages = get_common_pattern_usage()

    custom_usages = get_custom_pattern_usage(custom_usages_path)
    usages = combine_usages(common_usages, custom_usages)

    pattern_usage_teaser = collections.defaultdict(dict)

    for i in use_conditions:
        pattern_usage_teaser[i]["common"] = []
        pattern_usage_teaser[i]["custom"] = []
        # Split the condition name into English keywords: strip any
        # parenthesised part, then break on common connector words.
        list_engl = re.sub(r'\((.*?)\)', '', i) \
            .replace(' - ', ', ') \
            .replace(' and ', ', ') \
            .replace(' in ', ', ') \
            .replace(' with ', ', ') \
            .replace(' or ', ', ') \
            .replace(' the ', ' ') \
            .split(', ')
        for i_eng in list_engl:
            # Inner spaces become non-greedy wildcards so extra tokens
            # between the words still match.
            new_i_eng = i_eng.replace(' ', '(.*?)')
            pattern_usage_teaser[i]["common"].append(re.compile(
                '(.*?)%s' % new_i_eng, flags=re.IGNORECASE))
        if i in usages:
            # Add the translations collected for this usage name.
            for c_trans in usages[i]["common"]:
                pattern_usage_teaser[i]["common"].append(re.compile(
                    '(.*?)%s' % c_trans, flags=re.IGNORECASE))
            if "custom" in usages[i]:
                # Custom entries stay literal strings (no regex compile).
                for clear_usage in usages[i]["custom"]:
                    pattern_usage_teaser[i]["custom"].append(clear_usage)

    # NOTE(review): assumes 'office_function' always exists in the common
    # usages asset; a KeyError is raised otherwise — confirm.
    pattern_usage_teaser['office_function']["common"] = [re.compile(
        '(.*?)%s' % c_trans, re.IGNORECASE)
        for c_trans in usages['office_function']["common"]]

    return pattern_usage_teaser

148 

149 

def combine_usages(common_usages, custom_usages) -> dict:
    """combines the custom and common usages to one dictionary

    Args:
        common_usages: dict mapping usage names to common pattern lists.
        custom_usages: dict mapping usage names to custom pattern lists,
            or a falsy value when no custom usages exist.

    Returns:
        defaultdict mapping usage names to {"common": [...], "custom": [...]}.

    Raises:
        TypeError: if a custom usage value cannot be turned into a list.
    """
    usages = collections.defaultdict(dict)
    for name, patterns in common_usages.items():
        usages[name]["common"] = patterns
    for name, patterns in (custom_usages or {}).items():
        if not isinstance(patterns, list):
            try:
                patterns = list(patterns)
            except TypeError:
                raise TypeError("custom usages must be a list")
        # Names unknown to the common usages still get an (empty)
        # "common" entry so every value has both keys.
        if name not in usages:
            usages[name]["common"] = []
        usages[name]["custom"] = patterns
    return usages

169 

170 

def wildcard_match(pattern, text):
    """Check if a text string matches a pattern containing '*' wildcards.

    Each '*' matches any (possibly empty) run of characters; every other
    character must match literally.

    Args:
        pattern (str): The pattern string that may contain '*' wildcards.
        text (str): The text string to be compared against the pattern.

    Returns:
        bool: True if the text matches the pattern, considering wildcards.
        False otherwise.
    """
    # Translate the wildcard pattern into an anchored regex: literal parts
    # are escaped, each '*' becomes '.*'. This fixes the original
    # piecewise logic, which ignored all but the first '*' in patterns
    # like '*a*b' (it only checked endswith(parts[1])) and could re-match
    # a middle part inside the already-consumed prefix (e.g. pattern
    # 'ab*b*c' wrongly matched 'abc').
    regex = '.*'.join(re.escape(part) for part in pattern.split('*'))
    return re.fullmatch(regex, text) is not None

217 

218 

def get_type_building_elements(data_file):
    """Load a type-building-elements template file from the assets.

    Args:
        data_file: file name of the JSON template inside
            assets/enrichment/material.

    Returns:
        Nested dict template_options[name][years][template] with the raw
        template data.

    Raises:
        ValueError: if the asset file is not valid JSON.
    """
    path = assets / 'enrichment/material' / data_file
    if not validateJSON(path):
        raise ValueError(f"Invalid JSON file {path}")
    with open(path, 'r+') as handle:
        raw = json.load(handle)
    del raw['version']
    template_options = {}
    for key in raw:
        # Keys look like '<name>_<years>_<template>'; the template part
        # may itself contain underscores, hence the second split.
        name, years = key.split('_')[0:2]
        template = key.split(f'{years}_')[1]
        template_options.setdefault(name, {}).setdefault(years, {})[
            template] = raw[key]
    return template_options

239 

240 

def get_material_templates():
    """Load the material templates JSON shipped with the bim2sim assets.

    Returns:
        dict of material templates ('version' entry removed).

    Raises:
        ValueError: if the asset file is not valid JSON.
    """
    material_templates_path = \
        assets / 'enrichment/material/MaterialTemplates.json'
    if not validateJSON(material_templates_path):
        raise ValueError(f"Invalid JSON file {material_templates_path}")
    with open(material_templates_path, 'r+') as handle:
        templates = json.load(handle)
    del templates['version']
    return templates

251 

252 

def filter_elements(
        elements: Union[dict, list], type_name, create_dict=False)\
        -> Union[list, dict]:
    """Filters the inspected elements by type name (e.g. Wall) and
    returns them as list or dict if wanted

    Args:
        elements: dict or list with all bim2sim elements
        type_name: str or element type to filter for
        create_dict (Boolean): True if a dict instead of a list should be
            created
    Returns:
        elements_filtered: list of all bim2sim elements of type type_name
    """
    from bim2sim.elements.base_elements import SerializedElement
    # Exact `type(...) is dict` check kept on purpose: dict subclasses are
    # iterated directly, matching the original semantics.
    candidates = elements.values() if type(elements) is dict else elements
    matches = []
    if isinstance(type_name, str):
        for candidate in candidates:
            if isinstance(candidate, SerializedElement):
                if candidate.element_type == type_name:
                    matches.append(candidate)
            elif type_name in type(candidate).__name__:
                # Substring match on the class name, as in the original.
                matches.append(candidate)
    else:
        for candidate in candidates:
            # Two independent checks (not elif), preserving the original
            # possibility of a double append for SerializedElement itself.
            if isinstance(candidate, SerializedElement) \
                    and candidate.element_type == type_name.__name__:
                matches.append(candidate)
            if type_name is type(candidate):
                matches.append(candidate)
    if create_dict:
        return {inst.guid: inst for inst in matches}
    return matches

290 

291 

def remove_umlaut(string):
    """
    Removes umlauts from strings and replaces them with the letter+e convention
    :param string: string to remove umlauts from
    :return: unumlauted string
    """
    # One C-level pass via str.translate instead of encoding to bytes and
    # chaining seven replace() calls.
    mapping = str.maketrans({
        'ü': 'ue', 'Ü': 'Ue',
        'ä': 'ae', 'Ä': 'Ae',
        'ö': 'oe', 'Ö': 'Oe',
        'ß': 'ss',
    })
    return string.translate(mapping)

317 

318 

def translate_deep(text, source='auto', target='en'):
    """Translate *text* using deep_translator's GoogleTranslator.

    Args:
        text: string to translate.
        source: source language code; 'auto' lets the service detect it.
        target: target language code.

    Returns:
        The translated string, or False when deep_translator is not
        installed or the translation fails (e.g. no internet connection).
    """
    try:
        from deep_translator import GoogleTranslator
        return GoogleTranslator(
            source=source, target=target).translate(text=text)
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are no longer swallowed; any translator/network error still
        # yields the best-effort False result callers expect.
        return False

334 

335 

def all_subclasses(cls, as_names: bool = False, include_self: bool = False):
    """Get all subclasses of the given class, even subsubclasses and so on.

    Args:
        cls: class for which to find subclasses
        as_names: boolean, if True the subclasses are returned as names
        include_self: boolean, if True include evaluated class to subclasses.

    Returns:
        set of classes, or a list of class names when as_names is True.
    """
    all_cls = set(cls.__subclasses__()).union(
        [s for c in cls.__subclasses__() for s in all_subclasses(c)])
    # Add the class itself BEFORE the optional name conversion: the
    # original converted to a list first and then called .add() on it,
    # which raised AttributeError whenever as_names and include_self were
    # both True.
    if include_self:
        all_cls.add(cls)
    if as_names:
        all_cls = [c.__name__ for c in all_cls]
    return all_cls

354 

355 

def get_spaces_with_bounds(elements: dict):
    """Get spaces (ThermalZone) that provide space boundaries.

    This function extracts spaces from an instance dictionary and returns
    those spaces that hold space boundaries.

    Args:
        elements: dict[guid: element]

    Returns:
        list of ThermalZone elements with a truthy space_boundaries
        attribute.
    """
    return [space for space in filter_elements(elements, 'ThermalZone')
            if space.space_boundaries]

370 

371 

def download_library(
        repo_url: str,
        branch_name: str,
        clone_dir: Path,
) -> None:
    """Clones a Git repository and checks out a specific branch, or updates
    the repository if it already exists.

    This function clones the specified Git repository into the given directory
    and checks out the specified branch. If the directory already exists and
    is a Git repository, it will perform a 'git pull' to update the repository
    instead of cloning.

    Args:
        repo_url (str): The URL of the Git repository to clone or update.
        branch_name (str): The name of the branch to check out.
        clone_dir (Path): The directory where the repository should be cloned
            or updated.

    Returns:
        None

    Raises:
        git.GitCommandError: If there is an error during the cloning, checkout,
            or pull process.
        Exception: If the directory exists but is not a Git repository.
    """
    if clone_dir.exists():
        # If the directory exists, check if it's a Git repository
        try:
            repo = git.Repo(clone_dir)
            if repo.bare:
                raise Exception(
                    f"Directory {clone_dir} is not a valid Git repository.")

            # If it's a valid Git repository, perform a pull to update it
            print(
                f"Directory {clone_dir} already exists. Pulling latest "
                f"changes...")
            repo.git.checkout(
                branch_name)  # Ensure we're on the correct branch
            repo.remotes.origin.pull()
            print(f"Repository in {clone_dir} updated successfully.")

        except git.exc.InvalidGitRepositoryError:
            raise Exception(
                f"Directory {clone_dir} exists but is not a Git repository.")

    else:
        # If the directory doesn't exist, clone the repository
        # (recursive=True also initialises submodules).
        print(f"Cloning repository {repo_url} into {clone_dir}...")
        repo = git.Repo.clone_from(
            repo_url, clone_dir, branch=branch_name, recursive=True)

        # Checkout the specified branch
        # NOTE(review): clone_from(..., branch=branch_name) already checks
        # this branch out, so the explicit checkout below looks redundant
        # but harmless — confirm before removing.
        print(f"Checking out branch {branch_name}...")
        repo.git.checkout(branch_name)
        print(f"Checked out branch {branch_name}.")

430 

431 

def rm_tree(pth):
    """Recursively delete the directory *pth* and everything inside it.

    Args:
        pth: directory path (str or Path) to remove.
    """
    pth = Path(pth)
    for child in pth.glob('*'):
        if not child.is_file():
            # Depth-first: empty the subdirectory before it is removed.
            rm_tree(child)
        else:
            child.unlink()
    # Directory is empty by now.
    pth.rmdir()

441 

442 

def create_plotly_graphs_from_df(self):
    """Placeholder: save plotly graphs to the export folder.

    Not implemented yet (see todo 497). NOTE(review): the `self`
    parameter suggests this is meant to be bound as a method — confirm
    the intended owner class.
    """
    # save plotly graphs to export folder
    # todo 497
    pass

447 

448 

def group_by_levenshtein(entities, similarity_score):
    """
    Groups similar entities based on the similarity of their 'Name' attribute.

    Args:
        entities (list): A list of objects with a 'Name' attribute.
        similarity_score (float): Similarity threshold between 0 and 1.
            0 means all objects will be grouped together, 1 means only
            identical strings are grouped.

    Returns:
        dict: A dictionary where keys are representative entities and values
            are lists of similar entities.
    """

    from collections import defaultdict

    def _edit_distance(left, right):
        # Two-row Wagner-Fischer variant: identical result to the full DP
        # matrix, but only O(len(right)) memory.
        previous = list(range(len(right) + 1))
        for row, ch_left in enumerate(left, start=1):
            current = [row]
            for col, ch_right in enumerate(right, start=1):
                substitution = previous[col - 1] + (ch_left != ch_right)
                current.append(min(previous[col] + 1,
                                   current[col - 1] + 1,
                                   substitution))
            previous = current
        return previous[-1]

    groups = defaultdict(list)

    for entity in entities:
        # Attach to the first representative within the allowed edit
        # distance; otherwise the entity becomes its own representative.
        for representative in groups:
            limit = int((1 - similarity_score)
                        * max(len(entity.Name), len(representative.Name)))
            if _edit_distance(entity.Name, representative.Name) <= limit:
                groups[representative].append(entity)
                break
        else:
            groups[entity].append(entity)

    return groups