Coverage for src / ezcompiler / utils / zip_utils.py: 95.56%

134 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-27 06:49 +0000

1# /////////////////////////////////////////////////////////////// 

2# ZIP_UTILS - ZIP archive operation utilities 

3# Project: ezcompiler 

4# /////////////////////////////////////////////////////////////// 

5 

6""" 

7ZIP utilities - ZIP archive operation utilities for EzCompiler. 

8 

9This module provides utility functions for creating, extracting, and managing 

10ZIP archives used in the EzCompiler project, with support for progress tracking 

11and logging. 

12""" 

13 

14from __future__ import annotations 

15 

16# /////////////////////////////////////////////////////////////// 

17# IMPORTS 

18# /////////////////////////////////////////////////////////////// 

19# Standard library imports 

20import zipfile 

21from collections.abc import Callable 

22from pathlib import Path 

23 

24# Local imports 

25from ..shared.exceptions.utils.zip_exceptions import ( 

26 ZipCreationError, 

27 ZipExtractionError, 

28 ZipFileCorruptedError, 

29 ZipFileNotFoundError, 

30 ZipPathError, 

31) 

32from .file_utils import FileUtils 

33 

34# /////////////////////////////////////////////////////////////// 

35# CLASSES 

36# /////////////////////////////////////////////////////////////// 

37 

38 

39class ZipUtils: 

40 """ 

41 ZIP archive operations utility class. 

42 

43 Provides static methods for creating, extracting, and managing 

44 ZIP archives used in the EzCompiler project. Supports compression, 

45 progress tracking, and detailed logging. 

46 

47 Class Variables: 

48 _ezpl: Cached Ezpl instance for logging 

49 _logger: Cached logger instance 

50 _printer: Cached printer instance 

51 

52 Example: 

53 >>> ZipUtils.create_zip_archive("./source", "./output.zip") 

54 >>> ZipUtils.extract_zip_archive("./output.zip", "./extracted") 

55 """ 

56 

57 # Utils layer should not initialize logging directly 

58 # Logging is handled by the service and interface layers 

59 

60 # //////////////////////////////////////////////// 

61 # ARCHIVE CREATION METHODS 

62 # //////////////////////////////////////////////// 

63 

64 @staticmethod 

65 def create_zip_archive( 

66 source_path: str | Path, 

67 output_path: str | Path, 

68 compression: int = zipfile.ZIP_DEFLATED, 

69 include_hidden: bool = False, 

70 progress_callback: Callable[[str, int], None] | None = None, 

71 ) -> None: 

72 """ 

73 Create a ZIP archive from a file or directory. 

74 

75 Args: 

76 source_path: Path to the source file or directory 

77 output_path: Path where the ZIP file will be created 

78 compression: Compression method (default: ZIP_DEFLATED) 

79 include_hidden: Whether to include hidden files (default: False) 

80 progress_callback: Optional callback for progress updates (file, progress) 

81 

82 Raises: 

83 FileOperationError: If ZIP creation fails 

84 

85 Note: 

86 Progress callback receives (filename: str, progress: int) where 

87 progress is a percentage from 0 to 100. 

88 """ 

89 try: 

90 source = Path(source_path) 

91 output = Path(output_path) 

92 

93 if not source.exists(): 

94 raise ZipPathError(f"Source path does not exist: {source_path}") 

95 

96 # Ensure output directory exists 

97 FileUtils.ensure_parent_directory_exists(output) 

98 

99 # Create the ZIP file 

100 with zipfile.ZipFile(output, "w", compression) as zipf: 

101 if source.is_file(): 

102 # Add single file 

103 arcname = source.name 

104 zipf.write(source, arcname) 

105 if progress_callback: 

106 progress_callback(str(source), 100) 

107 

108 elif source.is_dir(): 108 ↛ exitline 108 didn't jump to the function exit

109 # Add directory recursively with progress tracking 

110 total_files = sum(1 for _ in source.rglob("*") if _.is_file()) 

111 processed_files = 0 

112 

113 for file_path in source.rglob("*"): 

114 if file_path.is_file(): 114 ↛ 113line 114 didn't jump to line 113 because the condition on line 114 was always true

115 # Skip hidden files if not requested 

116 if not include_hidden and ZipUtils._is_hidden_file( 

117 file_path 

118 ): 

119 continue 

120 

121 # Calculate relative path for archive 

122 arcname = file_path.relative_to(source) 

123 zipf.write(file_path, arcname) 

124 

125 processed_files += 1 

126 if progress_callback: 

127 progress = int((processed_files / total_files) * 100) 

128 progress_callback(str(file_path), progress) 

129 

130 except Exception as e: 

131 raise ZipCreationError( 

132 f"Failed to create ZIP archive {output_path}: {e}" 

133 ) from e 

134 

135 # //////////////////////////////////////////////// 

