Coverage for src / ezpl / cli / utils / log_parser.py: 72.83%
70 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 19:35 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-13 19:35 +0000
1# ///////////////////////////////////////////////////////////////
2# EZPL - Log Parser Utility
3# Project: ezpl
4# ///////////////////////////////////////////////////////////////
6"""
7Log parser utility for CLI operations.
9This module provides functionality to parse and analyze log files
10generated by Ezpl's EzLogger.
11"""
13from __future__ import annotations
15# ///////////////////////////////////////////////////////////////
16# IMPORTS
17# ///////////////////////////////////////////////////////////////
18# Standard library imports
19import re
20from collections.abc import Iterator
21from datetime import datetime
22from pathlib import Path
23from typing import Any
25# ///////////////////////////////////////////////////////////////
26# CLASSES
27# ///////////////////////////////////////////////////////////////
class LogEntry:
    """
    One parsed record from a log file.

    Holds the individual fields extracted from a log line together with
    the untouched source line and its position in the file.
    """

    # ///////////////////////////////////////////////////////////////
    # INIT
    # ///////////////////////////////////////////////////////////////

    def __init__(
        self,
        timestamp: datetime | None,
        level: str,
        module: str,
        function: str,
        line: str,
        message: str,
        raw_line: str,
        line_number: int,
    ) -> None:
        """
        Store the fields of a log record.

        Args:
            timestamp: Parsed timestamp, or None when unavailable
            level: Log level name
            module: Name of the module that emitted the record
            function: Name of the function that emitted the record
            line: Source line of the emitting call, kept as text
            message: Message part of the record
            raw_line: The log line exactly as it was handed to the parser
            line_number: Position of the record inside the log file
        """
        # Location of the record in time and in the log file.
        self.timestamp = timestamp
        self.level = level
        # Origin of the record in the emitting program.
        self.module = module
        self.function = function
        self.line = line
        # Payload and the unmodified source text.
        self.message = message
        self.raw_line = raw_line
        self.line_number = line_number

    # ///////////////////////////////////////////////////////////////
    # REPRESENTATION METHODS
    # ///////////////////////////////////////////////////////////////

    def __str__(self) -> str:
        """Return the original log line unchanged."""
        return self.raw_line

    def __repr__(self) -> str:
        """Return a debugging summary (message and raw line are omitted)."""
        head = f"LogEntry(timestamp={self.timestamp}, level={self.level}, "
        tail = f"module={self.module}, function={self.function}, line={self.line})"
        return head + tail

    def to_dict(self) -> dict[str, Any]:
        """
        Convert the entry to a plain dictionary.

        The timestamp is rendered with ``isoformat()`` (None when absent);
        ``raw_line`` is not part of the result.
        """
        iso_timestamp = None
        if self.timestamp:
            iso_timestamp = self.timestamp.isoformat()

        payload: dict[str, Any] = {"timestamp": iso_timestamp}
        payload["level"] = self.level
        payload["module"] = self.module
        payload["function"] = self.function
        payload["line"] = self.line
        payload["message"] = self.message
        payload["line_number"] = self.line_number
        return payload
