Coverage for src / ezpl / cli / utils / log_parser.py: 72.83%

70 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-13 19:35 +0000

1# /////////////////////////////////////////////////////////////// 

2# EZPL - Log Parser Utility 

3# Project: ezpl 

4# /////////////////////////////////////////////////////////////// 

5 

6""" 

7Log parser utility for CLI operations. 

8 

9This module provides functionality to parse and analyze log files 

10generated by Ezpl's EzLogger. 

11""" 

12 

13from __future__ import annotations 

14 

15# /////////////////////////////////////////////////////////////// 

16# IMPORTS 

17# /////////////////////////////////////////////////////////////// 

18# Standard library imports 

19import re 

20from collections.abc import Iterator 

21from datetime import datetime 

22from pathlib import Path 

23from typing import Any 

24 

25# /////////////////////////////////////////////////////////////// 

26# CLASSES 

27# /////////////////////////////////////////////////////////////// 

28 

29 

class LogEntry:
    """
    One parsed record from an Ezpl log file.

    Holds the individual fields extracted from a log line together with
    the untouched source line and its position in the file.
    """

    # ///////////////////////////////////////////////////////////////
    # INIT
    # ///////////////////////////////////////////////////////////////

    def __init__(
        self,
        timestamp: datetime | None,
        level: str,
        module: str,
        function: str,
        line: str,
        message: str,
        raw_line: str,
        line_number: int,
    ) -> None:
        """
        Store the fields of a parsed log line.

        Args:
            timestamp: Parsed timestamp, or None when unavailable
            level: Log level name
            module: Name of the module that emitted the record
            function: Name of the function that emitted the record
            line: Source line reported in the record (kept as text)
            message: Message part of the record
            raw_line: The log line exactly as read (used by ``str()``)
            line_number: Position of the line inside the log file
        """
        self.timestamp = timestamp
        self.level = level
        self.module = module
        self.function = function
        self.line = line
        self.message = message
        self.raw_line = raw_line
        self.line_number = line_number

    # ///////////////////////////////////////////////////////////////
    # REPRESENTATION METHODS
    # ///////////////////////////////////////////////////////////////

    def __str__(self) -> str:
        """Return the original raw log line."""
        return self.raw_line

    def __repr__(self) -> str:
        """Return a debug representation (message and raw line omitted)."""
        shown = ("timestamp", "level", "module", "function", "line")
        rendered = ", ".join(f"{name}={getattr(self, name)}" for name in shown)
        return f"LogEntry({rendered})"

    def to_dict(self) -> dict[str, Any]:
        """
        Convert the entry to a plain dictionary.

        The timestamp is rendered with ``isoformat()`` (None when no
        timestamp is set); ``raw_line`` is not included.
        """
        if self.timestamp:
            iso_stamp = self.timestamp.isoformat()
        else:
            iso_stamp = None

        payload: dict[str, Any] = {"timestamp": iso_stamp}
        for key in ("level", "module", "function", "line", "message", "line_number"):
            payload[key] = getattr(self, key)
        return payload

98 

99 

