Coverage for report_generator.py: 80%
164 statements
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-12 13:45 +0300
« prev ^ index » next coverage.py v7.10.3, created at 2025-08-12 13:45 +0300
1import argparse
2import sys
3import json
4from abc import ABC, abstractmethod
5from datetime import date
6from tabulate import tabulate
8# pylint: disable=broad-exception-caught
11class ReportGenerator(ABC):
12 """Абстрактный базовый класс для генерации различных типов отчетов"""
14 @abstractmethod
15 def generate(self, input_data: dict[dict:str]) -> dict[str]:
16 """Генерирует отчет на основе входных данных"""
19class ReportReader(ABC):
20 """Абстрактный базовый класс для чтения данных из различных источников"""
22 @abstractmethod
23 def read(self, source_path: str):
24 """Читает данные из указанного источника"""
27class ReportRender(ABC):
28 """Абстрактный базовый класс для вывода отчетов в различных форматах"""
30 @abstractmethod
31 def render(self, report_data: dict):
32 """Отображает отчет в указанном формате"""
35class ReportFilter(ABC):
36 """Абстрактный базовый класс для фильтрации данных отчетов"""
38 @abstractmethod
39 def filter(self, raw_data: list, filter_value: str):
40 """Фильтрует данные по указанному критерию"""
43class CustomArgumentParser(argparse.ArgumentParser):
44 """Кастомный парсер аргументов командной строки с улучшенной обработкой ошибок"""
46 def __init__(self, *args, **kwargs):
47 kwargs["allow_abbrev"] = False
48 super().__init__(*args, **kwargs)
50 def parse_args(self, args=None, namespace=None):
51 args, argv = self.parse_known_args(args, namespace)
52 if argv:
53 msg = f"Неизвестные аргументы: {' '.join(argv)}"
54 self.error(msg)
55 return args
57 def error(self, message: str):
58 """Обрабатывает ошибки парсинга аргументов"""
59 try:
60 raise ValueError("Invalid parameters")
61 except ValueError as error:
62 print("-" * 105, "\n", "[EROR]", error, file=sys.stderr)
63 print("-" * 105, "\n", "[MESSAGE]", message)
64 print("-" * 105)
65 print("[HELP]", self.print_help())
66 sys.exit(1)
69class JsonReader(ReportReader):
70 """Реализация чтения данных из JSON-файлов"""
72 def read(self, source_path: str) -> list:
73 """Читает и парсит JSON данные из файла построчно"""
74 try:
75 with open(file=source_path, mode="r", encoding="utf-8") as json_file:
76 parsed_data = []
77 for json_line in json_file:
78 try:
79 parsed_data.append(json.loads(json_line))
80 except json.JSONDecodeError as e:
81 print(f"Invalid JSON in {source_path}: {e}", file=sys.stderr)
82 return parsed_data
83 except IOError as e:
84 raise FileNotFoundError(f"Error reading file {source_path}: {e}") from e
87class DateReportFilter(ReportFilter):
88 """Фильтрация данных по дате"""
90 def filter(self, raw_data: list, filter_value: str) -> list:
91 """Фильтрует данные, оставляя только записи с указанной датой"""
92 year, month, day = map(int, filter_value.split("-"))
93 target_date = date(year, month, day)
95 filtered_records = []
96 for record in raw_data:
97 if str(target_date) in record.get("@timestamp", ""):
98 filtered_records.append(record)
99 return filtered_records
102class AverageReportGenerator(ReportGenerator):
103 """Генератор отчета со средней статистикой по endpoint'ам"""
105 def generate(self, input_data: list) -> dict:
106 """Вычисляет среднее время ответа для каждого endpoint'а"""
107 if not input_data:
108 raise ValueError("Input data cannot be empty")
110 endpoint_stats = {}
112 for log_entry in input_data:
113 endpoint_url = log_entry.get("url")
114 response_time = log_entry.get("response_time")
116 if endpoint_url not in endpoint_stats:
117 endpoint_stats[endpoint_url] = {"handler": endpoint_url, "total": 1, "avg_response_time": response_time}
118 else:
119 endpoint_stats[endpoint_url]["total"] += 1
120 endpoint_stats[endpoint_url]["avg_response_time"] += response_time
122 for stats in endpoint_stats.values():
123 stats["avg_response_time"] = round(stats["avg_response_time"] / stats["total"], 3)
125 return endpoint_stats
128class TableRender(ReportRender):
129 """Рендеринг отчета в виде таблицы"""
131 def render(self, report_data: dict[dict]):
132 """Выводит отчет в виде форматированной таблицы"""
133 if not report_data:
134 raise ValueError("Report data is empty")
136 table_rows = [endpoint_stats.values() for endpoint_stats in report_data.values()]
137 column_headers = ["Handler", "Total", "Average Response Time"]
138 print(tabulate(table_rows, column_headers, tablefmt="simple"))
141class ReportEngine:
142 """Ядро для запуска обработки отсчетов"""
144 def __init__(
145 self, reader: ReportReader, generator: ReportGenerator, render: ReportRender, report_filter: ReportFilter = None
146 ) -> None:
147 self.reader = reader
148 self.generator = generator
149 self.render = render
150 self.filter = report_filter
152 def _validate_date(self, date_str: str) -> bool:
153 """Проверяет корректность формата даты"""
154 if not date_str:
155 return True
156 try:
157 year, month, day = map(int, date_str.split("-"))
158 date(year, month, day) # Проверяем, что дата валидна
159 return True
160 except (ValueError, AttributeError):
161 return False
163 def _merge_statistics(self, statistics_reports: list[dict]) -> dict:
164 """Объединяет статистику из нескольких отчетов и сортирует по количеству запросов"""
165 combined_stats = {}
167 for report in statistics_reports:
168 for endpoint, endpoint_data in report.items():
169 if endpoint not in combined_stats:
170 combined_stats[endpoint] = {
171 "handler": endpoint,
172 "total": endpoint_data["total"],
173 "sum_time": endpoint_data["avg_response_time"] * endpoint_data["total"],
174 }
175 else:
176 combined_stats[endpoint]["total"] += endpoint_data["total"]
177 combined_stats[endpoint]["sum_time"] += endpoint_data["avg_response_time"] * endpoint_data["total"]
179 # Сортировка по убыванию количества запросов
180 sorted_endpoints = sorted(combined_stats.items(), key=lambda item: item[1]["total"], reverse=True)
182 # Формирование итогового отчета с порядковыми номерами
183 final_report = {}
184 for index, (endpoint, stats) in enumerate(sorted_endpoints):
185 final_report[endpoint] = {
186 "idx": index,
187 "handler": endpoint,
188 "total": stats["total"],
189 "avg_response_time": round(stats["sum_time"] / stats["total"], 3),
190 }
192 return final_report
194 def run(self, files, date_value):
195 """Основной метод обработки данных и генерации отчета"""
196 try:
197 if not files:
198 raise ValueError("No input files provided")
200 if date_value and not self._validate_date(date_value):
201 raise ValueError(f"Invalid date format: {date_value}. Expected YYYY-MM-DD")
203 if date_value and not self.filter:
204 raise ValueError("Date filter provided but no filter implementation configured")
206 collected_reports = []
207 for log_file in files:
208 raw_logs = self.reader.read(log_file)
209 if date_value:
210 raw_logs = self.filter.filter(raw_logs, date_value)
211 report = self.generator.generate(raw_logs)
212 if not report:
213 raise ValueError(f"No valid data found in {log_file} after filtering")
214 collected_reports.append(self.generator.generate(raw_logs))
216 if not collected_reports:
217 raise RuntimeError("No valid reports generated from any input file")
219 # Объединение и вывод результатов
220 final_statistics = self._merge_statistics(collected_reports)
221 if not final_statistics:
222 raise RuntimeError("Empty statistics after merging reports")
224 self.render.render(final_statistics)
226 except Exception as e:
227 print(f"Fatal error during report generation: {str(e)}", file=sys.stderr)
228 raise
231def main():
232 """Основная функция обработки и анализа логов"""
233 try:
234 argument_parser = CustomArgumentParser(description="Анализатор логов - генерация статистики по endpoint'ам")
235 argument_parser.add_argument("--file", nargs="+", required=True, help="Путь к файлу(ам) с логами")
236 argument_parser.add_argument("--report", help="Создать отчет в JSON формате")
237 argument_parser.add_argument("--date", default=None, help="Фильтрация по дате (формат YYYY-MM-DD)")
239 parsed_args = argument_parser.parse_args()
241 # Доступные компоненты системы
242 readers = {"json": JsonReader}
243 report_types = {"average": AverageReportGenerator}
244 renderers = {"table": TableRender}
245 filters = {"date_filter": DateReportFilter}
247 if parsed_args.report not in report_types:
248 print(f"Тип отчета {parsed_args.report} не поддерживается. " f"Доступные: {', '.join(report_types.keys())}")
249 sys.exit(1)
251 # Инициализация компонентов
252 log_reader = readers["json"]()
253 report_generator = report_types["average"]()
254 report_renderer = renderers["table"]()
255 data_filter = filters["date_filter"]()
257 # Обработка файлов
258 try:
259 app = ReportEngine(log_reader, report_generator, report_renderer, data_filter)
260 app.run(parsed_args.file, parsed_args.date)
261 except Exception as e:
262 print(f"Report generation failed: {str(e)}", file=sys.stderr)
263 sys.exit(1)
265 except Exception as e:
266 print(f"Application error: {str(e)}", file=sys.stderr)
267 sys.exit(1)
270if __name__ == "__main__":
271 main()