class LogParser:
    """
    Parser for Ezpl log files.

    Parses log files with the format:
        YYYY-MM-DD HH:MM:SS | LEVEL | module:function:line - message

    Lines that are empty, start with ``#`` (which includes the
    ``## ==> ...`` session separators) or do not match the format are
    skipped.
    """

    # Pattern for parsing log lines
    LOG_PATTERN = re.compile(
        r"^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\|\s+(\w+)\s+\|\s+"
        r"([^:]+):([^:]+):([^\s-]+)\s+-\s+(.+)$"
    )

    # Pattern for session separators
    SEPARATOR_PATTERN = re.compile(r"^##\s*==>\s*(.+)$")

    # ///////////////////////////////////////////////////////////////
    # INIT
    # ///////////////////////////////////////////////////////////////

    def __init__(self, log_file: Path) -> None:
        """
        Initialize the log parser.

        Args:
            log_file: Path to the log file to parse

        Raises:
            FileNotFoundError: If the log file doesn't exist
        """
        self.log_file = Path(log_file)
        if not self.log_file.exists():
            raise FileNotFoundError(f"Log file not found: {log_file}")

    # ///////////////////////////////////////////////////////////////
    # PARSING METHODS
    # ///////////////////////////////////////////////////////////////

    def parse_line(self, line: str, line_number: int) -> LogEntry | None:
        """
        Parse a single line from a log file.

        Args:
            line: Line to parse (a trailing newline is stripped)
            line_number: Line number in file

        Returns:
            LogEntry if line is a valid log entry, None otherwise
            (empty line, ``#``-prefixed line, format mismatch, or a
            timestamp that is not a real date/time)
        """
        line = line.rstrip("\n\r")
        if not line or line.startswith("#"):
            return None

        match = self.LOG_PATTERN.match(line)
        if not match:
            return None

        timestamp_str, level, module, function, line_num, message = match.groups()
        try:
            # The regex only checks digit counts; strptime rejects values
            # such as month 13 or hour 25.
            timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            return None

        return LogEntry(
            timestamp=timestamp,
            level=level.strip(),
            module=module,
            function=function,
            line=line_num,
            message=message,
            raw_line=line,
            line_number=line_number,
        )

    def parse(self) -> Iterator[LogEntry]:
        """
        Parse the entire log file lazily.

        Read and decode errors are not raised: iteration simply stops,
        so entries yielded before the error are all the caller receives.

        Yields:
            LogEntry objects for each valid log entry
        """
        try:
            with open(self.log_file, encoding="utf-8") as f:
                for line_num, line in enumerate(f, start=1):
                    entry = self.parse_line(line, line_num)
                    if entry:
                        yield entry
        except (OSError, UnicodeDecodeError):
            # Best effort: end the iteration instead of propagating
            return

    def parse_lines(self, max_lines: int | None = None) -> list[LogEntry]:
        """
        Parse log file and return entries as a list.

        Args:
            max_lines: Maximum number of entries to return (None for all).
                The limit counts parsed entries, not physical file lines.
                Zero or a negative value yields an empty list.

        Returns:
            List of LogEntry objects
        """
        if max_lines is None:
            return list(self.parse())
        if max_lines <= 0:
            # An explicit limit of 0 means "nothing", not "no limit".
            return []

        entries: list[LogEntry] = []
        for entry in self.parse():
            entries.append(entry)
            if len(entries) >= max_lines:
                break
        return entries

    def get_last_lines(self, n: int) -> list[LogEntry]:
        """
        Get the last N log entries from the file.

        Args:
            n: Number of entries to retrieve; zero or a negative value
                yields an empty list

        Returns:
            List of the last N LogEntry objects (all of them when the
            file holds fewer than N)
        """
        if n <= 0:
            # Guard needed because seq[-0:] is the whole sequence.
            return []
        return list(self.parse())[-n:]

    def filter_by_level(self, level: str) -> Iterator[LogEntry]:
        """
        Filter log entries by level.

        Args:
            level: Log level to filter (case-insensitive)

        Yields:
            LogEntry objects matching the level
        """
        level_upper = level.upper()
        for entry in self.parse():
            if entry.level.upper() == level_upper:
                yield entry

    def search(self, pattern: str, case_sensitive: bool = False) -> Iterator[LogEntry]:
        """
        Search for entries matching a pattern.

        Args:
            pattern: Regex pattern to search for
            case_sensitive: Whether search is case-sensitive

        Yields:
            LogEntry objects whose message or raw line matches the pattern
        """
        flags = 0 if case_sensitive else re.IGNORECASE
        regex = re.compile(pattern, flags)

        for entry in self.parse():
            # Both are checked: an anchored pattern (e.g. "^text") can match
            # the message without matching the full raw line.
            if regex.search(entry.message) or regex.search(entry.raw_line):
                yield entry