Введение
Python, благодаря своей простоте, богатой экосистеме библиотек и кроссплатформенности, стал одним из основных языков для автоматизации DevOps-процессов. Он идеально подходит для написания скриптов, которые связывают различные этапы жизненного цикла разработки: от сборки и тестирования до развертывания и мониторинга. В этом посте мы рассмотрим практические примеры скриптов на Python для построения эффективного CI/CD-конвейера и создания простых, но мощных инструментов мониторинга.
Python в CI/CD: Автоматизация пайплайнов
1.1. Скрипт для автоматической сборки и проверки качества кода
Такой скрипт можно запускать локально перед коммитом или использовать как этап в GitLab CI/GitHub Actions.
#!/usr/bin/env python3 """ Скрипт для автоматической проверки Python-проекта. Запускает: линтинг, форматирование, проверку типов, тесты. """ import subprocess import sys from pathlib import Path from typing import List, Tuple def run_command(cmd: List[str], cwd: Path = None) -> Tuple[bool, str]: """Запускает команду и возвращает (успех, вывод).""" try: result = subprocess.run( cmd, cwd=cwd, capture_output=True, text=True, check=True ) return True, result.stdout except subprocess.CalledProcessError as e: return False, f"{e.stderr}\nExit code: {e.returncode}" def main(): project_root = Path(__file__).parent all_ok = True print("🔍 Запуск проверок качества кода...\n") # 1. Проверка синтаксиса и стиля с помощью ruff (современная замена flake8 + isort + black) print("1. Запуск ruff (линтер и форматтер)...") success, output = run_command(["ruff", "check", "--fix", "."], project_root) if not success: print(f"❌ Ruff found issues:\n{output}") all_ok = False else: print("✅ Код соответствует стандартам") # 2. Проверка типов с помощью mypy print("\n2. Проверка типов mypy...") success, output = run_command(["mypy", "src/", "tests/"], project_root) if not success: print(f"⚠️ Mypy warnings:\n{output}") # Не считаем это фатальной ошибкой, только предупреждение else: print("✅ Аннотации типов в порядке") # 3. Запуск тестов с покрытием print("\n3. Запуск тестов pytest...") success, output = run_command([ "pytest", "tests/", "-v", "--cov=src", "--cov-report=term-missing", "--cov-fail-under=80" # Требуем минимум 80% покрытия ], project_root) if not success: print(f"❌ Тесты не прошли или покрытие недостаточно:\n{output}") all_ok = False else: print("✅ Все тесты пройдены, покрытие достаточное") # 4. Проверка уязвимостей в зависимостях print("\n4. Проверка уязвимостей (safety)...") success, output = run_command(["safety", "check", "--full-report"], project_root) if not success: print(f"🚨 Найдены уязвимости:\n{output}") all_ok = False else: print("✅ Уязвимостей не найдено") # 5. Генерация отчета о зависимостях print("\n5. Анализ зависимостей (pip-audit)...") success, output = run_command(["pip-audit"], project_root) if not success: print(f"⚠️ Проблемы с зависимостями:\n{output}") print("\n" + "="*50) if all_ok: print("✅ Все проверки пройдены успешно!") sys.exit(0) else: print("❌ Некоторые проверки не пройдены") sys.exit(1) if __name__ == "__main__": main()
1.2. Автоматизация развертывания с использованием Fabric/Invoke
Для автоматизации деплоя на сервера можно использовать библиотеки Fabric или Invoke.
# deploy.py from invoke import task import paramiko from pathlib import Path import sys def get_ssh_client(host: str, username: str, key_path: Path) -> paramiko.SSHClient: """Создает SSH-клиент для подключения к серверу.""" client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) key = paramiko.RSAKey.from_private_key_file(str(key_path)) client.connect(hostname=host, username=username, pkey=key) return client @task def deploy_prod(c, version="latest"): """ Развертывание приложения на production-сервере. Использование: invoke deploy-prod --version=v1.2.3 """ config = { 'host': 'prod-server.example.com', 'user': 'deploy', 'key_path': Path.home() / '.ssh' / 'deploy_key', 'app_dir': '/var/www/myapp', 'docker_registry': 'registry.example.com/myapp' } print(f"🚀 Начинаем деплой версии {version} на {config['host']}") # 1. Подключаемся к серверу ssh = get_ssh_client(config['host'], config['user'], config['key_path']) try: # 2. Останавливаем текущее приложение print("1. Остановка текущего контейнера...") stdin, stdout, stderr = ssh.exec_command( f"cd {config['app_dir']} && docker-compose down" ) print(stdout.read().decode()) # 3. Пулл новой версии образа print(f"2. Загрузка образа {version}...") image_name = f"{config['docker_registry']}:{version}" stdin, stdout, stderr = ssh.exec_command(f"docker pull {image_name}") pull_output = stdout.read().decode() if "error" in pull_output.lower(): print(f"❌ Ошибка загрузки образа: {pull_output}") sys.exit(1) # 4. Обновляем docker-compose.yml с новой версией print("3. Обновление конфигурации...") update_cmd = f""" cd {config['app_dir']} && \ sed -i 's|image:.*|image: {image_name}|' docker-compose.yml """ ssh.exec_command(update_cmd) # 5. Запускаем обновленное приложение print("4. Запуск нового контейнера...") stdin, stdout, stderr = ssh.exec_command( f"cd {config['app_dir']} && docker-compose up -d" ) # 6. Проверяем здоровье приложения print("5. Проверка здоровья приложения...") stdin, stdout, stderr = ssh.exec_command( "sleep 5 && " f"cd {config['app_dir']} && " "docker-compose exec -T app curl -f http://localhost:8000/health/" ) health_output = stdout.read().decode() if "healthy" in health_output or "200" in health_output: print(f"✅ Деплой версии {version} успешно завершен!") else: print(f"⚠️ Приложение запущено, но проверка здоровья неудачна: {health_output}") finally: ssh.close() @task def rollback(c, previous_version="previous"): """Откат к предыдущей версии.""" print(f"🔙 Откат к версии {previous_version}...") # Реализация аналогична deploy, но с указанием предыдущей версии @task def run_migrations(c, env="production"): """Запуск миграций БД.""" print(f"📦 Запуск миграций для {env}...") # Команды для применения миграций через Alembic или Django migrations
1.3. Интеграция с API GitHub/GitLab для автоматизации
# github_automation.py import requests import os from typing import Optional from datetime import datetime, timedelta class GitHubAutomation: def __init__(self, token: str, repo: str): self.token = token self.repo = repo self.headers = { "Authorization": f"token {token}", "Accept": "application/vnd.github.v3+json" } self.base_url = f"https://api.github.com/repos/{repo}" def create_release(self, tag: str, name: str, body: str) -> dict: """Создает релиз на GitHub.""" url = f"{self.base_url}/releases" data = { "tag_name": tag, "name": name, "body": body, "draft": False, "prerelease": False } response = requests.post(url, json=data, headers=self.headers) response.raise_for_status() return response.json() def get_stale_pull_requests(self, days_old: int = 30) -> list: """Находит PR, которые давно не обновлялись.""" url = f"{self.base_url}/pulls" params = {"state": "open", "sort": "updated", "direction": "desc"} response = requests.get(url, params=params, headers=self.headers) prs = response.json() stale_cutoff = datetime.now() - timedelta(days=days_old) stale_prs = [] for pr in prs: updated_at = datetime.fromisoformat(pr["updated_at"].replace("Z", "")) if updated_at < stale_cutoff: stale_prs.append({ "number": pr["number"], "title": pr["title"], "author": pr["user"]["login"], "last_updated": pr["updated_at"], "url": pr["html_url"] }) return stale_prs def auto_merge_dependabot(self): """Автоматически мержит безопасные обновления от Dependabot.""" url = f"{self.base_url}/pulls" params = {"state": "open", "creator": "dependabot[bot]"} response = requests.get(url, params=params, headers=self.headers) prs = response.json() for pr in prs: # Проверяем, что все проверки пройдены status_url = f"{self.base_url}/commits/{pr['head']['sha']}/status" status_resp = requests.get(status_url, headers=self.headers) status = status_resp.json() if status["state"] == "success": # Мержим PR merge_url = f"{self.base_url}/pulls/{pr['number']}/merge" merge_data = {"merge_method": "squash"} merge_resp = requests.put(merge_url, json=merge_data, headers=self.headers) if merge_resp.status_code == 200: print(f"✅ Автоматически вмержен PR #{pr['number']}: {pr['title']}") else: print(f"❌ Не удалось вмержить PR #{pr['number']}: {merge_resp.text}") # Использование if __name__ == "__main__": token = os.getenv("GITHUB_TOKEN") automator = GitHubAutomation(token, "myorg/myrepo") # Создаем релиз automator.create_release( tag="v1.2.3", name="Release v1.2.3", body="## Что нового\n- Фикс бага XYZ\n- Улучшение производительности" ) # Получаем старые PR для ревью stale = automator.get_stale_pull_requests(14) if stale: print(f"Найдены {len(stale)} старых PR:") for pr in stale: print(f" PR#{pr['number']}: {pr['title']} ({pr['last_updated']})")
Мониторинг и алертинг на Python
2.1. Скрипт мониторинга состояния сервисов
#!/usr/bin/env python3 """ Скрипт для проверки доступности сервисов и отправки уведомлений. """ import requests import smtplib from email.mime.text import MIMEText from datetime import datetime import time import json from typing import Dict, List import logging from dataclasses import dataclass logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class Service: name: str url: str method: str = "GET" timeout: int = 5 expected_status: int = 200 check_ssl: bool = True class ServiceMonitor: def __init__(self, services: List[Service]): self.services = services self.results = [] def check_service(self, service: Service) -> Dict: """Проверяет доступность одного сервиса.""" start_time = time.time() try: response = requests.request( method=service.method, url=service.url, timeout=service.timeout, verify=service.check_ssl ) elapsed = time.time() - start_time is_ok = response.status_code == service.expected_status status = "UP" if is_ok else "DOWN" result = { "service": service.name, "status": status, "status_code": response.status_code, "response_time": round(elapsed * 1000, 2), # мс "timestamp": datetime.now().isoformat(), "error": None } if not is_ok: result["error"] = f"Expected {service.expected_status}, got {response.status_code}" except Exception as e: elapsed = time.time() - start_time result = { "service": service.name, "status": "DOWN", "status_code": None, "response_time": round(elapsed * 1000, 2), "timestamp": datetime.now().isoformat(), "error": str(e) } return result def run_checks(self): """Запускает проверку всех сервисов.""" logger.info(f"Запуск проверки {len(self.services)} сервисов...") for service in self.services: result = self.check_service(service) self.results.append(result) if result["status"] == "DOWN": logger.error(f"❌ {service.name}: {result['error']}") self.send_alert(result) else: logger.info(f"✅ {service.name}: {result['response_time']}ms") def send_alert(self, result: Dict): """Отправляет уведомление о проблеме.""" # 1. Email self._send_email_alert(result) # 2. Slack (альтернатива) # self._send_slack_alert(result) # 3. Telegram (альтернатива) # self._send_telegram_alert(result) def _send_email_alert(self, result: Dict): """Отправляет алерт по email.""" subject = f"🚨 ALERT: {result['service']} is DOWN" body = f""" Service: {result['service']} Status: {result['status']} Error: {result['error']} Time: {result['timestamp']} Check URL: {next(s.url for s in self.services if s.name == result['service'])} """ msg = MIMEText(body) msg['Subject'] = subject msg['From'] = "monitor@example.com" msg['To'] = "devops@example.com" try: with smtplib.SMTP('smtp.example.com', 587) as server: server.starttls() server.login("user", "password") server.send_message(msg) logger.info(f"📧 Алерт отправлен для {result['service']}") except Exception as e: logger.error(f"Не удалось отправить email: {e}") def generate_report(self) -> str: """Генерирует HTML-отчет.""" up_count = sum(1 for r in self.results if r["status"] == "UP") down_count = len(self.results) - up_count html = f""" <html> <head><title>Service Health Report</title></head> <body> <h1>📊 Service Health Report</h1> <p>Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p> <div style="margin: 20px 0;"> <span style="background: {'#4CAF50' if down_count == 0 else '#f44336'}; color: white; padding: 10px; border-radius: 5px;"> ✅ UP: {up_count} | ❌ DOWN: {down_count} </span> </div> <table border="1" cellpadding="10" style="border-collapse: collapse;"> <tr> <th>Service</th><th>Status</th><th>Response Time</th><th>Details</th> </tr> """ for result in self.results: color = "#4CAF50" if result["status"] == "UP" else "#f44336" html += f""" <tr> <td>{result['service']}</td> <td style="background: {color}; color: white;">{result['status']}</td> <td>{result['response_time']} ms</td> <td>{result['error'] or 'OK'}</td> </tr> """ html += "</table></body></html>" return html # Конфигурация сервисов для мониторинга SERVICES = [ Service("Main API", "https://api.example.com/health/", expected_status=200), Service("Database", "https://db.example.com/ping/", timeout=10), Service("Cache", "http://cache.example.com:6379/", method="GET", expected_status=200), Service("Authentication", "https://auth.example.com/health/"), Service("Frontend", "https://example.com/", timeout=8), ] if __name__ == "__main__": # Запуск как демона с интервалом import schedule monitor = ServiceMonitor(SERVICES) def job(): monitor.run_checks() # Сохраняем отчет в файл report = monitor.generate_report() with open("/var/log/service_monitor/report.html", "w") as f: f.write(report) # Запускаем каждые 5 минут schedule.every(5).minutes.do(job) # Первый запуск сразу job() # Бесконечный цикл while True: schedule.run_pending() time.sleep(1)
2.2. Мониторинг метрик приложения и отправка в Prometheus
# metrics_exporter.py from prometheus_client import start_http_server, Gauge, Counter, Histogram import psutil import time import requests from typing import Dict import threading class ApplicationMetrics: def __init__(self, port=8000): self.port = port # Метрики системы self.cpu_usage = Gauge('app_cpu_usage_percent', 'CPU usage percentage') self.memory_usage = Gauge('app_memory_usage_bytes', 'Memory usage in bytes') self.disk_usage = Gauge('app_disk_usage_percent', 'Disk usage percentage') # Метрики приложения self.request_count = Counter('app_requests_total', 'Total HTTP requests') self.error_count = Counter('app_errors_total', 'Total application errors') self.response_time = Histogram('app_response_time_seconds', 'Request response time') # Бизнес-метрики self.active_users = Gauge('app_active_users', 'Number of active users') self.queue_size = Gauge('app_queue_size', 'Size of processing queue') def collect_system_metrics(self): """Сбор системных метрик.""" self.cpu_usage.set(psutil.cpu_percent()) memory = psutil.virtual_memory() self.memory_usage.set(memory.used) disk = psutil.disk_usage('/') self.disk_usage.set(disk.percent) def record_request(self, duration: float, status_code: int): """Запись метрик HTTP-запроса.""" self.request_count.inc() self.response_time.observe(duration) if status_code >= 500: self.error_count.inc() def update_business_metrics(self, users: int, queue: int): """Обновление бизнес-метрик.""" self.active_users.set(users) self.queue_size.set(queue) def run_metrics_server(self): """Запуск HTTP-сервера для сбора метрик Prometheus.""" start_http_server(self.port) print(f"📊 Metrics server started on port {self.port}") while True: self.collect_system_metrics() time.sleep(5) # Обновляем каждые 5 секунд # Middleware для FastAPI/Django для сбора метрик def metrics_middleware(app): """Middleware для сбора метрик HTTP-запросов.""" metrics = ApplicationMetrics() # Запускаем сбор системных метрик в отдельном потоке threading.Thread(target=metrics.run_metrics_server, daemon=True).start() async def middleware(request, call_next): start_time = time.time() try: response = await call_next(request) duration = time.time() - start_time # Записываем метрики metrics.record_request(duration, response.status_code) return response except Exception as e: duration = time.time() - start_time metrics.record_request(duration, 500) raise e return middleware # Пример использования с FastAPI from fastapi import FastAPI, Request app = FastAPI() app.middleware("http")(metrics_middleware(app)) @app.get("/metrics/stats") async def get_stats(): """Эндпоинт для получения текущих метрик (дополнительно к Prometheus).""" return { "system": { "cpu": psutil.cpu_percent(), "memory": psutil.virtual_memory().percent, "disk": psutil.disk_usage('/').percent } }
2.3. Скрипт для мониторинга логов и обнаружения аномалий
# log_monitor.py import re from pathlib import Path import time from collections import defaultdict, deque import json from datetime import datetime, timedelta class LogMonitor: def __init__(self, log_path: Path, patterns: Dict[str, str]): self.log_path = log_path self.patterns = {name: re.compile(pattern) for name, pattern in patterns.items()} self.stats = defaultdict(int) self.alert_thresholds = { "ERROR": 10, # Более 10 ошибок в минуту "FAILED_LOGIN": 5, # Более 5 неудачных попыток входа в минуту } self.recent_events = deque(maxlen=1000) # Кольцевой буфер последних событий def tail_log(self): """Чтение лога в реальном времени (аналог tail -f).""" with open(self.log_path, 'r') as f: # Перемещаемся в конец файла f.seek(0, 2) while True: line = f.readline() if line: yield line else: time.sleep(0.1) # Пауза при отсутствии новых данных def parse_line(self, line: str) -> Dict: """Парсит строку лога и извлекает информацию.""" timestamp_match = re.search(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', line) timestamp = datetime.now() if timestamp_match: try: timestamp = datetime.strptime(timestamp_match.group(), '%Y-%m-%d %H:%M:%S') except: pass event_type = "UNKNOWN" details = {} # Проверяем все паттерны for name, pattern in self.patterns.items(): match = pattern.search(line) if match: event_type = name details = match.groupdict() break return { "timestamp": timestamp, "event_type": event_type, "raw_line": line.strip(), "details": details } def analyze_events(self): """Анализирует события и генерирует алерты.""" current_time = datetime.now() one_minute_ago = current_time - timedelta(minutes=1) # Фильтруем события за последнюю минуту recent = [e for e in self.recent_events if e["timestamp"] > one_minute_ago] # Считаем статистику minute_stats = defaultdict(int) for event in recent: minute_stats[event["event_type"]] += 1 # Проверяем пороги alerts = [] for event_type, threshold in self.alert_thresholds.items(): if minute_stats[event_type] > threshold: alerts.append({ "type": "THRESHOLD_EXCEEDED", "event": event_type, "count": minute_stats[event_type], "threshold": threshold, "period": "1 minute" }) # Поиск паттернов атак (пример) failed_logins = [e for e in recent if e["event_type"] == "FAILED_LOGIN"] if len(failed_logins) >= 3: # Проверяем, идут ли неудачные попытки с одного IP ips = [e["details"].get("ip") for e in failed_logins if e["details"].get("ip")] if len(set(ips)) == 1 and ips[0]: alerts.append({ "type": "BRUTE_FORCE_ATTEMPT", "ip": ips[0], "attempts": len(failed_logins), "period": "1 minute" }) return alerts def run(self): """Основной цикл мониторинга.""" print(f"🔍 Начинаем мониторинг лога {self.log_path}") for line in self.tail_log(): event = self.parse_line(line) self.recent_events.append(event) # Обновляем общую статистику self.stats[event["event_type"]] += 1 # Немедленный алерт для критических ошибок if event["event_type"] in ["CRITICAL", "FATAL"]: print(f"🚨 НЕМЕДЛЕННЫЙ АЛЕРТ: {event['raw_line']}") # Периодический анализ if len(self.recent_events) % 100 == 0: # Каждые 100 событий alerts = self.analyze_events() if alerts: for alert in alerts: print(f"⚠️ АЛЕРТ: {json.dumps(alert, indent=2, default=str)}") # Паттерны для поиска в логах LOG_PATTERNS = { "ERROR": r'ERROR\s+(?P<message>.*)', "WARNING": r'WARNING\s+(?P<message>.*)', "FAILED_LOGIN": r'Failed login for user (?P<user>\w+) from (?P<ip>[\d.]+)', "SUCCESSFUL_LOGIN": r'Successful login for user (?P<user>\w+) from (?P<ip>[\d.]+)', "SQL_INJECTION": r'(SELECT|INSERT|UPDATE|DELETE|DROP|UNION).*FROM.*WHERE.*=.*[\'"]', "CRITICAL": r'CRITICAL\s+(?P<component>\w+):(?P<message>.*)', } if __name__ == "__main__": monitor = LogMonitor( log_path=Path("/var/log/myapp/app.log"), patterns=LOG_PATTERNS ) monitor.run()
Вывод
Python предоставляет DevOps-инженерам и разработчикам мощный инструментарий для автоматизации рутинных задач.