136 # ARCHIVE EXTRACTION METHODS 

137 # //////////////////////////////////////////////// 

138 

139 @staticmethod 

140 def extract_zip_archive( 

141 zip_path: str | Path, 

142 extract_path: str | Path, 

143 password: str | None = None, 

144 progress_callback: Callable[[str, int], None] | None = None, 

145 ) -> None: 

146 """ 

147 Extract a ZIP archive to a directory. 

148 

149 Args: 

150 zip_path: Path to the ZIP file 

151 extract_path: Directory where files will be extracted 

152 password: Optional password for encrypted archives (default: None) 

153 progress_callback: Optional callback for progress updates (file, progress) 

154 

155 Raises: 

156 FileOperationError: If extraction fails 

157 

158 Note: 

159 Progress callback receives (filename: str, progress: int) where 

160 progress is a percentage from 0 to 100. 

161 """ 

162 try: 

163 zip_file = Path(zip_path) 

164 extract_dir = Path(extract_path) 

165 

166 if not zip_file.exists(): 

167 raise ZipFileNotFoundError(f"ZIP file does not exist: {zip_path}") 

168 

169 if not zip_file.is_file(): 

170 raise ZipPathError(f"Path is not a file: {zip_path}") 

171 

172 # Create extraction directory 

173 FileUtils.create_directory_if_not_exists(extract_dir) 

174 

175 # Extract the ZIP file 

176 with zipfile.ZipFile(zip_file, "r") as zipf: 

177 # Set password if provided 

178 if password: 178 ↛ 179line 178 didn't jump to line 179 because the condition on line 178 was never true

179 zipf.setpassword(password.encode("utf-8")) 

180 

181 # Get list of files for progress tracking 

182 file_list = zipf.namelist() 

183 total_files = len(file_list) 

184 

185 for index, file_info in enumerate(zipf.filelist): 

186 try: 

187 zipf.extract(file_info, extract_dir) 

188 

189 if progress_callback: 

190 progress = int(((index + 1) / total_files) * 100) 

191 progress_callback(file_info.filename, progress) 

192 

193 except Exception as e: 

194 raise ZipExtractionError( 

195 f"Failed to extract file '{file_info.filename}' from ZIP archive {zip_path}: {e}" 

196 ) from e 

197 

198 except Exception as e: 

199 raise ZipExtractionError( 

200 f"Failed to extract ZIP archive {zip_path}: {e}" 

201 ) from e 

202 

203 # //////////////////////////////////////////////// 

204 # ARCHIVE INFORMATION METHODS 

205 # //////////////////////////////////////////////// 

206 

207 @staticmethod 

208 def list_zip_contents(zip_path: str | Path) -> list[str]: 

209 """ 

210 List the contents of a ZIP archive. 

211 

212 Args: 

213 zip_path: Path to the ZIP file 

214 

215 Returns: 

216 list[str]: List of file names in the archive 

217 

218 Raises: 

219 ZipFileCorruptedError: If listing fails 

220 """ 

221 try: 

222 zip_file = Path(zip_path) 

223 

224 if not zip_file.exists(): 

225 raise ZipFileNotFoundError(f"ZIP file does not exist: {zip_path}") 

226 

227 with zipfile.ZipFile(zip_file, "r") as zipf: 

228 return zipf.namelist() 

229 

230 except Exception as e: 

231 raise ZipFileCorruptedError( 

232 f"Failed to list ZIP contents {zip_path}: {e}" 

233 ) from e 

234 

235 @staticmethod 

236 def get_zip_info(zip_path: str | Path) -> dict: 

237 """ 

238 Get information about a ZIP archive. 

239 

240 Args: 

241 zip_path: Path to the ZIP file 

242 

243 Returns: 

244 dict: Dictionary with ZIP information including: 

245 - file_count: Number of files in the archive 

246 - total_size: Total uncompressed size in bytes 

247 - compressed_size: Total compressed size in bytes 

248 - compression_ratio: Compression ratio as percentage 

249 - files: List of file names in the archive 

250 

251 Raises: 

252 ZipFileCorruptedError: If getting info fails 

253 """ 

254 try: 

255 zip_file = Path(zip_path) 

256 

257 if not zip_file.exists(): 

258 raise ZipFileNotFoundError(f"ZIP file does not exist: {zip_path}") 

259 

260 with zipfile.ZipFile(zip_file, "r") as zipf: 

261 info = zipf.infolist() 

262 

263 total_size = sum(file_info.file_size for file_info in info) 

264 compressed_size = sum(file_info.compress_size for file_info in info) 

265 

266 return { 

267 "file_count": len(info), 

268 "total_size": total_size, 

269 "compressed_size": compressed_size, 

270 "compression_ratio": ( 

271 (1 - compressed_size / total_size) * 100 

272 if total_size > 0 

273 else 0 

274 ), 

275 "files": [file_info.filename for file_info in info], 

276 } 

