Coverage for src / ezpl / cli / utils / log_stats.py: 91.23%

45 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-13 19:35 +0000

1# /////////////////////////////////////////////////////////////// 

2# EZPL - Log Statistics Utility 

3# Project: ezpl 

4# /////////////////////////////////////////////////////////////// 

5 

6""" 

7Log statistics utility for CLI operations. 

8 

9This module provides functionality to calculate statistics from log files. 

10""" 

11 

12from __future__ import annotations 

13 

14# /////////////////////////////////////////////////////////////// 

15# IMPORTS 

16# /////////////////////////////////////////////////////////////// 

17# Standard library imports 

18from collections import Counter, defaultdict 

19from pathlib import Path 

20from typing import Any 

21 

22# Local imports 

23from .log_parser import LogEntry, LogParser 

24 

25# /////////////////////////////////////////////////////////////// 

26# CLASSES 

27# /////////////////////////////////////////////////////////////// 

28 

29 

class LogStatistics:
    """
    Calculate and store statistics from log files.

    Entries are parsed lazily on first use and cached on the instance, so
    every statistic of one instance is computed from the same parsed list.
    """

    # ///////////////////////////////////////////////////////////////
    # INIT
    # ///////////////////////////////////////////////////////////////

    def __init__(self, log_file: Path) -> None:
        """
        Initialize log statistics calculator.

        Args:
            log_file: Path to the log file
        """
        self.log_file = Path(log_file)
        self.parser = LogParser(self.log_file)
        # Filled by _get_entries() on first access; None means "not parsed yet".
        self._entries: list[LogEntry] | None = None

    # ------------------------------------------------
    # PRIVATE HELPER METHODS
    # ------------------------------------------------

    def _get_entries(self) -> list[LogEntry]:
        """
        Get all log entries (cached).

        Returns:
            The list built from ``self.parser.parse()``; the same list object
            is returned on every later call.
        """
        if self._entries is None:
            self._entries = list(self.parser.parse())
        return self._entries

    # ///////////////////////////////////////////////////////////////
    # STATISTICS METHODS
    # ///////////////////////////////////////////////////////////////

    def get_level_counts(self) -> dict[str, int]:
        """
        Get count of messages by level.

        Returns:
            Dictionary mapping level names to counts
        """
        return dict(Counter(entry.level for entry in self._get_entries()))

    def get_file_info(self) -> dict[str, Any]:
        """
        Get basic file information.

        Returns:
            Dictionary with the keys ``file_path``, ``size_bytes``,
            ``size_mb`` (float, rounded to two decimals), ``line_count``
            (the number of parsed entries) and ``date_range`` (``None`` when
            no entry has a timestamp, otherwise a dict with ``first`` and
            ``last``).

            If an ``OSError``, ``ValueError`` or ``TypeError`` is raised while
            gathering the data, a zeroed placeholder with the same keys is
            returned instead of propagating the error.
        """
        try:
            size = self.log_file.stat().st_size if self.log_file.exists() else 0
            entries = self._get_entries()

            # Entries without a timestamp cannot contribute to the date range.
            timestamps = [
                entry.timestamp for entry in entries if entry.timestamp is not None
            ]
            date_range = None
            if timestamps:
                date_range = {
                    "first": min(timestamps),
                    "last": max(timestamps),
                }

            return {
                "file_path": str(self.log_file),
                "size_bytes": size,
                "size_mb": round(size / (1024 * 1024), 2),
                "line_count": len(entries),
                "date_range": date_range,
            }
        except (OSError, ValueError, TypeError):
            return {
                "file_path": str(self.log_file),
                "size_bytes": 0,
                # Float, so the value has the same type as on the success path.
                "size_mb": 0.0,
                "line_count": 0,
                "date_range": None,
            }

    def get_temporal_distribution(self, period: str = "hour") -> dict[str, int]:
        """
        Get distribution of logs over time.

        Args:
            period: Time period ('hour' or 'day'). Any other value buckets
                the entries by minute.

        Returns:
            Dictionary mapping time periods to log counts. Keys are formatted
            as ``YYYY-MM-DD HH:00`` (hour), ``YYYY-MM-DD`` (day) or
            ``YYYY-MM-DD HH:MM`` (fallback). Entries without a timestamp are
            skipped.
        """
        # The bucket format does not depend on the entry, so pick it once.
        if period == "hour":
            bucket_format = "%Y-%m-%d %H:00"
        elif period == "day":
            bucket_format = "%Y-%m-%d"
        else:
            bucket_format = "%Y-%m-%d %H:%M"

        distribution = Counter(
            entry.timestamp.strftime(bucket_format)
            for entry in self._get_entries()
            if entry.timestamp is not None
        )
        return dict(distribution)

    def get_all_stats(self) -> dict[str, Any]:
        """
        Get all statistics in a single dictionary.

        Returns:
            Dictionary containing all statistics
        """
        return {
            "file_info": self.get_file_info(),
            "level_counts": self.get_level_counts(),
            "temporal_distribution_hour": self.get_temporal_distribution("hour"),
            "temporal_distribution_day": self.get_temporal_distribution("day"),
        }