Coverage for report_generator.py: 80%

164 statements  

« prev     ^ index     » next       coverage.py v7.10.3, created at 2025-08-12 13:45 +0300

1import argparse 

2import sys 

3import json 

4from abc import ABC, abstractmethod 

5from datetime import date 

6from tabulate import tabulate 

7 

8# pylint: disable=broad-exception-caught 

9 

10 

11class ReportGenerator(ABC): 

12 """Абстрактный базовый класс для генерации различных типов отчетов""" 

13 

14 @abstractmethod 

15 def generate(self, input_data: dict[dict:str]) -> dict[str]: 

16 """Генерирует отчет на основе входных данных""" 

17 

18 

19class ReportReader(ABC): 

20 """Абстрактный базовый класс для чтения данных из различных источников""" 

21 

22 @abstractmethod 

23 def read(self, source_path: str): 

24 """Читает данные из указанного источника""" 

25 

26 

27class ReportRender(ABC): 

28 """Абстрактный базовый класс для вывода отчетов в различных форматах""" 

29 

30 @abstractmethod 

31 def render(self, report_data: dict): 

32 """Отображает отчет в указанном формате""" 

33 

34 

35class ReportFilter(ABC): 

36 """Абстрактный базовый класс для фильтрации данных отчетов""" 

37 

38 @abstractmethod 

39 def filter(self, raw_data: list, filter_value: str): 

40 """Фильтрует данные по указанному критерию""" 

41 

42 

43class CustomArgumentParser(argparse.ArgumentParser): 

44 """Кастомный парсер аргументов командной строки с улучшенной обработкой ошибок""" 

45 

46 def __init__(self, *args, **kwargs): 

47 kwargs["allow_abbrev"] = False 

48 super().__init__(*args, **kwargs) 

49 

50 def parse_args(self, args=None, namespace=None): 

51 args, argv = self.parse_known_args(args, namespace) 

52 if argv: 

53 msg = f"Неизвестные аргументы: {' '.join(argv)}" 

54 self.error(msg) 

55 return args 

56 

57 def error(self, message: str): 

58 """Обрабатывает ошибки парсинга аргументов""" 

59 try: 

60 raise ValueError("Invalid parameters") 

61 except ValueError as error: 

62 print("-" * 105, "\n", "[EROR]", error, file=sys.stderr) 

63 print("-" * 105, "\n", "[MESSAGE]", message) 

64 print("-" * 105) 

65 print("[HELP]", self.print_help()) 

66 sys.exit(1) 

67 

68 

69class JsonReader(ReportReader): 

70 """Реализация чтения данных из JSON-файлов""" 

71 

72 def read(self, source_path: str) -> list: 

73 """Читает и парсит JSON данные из файла построчно""" 

74 try: 

75 with open(file=source_path, mode="r", encoding="utf-8") as json_file: 

76 parsed_data = [] 

77 for json_line in json_file: 

78 try: 

79 parsed_data.append(json.loads(json_line)) 

80 except json.JSONDecodeError as e: 

81 print(f"Invalid JSON in {source_path}: {e}", file=sys.stderr) 

82 return parsed_data 

83 except IOError as e: 

84 raise FileNotFoundError(f"Error reading file {source_path}: {e}") from e 

85 

86 

87class DateReportFilter(ReportFilter): 

88 """Фильтрация данных по дате""" 

89 

90 def filter(self, raw_data: list, filter_value: str) -> list: 

91 """Фильтрует данные, оставляя только записи с указанной датой""" 

92 year, month, day = map(int, filter_value.split("-")) 

93 target_date = date(year, month, day) 

94 

95 filtered_records = [] 

96 for record in raw_data: 

97 if str(target_date) in record.get("@timestamp", ""): 

98 filtered_records.append(record) 

99 return filtered_records 

100 

101 

102class AverageReportGenerator(ReportGenerator): 

103 """Генератор отчета со средней статистикой по endpoint'ам""" 

104 

105 def generate(self, input_data: list) -> dict: 

106 """Вычисляет среднее время ответа для каждого endpoint'а""" 

107 if not input_data: 

108 raise ValueError("Input data cannot be empty") 

109 

110 endpoint_stats = {} 

111 

112 for log_entry in input_data: 

113 endpoint_url = log_entry.get("url") 

114 response_time = log_entry.get("response_time") 

115 

116 if endpoint_url not in endpoint_stats: 

117 endpoint_stats[endpoint_url] = {"handler": endpoint_url, "total": 1, "avg_response_time": response_time} 

118 else: 

119 endpoint_stats[endpoint_url]["total"] += 1 

120 endpoint_stats[endpoint_url]["avg_response_time"] += response_time 

121 

122 for stats in endpoint_stats.values(): 

123 stats["avg_response_time"] = round(stats["avg_response_time"] / stats["total"], 3) 

124 

125 return endpoint_stats 

126 

127 

128class TableRender(ReportRender): 

129 """Рендеринг отчета в виде таблицы""" 

130 

131 def render(self, report_data: dict[dict]): 

132 """Выводит отчет в виде форматированной таблицы""" 

133 if not report_data: 

134 raise ValueError("Report data is empty") 

135 

136 table_rows = [endpoint_stats.values() for endpoint_stats in report_data.values()] 

137 column_headers = ["Handler", "Total", "Average Response Time"] 