277 

278 except Exception as e: 

279 raise ZipFileCorruptedError( 

280 f"Failed to get ZIP info {zip_path}: {e}" 

281 ) from e 

282 

283 @staticmethod 

284 def is_valid_zip(zip_path: str | Path) -> bool: 

285 """ 

286 Check if a file is a valid ZIP archive. 

287 

288 Args: 

289 zip_path: Path to the file to check 

290 

291 Returns: 

292 bool: True if file is a valid ZIP archive, False otherwise 

293 """ 

294 try: 

295 zip_file = Path(zip_path) 

296 

297 if not zip_file.exists() or not zip_file.is_file(): 

298 return False 

299 

300 with zipfile.ZipFile(zip_file, "r") as zipf: 

301 # Try to read the ZIP structure to test integrity 

302 zipf.testzip() 

303 return True 

304 

305 except Exception: 

306 return False 

307 

308 # //////////////////////////////////////////////// 

309 # FILE MODIFICATION METHODS 

310 # //////////////////////////////////////////////// 

311 

312 @staticmethod 

313 def add_file_to_zip( 

314 zip_path: str | Path, 

315 file_path: str | Path, 

316 arcname: str | None = None, 

317 ) -> None: 

318 """ 

319 Add a single file to an existing ZIP archive. 

320 

321 Args: 

322 zip_path: Path to the ZIP file 

323 file_path: Path to the file to add 

324 arcname: Name of the file in the archive (default: file name) 

325 

326 Raises: 

327 ZipCreationError: If adding file fails 

328 """ 

329 try: 

330 zip_file = Path(zip_path) 

331 file_to_add = Path(file_path) 

332 

333 if not file_to_add.exists(): 

334 raise ZipPathError(f"File to add does not exist: {file_path}") 

335 

336 if not file_to_add.is_file(): 336 ↛ 337line 336 didn't jump to line 337 because the condition on line 336 was never true

337 raise ZipPathError(f"Path is not a file: {file_path}") 

338 

339 # Use file name if arcname not specified 

340 if arcname is None: 

341 arcname = file_to_add.name 

342 

343 with zipfile.ZipFile(zip_file, "a") as zipf: 

344 zipf.write(file_to_add, arcname) 

345 

346 except Exception as e: 

347 raise ZipCreationError(f"Failed to add file to ZIP {zip_path}: {e}") from e 

348 

349 @staticmethod 

350 def remove_file_from_zip(zip_path: str | Path, file_name: str) -> None: 

351 """ 

352 Remove a file from a ZIP archive. 

353 

354 Args: 

355 zip_path: Path to the ZIP file 

356 file_name: Name of the file to remove from the archive 

357 

358 Raises: 

359 ZipCreationError: If removal fails 

360 

361 Note: 

362 Creates a temporary file during removal process. 

363 """ 

364 try: 

365 zip_file = Path(zip_path) 

366 

367 if not zip_file.exists(): 

368 raise ZipFileNotFoundError(f"ZIP file does not exist: {zip_path}") 

369 

370 # Create a temporary ZIP without the file 

371 temp_zip = zip_file.with_suffix(".tmp.zip") 

372 

373 with ( 

374 zipfile.ZipFile(zip_file, "r") as zipf_read, 

375 zipfile.ZipFile(temp_zip, "w") as zipf_write, 

376 ): 

377 for item in zipf_read.infolist(): 

378 if item.filename != file_name: 

379 zipf_write.writestr(item, zipf_read.read(item.filename)) 

380 

381 # Replace original with temporary 

382 zip_file.unlink() 

383 temp_zip.rename(zip_file) 

384 

385 except Exception as e: 

386 raise ZipCreationError( 

387 f"Failed to remove file from ZIP {zip_path}: {e}" 

388 ) from e 

389 

390 # //////////////////////////////////////////////// 

391 # PRIVATE UTILITY METHODS 

392 # //////////////////////////////////////////////// 

393 

394 @staticmethod 

395 def _is_hidden_file(file_path: Path) -> bool: 

396 """ 

397 Check if a file is hidden (starts with dot or has hidden attribute). 

398 

399 Args: 

400 file_path: Path to the file 

401 

402 Returns: 

403 bool: True if file is hidden, False otherwise 

404 

405 Note: 

406 On Windows, checks both filename and FILE_ATTRIBUTE_HIDDEN. 

407 On Unix, checks if filename starts with dot. 

408 """ 

409 # Check if filename starts with dot 

410 if file_path.name.startswith("."): 

411 return True 

412 

413 # Check hidden attribute on Windows 

414 try: 

415 import stat 

416 

417 return bool( 

418 file_path.stat().st_file_attributes & stat.FILE_ATTRIBUTE_HIDDEN 

419 ) 

420 except (AttributeError, ImportError): 

421 # Not on Windows or attribute not available 

422 return False