class LogParser:
    """
    Parser for Ezpl log files.

    Parses log files with the format:
        YYYY-MM-DD HH:MM:SS | LEVEL | module:function:line - message

    Lines that are empty, start with ``#`` or do not match the format are
    skipped silently.
    """

    # Pattern for parsing log lines
    LOG_PATTERN = re.compile(
        r"^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\|\s+(\w+)\s+\|\s+"
        r"([^:]+):([^:]+):([^\s-]+)\s+-\s+(.+)$"
    )

    # Pattern for session separators
    SEPARATOR_PATTERN = re.compile(r"^##\s*==>\s*(.+)$")

    # ///////////////////////////////////////////////////////////////
    # INIT
    # ///////////////////////////////////////////////////////////////

    def __init__(self, log_file: Path) -> None:
        """
        Initialize the log parser.

        Args:
            log_file: Path to the log file to parse

        Raises:
            FileNotFoundError: If the log file doesn't exist
        """
        self.log_file = Path(log_file)
        if not self.log_file.exists():
            raise FileNotFoundError(f"Log file not found: {log_file}")

    # ///////////////////////////////////////////////////////////////
    # PARSING METHODS
    # ///////////////////////////////////////////////////////////////

    def parse_line(self, line: str, line_number: int) -> LogEntry | None:
        """
        Parse a single line from a log file.

        Args:
            line: Line to parse (trailing newline characters are stripped)
            line_number: Line number in file

        Returns:
            LogEntry if line is a valid log entry, None otherwise. Empty
            lines, lines starting with ``#``, lines that do not match
            ``LOG_PATTERN`` and lines whose timestamp is not a real
            date/time all yield None.
        """
        line = line.rstrip("\n\r")
        if not line or line.startswith("#"):
            return None

        match = self.LOG_PATTERN.match(line)
        if not match:
            return None

        try:
            timestamp_str, level, module, function, line_num, message = match.groups()
            # The regex only checks digit counts; strptime rejects values
            # such as month 13 with ValueError.
            timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")
            return LogEntry(
                timestamp=timestamp,
                level=level.strip(),
                module=module,
                function=function,
                line=line_num,
                message=message,
                raw_line=line,
                line_number=line_number,
            )
        except (ValueError, AttributeError):
            return None

    def parse(self) -> Iterator[LogEntry]:
        """
        Parse the entire log file.

        The file is read lazily as UTF-8, one line at a time. If the file
        cannot be opened, read or decoded, iteration simply stops; entries
        yielded before the error are unaffected.

        Yields:
            LogEntry objects for each valid log entry
        """
        try:
            with open(self.log_file, encoding="utf-8") as f:
                for line_num, line in enumerate(f, start=1):
                    entry = self.parse_line(line, line_num)
                    if entry:
                        yield entry
        except (OSError, UnicodeDecodeError):
            # Best effort: end the iteration instead of propagating
            return

    def parse_lines(self, max_lines: int | None = None) -> list[LogEntry]:
        """
        Parse log file and return entries as a list.

        Args:
            max_lines: Maximum number of entries to collect, counted in
                parsed entries rather than physical file lines. None (or
                0, kept for backward compatibility) means no limit.

        Returns:
            List of LogEntry objects
        """
        entries: list[LogEntry] = []
        for entry in self.parse():
            entries.append(entry)
            if max_lines and len(entries) >= max_lines:
                break
        return entries

    def get_last_lines(self, n: int) -> list[LogEntry]:
        """
        Get the last N log entries from the file.

        Args:
            n: Number of entries to retrieve

        Returns:
            List of the last N LogEntry objects in file order; all entries
            when the file holds fewer than N; an empty list when N is zero
            or negative.
        """
        # Local import keeps this method self-contained.
        from collections import deque

        # Guard explicitly: slicing with [-0:] would return every entry.
        if n <= 0:
            return []

        # A bounded deque keeps only the newest n entries while streaming,
        # so the whole file never has to be held in memory.
        return list(deque(self.parse(), maxlen=n))

    def filter_by_level(self, level: str) -> Iterator[LogEntry]:
        """
        Filter log entries by level.

        Args:
            level: Log level to filter (case-insensitive)

        Yields:
            LogEntry objects matching the level
        """
        level_upper = level.upper()
        for entry in self.parse():
            if entry.level.upper() == level_upper:
                yield entry

    def search(self, pattern: str, case_sensitive: bool = False) -> Iterator[LogEntry]:
        """
        Search for entries matching a pattern.

        The pattern is tried against the message first and then against
        the raw line, so anchors such as ``^`` can match at the start of
        either one.

        Args:
            pattern: Regex pattern to search for
            case_sensitive: Whether search is case-sensitive

        Yields:
            LogEntry objects matching the pattern

        Raises:
            re.error: If the pattern is not a valid regular expression
                (raised when iteration starts, since this is a generator)
        """
        flags = 0 if case_sensitive else re.IGNORECASE
        regex = re.compile(pattern, flags)

        for entry in self.parse():
            if regex.search(entry.message) or regex.search(entry.raw_line):
                yield entry