138 print(tabulate(table_rows, column_headers, tablefmt="simple")) 

139 

140 

141class ReportEngine: 

142 """Ядро для запуска обработки отсчетов""" 

143 

144 def __init__( 

145 self, reader: ReportReader, generator: ReportGenerator, render: ReportRender, report_filter: ReportFilter = None 

146 ) -> None: 

147 self.reader = reader 

148 self.generator = generator 

149 self.render = render 

150 self.filter = report_filter 

151 

152 def _validate_date(self, date_str: str) -> bool: 

153 """Проверяет корректность формата даты""" 

154 if not date_str: 

155 return True 

156 try: 

157 year, month, day = map(int, date_str.split("-")) 

158 date(year, month, day) # Проверяем, что дата валидна 

159 return True 

160 except (ValueError, AttributeError): 

161 return False 

162 

163 def _merge_statistics(self, statistics_reports: list[dict]) -> dict: 

164 """Объединяет статистику из нескольких отчетов и сортирует по количеству запросов""" 

165 combined_stats = {} 

166 

167 for report in statistics_reports: 

168 for endpoint, endpoint_data in report.items(): 

169 if endpoint not in combined_stats: 

170 combined_stats[endpoint] = { 

171 "handler": endpoint, 

172 "total": endpoint_data["total"], 

173 "sum_time": endpoint_data["avg_response_time"] * endpoint_data["total"], 

174 } 

175 else: 

176 combined_stats[endpoint]["total"] += endpoint_data["total"] 

177 combined_stats[endpoint]["sum_time"] += endpoint_data["avg_response_time"] * endpoint_data["total"] 

178 

179 # Сортировка по убыванию количества запросов 

180 sorted_endpoints = sorted(combined_stats.items(), key=lambda item: item[1]["total"], reverse=True) 

181 

182 # Формирование итогового отчета с порядковыми номерами 

183 final_report = {} 

184 for index, (endpoint, stats) in enumerate(sorted_endpoints): 

185 final_report[endpoint] = { 

186 "idx": index, 

187 "handler": endpoint, 

188 "total": stats["total"], 

189 "avg_response_time": round(stats["sum_time"] / stats["total"], 3), 

190 } 

191 

192 return final_report 

193 

194 def run(self, files, date_value): 

195 """Основной метод обработки данных и генерации отчета""" 

196 try: 

197 if not files: 

198 raise ValueError("No input files provided") 

199 

200 if date_value and not self._validate_date(date_value): 

201 raise ValueError(f"Invalid date format: {date_value}. Expected YYYY-MM-DD") 

202 

203 if date_value and not self.filter: 

204 raise ValueError("Date filter provided but no filter implementation configured") 

205 

206 collected_reports = [] 

207 for log_file in files: 

208 raw_logs = self.reader.read(log_file) 

209 if date_value: 

210 raw_logs = self.filter.filter(raw_logs, date_value) 

211 report = self.generator.generate(raw_logs) 

212 if not report: 

213 raise ValueError(f"No valid data found in {log_file} after filtering") 

214 collected_reports.append(self.generator.generate(raw_logs)) 

215 

216 if not collected_reports: 

217 raise RuntimeError("No valid reports generated from any input file") 

218 

219 # Объединение и вывод результатов 

220 final_statistics = self._merge_statistics(collected_reports) 

221 if not final_statistics: 

222 raise RuntimeError("Empty statistics after merging reports") 

223 

224 self.render.render(final_statistics) 

225 

226 except Exception as e: 

227 print(f"Fatal error during report generation: {str(e)}", file=sys.stderr) 

228 raise 

229 

230 

231def main(): 

232 """Основная функция обработки и анализа логов""" 

233 try: 

234 argument_parser = CustomArgumentParser(description="Анализатор логов - генерация статистики по endpoint'ам") 

235 argument_parser.add_argument("--file", nargs="+", required=True, help="Путь к файлу(ам) с логами") 

236 argument_parser.add_argument("--report", help="Создать отчет в JSON формате") 

237 argument_parser.add_argument("--date", default=None, help="Фильтрация по дате (формат YYYY-MM-DD)") 

238 

239 parsed_args = argument_parser.parse_args() 

240 

241 # Доступные компоненты системы 

242 readers = {"json": JsonReader} 

243 report_types = {"average": AverageReportGenerator} 

244 renderers = {"table": TableRender} 

245 filters = {"date_filter": DateReportFilter} 

246 

247 if parsed_args.report not in report_types: 

248 print(f"Тип отчета {parsed_args.report} не поддерживается. " f"Доступные: {', '.join(report_types.keys())}") 

249 sys.exit(1) 

250 

251 # Инициализация компонентов 

252 log_reader = readers["json"]() 

253 report_generator = report_types["average"]() 

254 report_renderer = renderers["table"]() 

255 data_filter = filters["date_filter"]() 

256 

257 # Обработка файлов 

258 try: 

259 app = ReportEngine(log_reader, report_generator, report_renderer, data_filter) 

260 app.run(parsed_args.file, parsed_args.date) 

261 except Exception as e: 

262 print(f"Report generation failed: {str(e)}", file=sys.stderr) 

263 sys.exit(1) 

264 

265 except Exception as e: 

266 print(f"Application error: {str(e)}", file=sys.stderr) 

267 sys.exit(1) 

268 

269 

270if __name__ == "__main__": 

271 main()