Автоматизация DevOps на Python: скрипты для CI/CD и мониторинга

Введение

Python, благодаря своей простоте, богатой экосистеме библиотек и кроссплатформенности, стал одним из основных языков для автоматизации DevOps-процессов. Он идеально подходит для написания скриптов, которые связывают различные этапы жизненного цикла разработки: от сборки и тестирования до развертывания и мониторинга. В этом посте мы рассмотрим практические примеры скриптов на Python для построения эффективного CI/CD-конвейера и создания простых, но мощных инструментов мониторинга.


Python в CI/CD: Автоматизация пайплайнов

1.1. Скрипт для автоматической сборки и проверки качества кода

Такой скрипт можно запускать локально перед коммитом или использовать как этап в GitLab CI/GitHub Actions.

#!/usr/bin/env python3
"""
Скрипт для автоматической проверки Python-проекта.
Запускает: линтинг, форматирование, проверку типов, тесты.
"""
import subprocess
import sys
from pathlib import Path
from typing import List, Tuple

def run_command(cmd: List[str], cwd: Path = None) -> Tuple[bool, str]:
    """Запускает команду и возвращает (успех, вывод)."""
    try:
        result = subprocess.run(
            cmd,
            cwd=cwd,
            capture_output=True,
            text=True,
            check=True
        )
        return True, result.stdout
    except subprocess.CalledProcessError as e:
        return False, f"{e.stderr}\nExit code: {e.returncode}"

def main():
    project_root = Path(__file__).parent
    all_ok = True
    
    print("🔍 Запуск проверок качества кода...\n")
    
    # 1. Проверка синтаксиса и стиля с помощью ruff (современная замена flake8 + isort + black)
    print("1. Запуск ruff (линтер и форматтер)...")
    success, output = run_command(["ruff", "check", "--fix", "."], project_root)
    if not success:
        print(f"❌ Ruff found issues:\n{output}")
        all_ok = False
    else:
        print("✅ Код соответствует стандартам")
    
    # 2. Проверка типов с помощью mypy
    print("\n2. Проверка типов mypy...")
    success, output = run_command(["mypy", "src/", "tests/"], project_root)
    if not success:
        print(f"⚠️  Mypy warnings:\n{output}")
        # Не считаем это фатальной ошибкой, только предупреждение
    else:
        print("✅ Аннотации типов в порядке")
    
    # 3. Запуск тестов с покрытием
    print("\n3. Запуск тестов pytest...")
    success, output = run_command([
        "pytest", 
        "tests/", 
        "-v", 
        "--cov=src", 
        "--cov-report=term-missing",
        "--cov-fail-under=80"  # Требуем минимум 80% покрытия
    ], project_root)
    
    if not success:
        print(f"❌ Тесты не прошли или покрытие недостаточно:\n{output}")
        all_ok = False
    else:
        print("✅ Все тесты пройдены, покрытие достаточное")
    
    # 4. Проверка уязвимостей в зависимостях
    print("\n4. Проверка уязвимостей (safety)...")
    success, output = run_command(["safety", "check", "--full-report"], project_root)
    if not success:
        print(f"🚨 Найдены уязвимости:\n{output}")
        all_ok = False
    else:
        print("✅ Уязвимостей не найдено")
    
    # 5. Генерация отчета о зависимостях
    print("\n5. Анализ зависимостей (pip-audit)...")
    success, output = run_command(["pip-audit"], project_root)
    if not success:
        print(f"⚠️  Проблемы с зависимостями:\n{output}")
    
    print("\n" + "="*50)
    if all_ok:
        print("✅ Все проверки пройдены успешно!")
        sys.exit(0)
    else:
        print("❌ Некоторые проверки не пройдены")
        sys.exit(1)

if __name__ == "__main__":
    main()

1.2. Автоматизация развертывания с использованием Fabric/Invoke

Для автоматизации деплоя на сервера можно использовать библиотеки Fabric или Invoke.

# deploy.py
from invoke import task
import paramiko
from pathlib import Path
import sys

def get_ssh_client(host: str, username: str, key_path: Path) -> paramiko.SSHClient:
    """Создает SSH-клиент для подключения к серверу."""
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    key = paramiko.RSAKey.from_private_key_file(str(key_path))
    client.connect(hostname=host, username=username, pkey=key)
    
    return client

@task
def deploy_prod(c, version="latest"):
    """
    Развертывание приложения на production-сервере.
    Использование: invoke deploy-prod --version=v1.2.3
    """
    config = {
        'host': 'prod-server.example.com',
        'user': 'deploy',
        'key_path': Path.home() / '.ssh' / 'deploy_key',
        'app_dir': '/var/www/myapp',
        'docker_registry': 'registry.example.com/myapp'
    }
    
    print(f"🚀 Начинаем деплой версии {version} на {config['host']}")
    
    # 1. Подключаемся к серверу
    ssh = get_ssh_client(config['host'], config['user'], config['key_path'])
    
    try:
        # 2. Останавливаем текущее приложение
        print("1. Остановка текущего контейнера...")
        stdin, stdout, stderr = ssh.exec_command(
            f"cd {config['app_dir']} && docker-compose down"
        )
        print(stdout.read().decode())
        
        # 3. Пулл новой версии образа
        print(f"2. Загрузка образа {version}...")
        image_name = f"{config['docker_registry']}:{version}"
        stdin, stdout, stderr = ssh.exec_command(f"docker pull {image_name}")
        pull_output = stdout.read().decode()
        if "error" in pull_output.lower():
            print(f"❌ Ошибка загрузки образа: {pull_output}")
            sys.exit(1)
        
        # 4. Обновляем docker-compose.yml с новой версией
        print("3. Обновление конфигурации...")
        update_cmd = f"""
        cd {config['app_dir']} && \
        sed -i 's|image:.*|image: {image_name}|' docker-compose.yml
        """
        ssh.exec_command(update_cmd)
        
        # 5. Запускаем обновленное приложение
        print("4. Запуск нового контейнера...")
        stdin, stdout, stderr = ssh.exec_command(
            f"cd {config['app_dir']} && docker-compose up -d"
        )
        
        # 6. Проверяем здоровье приложения
        print("5. Проверка здоровья приложения...")
        stdin, stdout, stderr = ssh.exec_command(
            "sleep 5 && "
            f"cd {config['app_dir']} && "
            "docker-compose exec -T app curl -f http://localhost:8000/health/"
        )
        
        health_output = stdout.read().decode()
        if "healthy" in health_output or "200" in health_output:
            print(f"✅ Деплой версии {version} успешно завершен!")
        else:
            print(f"⚠️  Приложение запущено, но проверка здоровья неудачна: {health_output}")
            
    finally:
        ssh.close()

@task
def rollback(c, previous_version="previous"):
    """Откат к предыдущей версии."""
    print(f"🔙 Откат к версии {previous_version}...")
    # Реализация аналогична deploy, но с указанием предыдущей версии

@task
def run_migrations(c, env="production"):
    """Запуск миграций БД."""
    print(f"📦 Запуск миграций для {env}...")
    # Команды для применения миграций через Alembic или Django migrations

1.3. Интеграция с API GitHub/GitLab для автоматизации

# github_automation.py
import requests
import os
from typing import Optional
from datetime import datetime, timedelta

class GitHubAutomation:
    def __init__(self, token: str, repo: str):
        self.token = token
        self.repo = repo
        self.headers = {
            "Authorization": f"token {token}",
            "Accept": "application/vnd.github.v3+json"
        }
        self.base_url = f"https://api.github.com/repos/{repo}"
    
    def create_release(self, tag: str, name: str, body: str) -> dict:
        """Создает релиз на GitHub."""
        url = f"{self.base_url}/releases"
        data = {
            "tag_name": tag,
            "name": name,
            "body": body,
            "draft": False,
            "prerelease": False
        }
        
        response = requests.post(url, json=data, headers=self.headers)
        response.raise_for_status()
        return response.json()
    
    def get_stale_pull_requests(self, days_old: int = 30) -> list:
        """Находит PR, которые давно не обновлялись."""
        url = f"{self.base_url}/pulls"
        params = {"state": "open", "sort": "updated", "direction": "desc"}
        
        response = requests.get(url, params=params, headers=self.headers)
        prs = response.json()
        
        stale_cutoff = datetime.now() - timedelta(days=days_old)
        stale_prs = []
        
        for pr in prs:
            updated_at = datetime.fromisoformat(pr["updated_at"].replace("Z", ""))
            if updated_at < stale_cutoff:
                stale_prs.append({
                    "number": pr["number"],
                    "title": pr["title"],
                    "author": pr["user"]["login"],
                    "last_updated": pr["updated_at"],
                    "url": pr["html_url"]
                })
        
        return stale_prs
    
    def auto_merge_dependabot(self):
        """Автоматически мержит безопасные обновления от Dependabot."""
        url = f"{self.base_url}/pulls"
        params = {"state": "open", "creator": "dependabot[bot]"}
        
        response = requests.get(url, params=params, headers=self.headers)
        prs = response.json()
        
        for pr in prs:
            # Проверяем, что все проверки пройдены
            status_url = f"{self.base_url}/commits/{pr['head']['sha']}/status"
            status_resp = requests.get(status_url, headers=self.headers)
            status = status_resp.json()
            
            if status["state"] == "success":
                # Мержим PR
                merge_url = f"{self.base_url}/pulls/{pr['number']}/merge"
                merge_data = {"merge_method": "squash"}
                
                merge_resp = requests.put(merge_url, json=merge_data, headers=self.headers)
                if merge_resp.status_code == 200:
                    print(f"✅ Автоматически вмержен PR #{pr['number']}: {pr['title']}")
                else:
                    print(f"❌ Не удалось вмержить PR #{pr['number']}: {merge_resp.text}")

# Использование
if __name__ == "__main__":
    token = os.getenv("GITHUB_TOKEN")
    automator = GitHubAutomation(token, "myorg/myrepo")
    
    # Создаем релиз
    automator.create_release(
        tag="v1.2.3",
        name="Release v1.2.3",
        body="## Что нового\n- Фикс бага XYZ\n- Улучшение производительности"
    )
    
    # Получаем старые PR для ревью
    stale = automator.get_stale_pull_requests(14)
    if stale:
        print(f"Найдены {len(stale)} старых PR:")
        for pr in stale:
            print(f"  PR#{pr['number']}: {pr['title']} ({pr['last_updated']})")

Мониторинг и алертинг на Python

2.1. Скрипт мониторинга состояния сервисов

#!/usr/bin/env python3
"""
Скрипт для проверки доступности сервисов и отправки уведомлений.
"""
import requests
import smtplib
from email.mime.text import MIMEText
from datetime import datetime
import time
import json
from typing import Dict, List
import logging
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class Service:
    name: str
    url: str
    method: str = "GET"
    timeout: int = 5
    expected_status: int = 200
    check_ssl: bool = True

class ServiceMonitor:
    def __init__(self, services: List[Service]):
        self.services = services
        self.results = []
        
    def check_service(self, service: Service) -> Dict:
        """Проверяет доступность одного сервиса."""
        start_time = time.time()
        
        try:
            response = requests.request(
                method=service.method,
                url=service.url,
                timeout=service.timeout,
                verify=service.check_ssl
            )
            elapsed = time.time() - start_time
            
            is_ok = response.status_code == service.expected_status
            status = "UP" if is_ok else "DOWN"
            
            result = {
                "service": service.name,
                "status": status,
                "status_code": response.status_code,
                "response_time": round(elapsed * 1000, 2),  # мс
                "timestamp": datetime.now().isoformat(),
                "error": None
            }
            
            if not is_ok:
                result["error"] = f"Expected {service.expected_status}, got {response.status_code}"
                
        except Exception as e:
            elapsed = time.time() - start_time
            result = {
                "service": service.name,
                "status": "DOWN",
                "status_code": None,
                "response_time": round(elapsed * 1000, 2),
                "timestamp": datetime.now().isoformat(),
                "error": str(e)
            }
        
        return result
    
    def run_checks(self):
        """Запускает проверку всех сервисов."""
        logger.info(f"Запуск проверки {len(self.services)} сервисов...")
        
        for service in self.services:
            result = self.check_service(service)
            self.results.append(result)
            
            if result["status"] == "DOWN":
                logger.error(f"❌ {service.name}: {result['error']}")
                self.send_alert(result)
            else:
                logger.info(f"✅ {service.name}: {result['response_time']}ms")
    
    def send_alert(self, result: Dict):
        """Отправляет уведомление о проблеме."""
        # 1. Email
        self._send_email_alert(result)
        
        # 2. Slack (альтернатива)
        # self._send_slack_alert(result)
        
        # 3. Telegram (альтернатива)
        # self._send_telegram_alert(result)
    
    def _send_email_alert(self, result: Dict):
        """Отправляет алерт по email."""
        subject = f"🚨 ALERT: {result['service']} is DOWN"
        body = f"""
        Service: {result['service']}
        Status: {result['status']}
        Error: {result['error']}
        Time: {result['timestamp']}
        
        Check URL: {next(s.url for s in self.services if s.name == result['service'])}
        """
        
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = "monitor@example.com"
        msg['To'] = "devops@example.com"
        
        try:
            with smtplib.SMTP('smtp.example.com', 587) as server:
                server.starttls()
                server.login("user", "password")
                server.send_message(msg)
            logger.info(f"📧 Алерт отправлен для {result['service']}")
        except Exception as e:
            logger.error(f"Не удалось отправить email: {e}")
    
    def generate_report(self) -> str:
        """Генерирует HTML-отчет."""
        up_count = sum(1 for r in self.results if r["status"] == "UP")
        down_count = len(self.results) - up_count
        
        html = f"""
        <html>
        <head><title>Service Health Report</title></head>
        <body>
            <h1>📊 Service Health Report</h1>
            <p>Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
            
            <div style="margin: 20px 0;">
                <span style="background: {'#4CAF50' if down_count == 0 else '#f44336'}; 
                           color: white; padding: 10px; border-radius: 5px;">
                    ✅ UP: {up_count} | ❌ DOWN: {down_count}
                </span>
            </div>
            
            <table border="1" cellpadding="10" style="border-collapse: collapse;">
                <tr>
                    <th>Service</th><th>Status</th><th>Response Time</th><th>Details</th>
                </tr>
        """
        
        for result in self.results:
            color = "#4CAF50" if result["status"] == "UP" else "#f44336"
            html += f"""
                <tr>
                    <td>{result['service']}</td>
                    <td style="background: {color}; color: white;">{result['status']}</td>
                    <td>{result['response_time']} ms</td>
                    <td>{result['error'] or 'OK'}</td>
                </tr>
            """
        
        html += "</table></body></html>"
        return html

# Конфигурация сервисов для мониторинга
SERVICES = [
    Service("Main API", "https://api.example.com/health/", expected_status=200),
    Service("Database", "https://db.example.com/ping/", timeout=10),
    Service("Cache", "http://cache.example.com:6379/", method="GET", expected_status=200),
    Service("Authentication", "https://auth.example.com/health/"),
    Service("Frontend", "https://example.com/", timeout=8),
]

if __name__ == "__main__":
    # Запуск как демона с интервалом
    import schedule
    
    monitor = ServiceMonitor(SERVICES)
    
    def job():
        monitor.run_checks()
        # Сохраняем отчет в файл
        report = monitor.generate_report()
        with open("/var/log/service_monitor/report.html", "w") as f:
            f.write(report)
    
    # Запускаем каждые 5 минут
    schedule.every(5).minutes.do(job)
    
    # Первый запуск сразу
    job()
    
    # Бесконечный цикл
    while True:
        schedule.run_pending()
        time.sleep(1)

2.2. Мониторинг метрик приложения и отправка в Prometheus

# metrics_exporter.py
from prometheus_client import start_http_server, Gauge, Counter, Histogram
import psutil
import time
import requests
from typing import Dict
import threading

class ApplicationMetrics:
    def __init__(self, port=8000):
        self.port = port
        
        # Метрики системы
        self.cpu_usage = Gauge('app_cpu_usage_percent', 'CPU usage percentage')
        self.memory_usage = Gauge('app_memory_usage_bytes', 'Memory usage in bytes')
        self.disk_usage = Gauge('app_disk_usage_percent', 'Disk usage percentage')
        
        # Метрики приложения
        self.request_count = Counter('app_requests_total', 'Total HTTP requests')
        self.error_count = Counter('app_errors_total', 'Total application errors')
        self.response_time = Histogram('app_response_time_seconds', 'Request response time')
        
        # Бизнес-метрики
        self.active_users = Gauge('app_active_users', 'Number of active users')
        self.queue_size = Gauge('app_queue_size', 'Size of processing queue')
        
    def collect_system_metrics(self):
        """Сбор системных метрик."""
        self.cpu_usage.set(psutil.cpu_percent())
        
        memory = psutil.virtual_memory()
        self.memory_usage.set(memory.used)
        
        disk = psutil.disk_usage('/')
        self.disk_usage.set(disk.percent)
    
    def record_request(self, duration: float, status_code: int):
        """Запись метрик HTTP-запроса."""
        self.request_count.inc()
        self.response_time.observe(duration)
        
        if status_code >= 500:
            self.error_count.inc()
    
    def update_business_metrics(self, users: int, queue: int):
        """Обновление бизнес-метрик."""
        self.active_users.set(users)
        self.queue_size.set(queue)
    
    def run_metrics_server(self):
        """Запуск HTTP-сервера для сбора метрик Prometheus."""
        start_http_server(self.port)
        print(f"📊 Metrics server started on port {self.port}")
        
        while True:
            self.collect_system_metrics()
            time.sleep(5)  # Обновляем каждые 5 секунд

# Middleware для FastAPI/Django для сбора метрик
def metrics_middleware(app):
    """Middleware для сбора метрик HTTP-запросов."""
    metrics = ApplicationMetrics()
    
    # Запускаем сбор системных метрик в отдельном потоке
    threading.Thread(target=metrics.run_metrics_server, daemon=True).start()
    
    async def middleware(request, call_next):
        start_time = time.time()
        
        try:
            response = await call_next(request)
            duration = time.time() - start_time
            
            # Записываем метрики
            metrics.record_request(duration, response.status_code)
            
            return response
        except Exception as e:
            duration = time.time() - start_time
            metrics.record_request(duration, 500)
            raise e
    
    return middleware

# Пример использования с FastAPI
from fastapi import FastAPI, Request

app = FastAPI()
app.middleware("http")(metrics_middleware(app))

@app.get("/metrics/stats")
async def get_stats():
    """Эндпоинт для получения текущих метрик (дополнительно к Prometheus)."""
    return {
        "system": {
            "cpu": psutil.cpu_percent(),
            "memory": psutil.virtual_memory().percent,
            "disk": psutil.disk_usage('/').percent
        }
    }

2.3. Скрипт для мониторинга логов и обнаружения аномалий

# log_monitor.py
import re
from pathlib import Path
import time
from collections import defaultdict, deque
import json
from datetime import datetime, timedelta

class LogMonitor:
    def __init__(self, log_path: Path, patterns: Dict[str, str]):
        self.log_path = log_path
        self.patterns = {name: re.compile(pattern) for name, pattern in patterns.items()}
        self.stats = defaultdict(int)
        self.alert_thresholds = {
            "ERROR": 10,  # Более 10 ошибок в минуту
            "FAILED_LOGIN": 5,  # Более 5 неудачных попыток входа в минуту
        }
        self.recent_events = deque(maxlen=1000)  # Кольцевой буфер последних событий
        
    def tail_log(self):
        """Чтение лога в реальном времени (аналог tail -f)."""
        with open(self.log_path, 'r') as f:
            # Перемещаемся в конец файла
            f.seek(0, 2)
            
            while True:
                line = f.readline()
                if line:
                    yield line
                else:
                    time.sleep(0.1)  # Пауза при отсутствии новых данных
    
    def parse_line(self, line: str) -> Dict:
        """Парсит строку лога и извлекает информацию."""
        timestamp_match = re.search(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', line)
        timestamp = datetime.now()
        
        if timestamp_match:
            try:
                timestamp = datetime.strptime(timestamp_match.group(), '%Y-%m-%d %H:%M:%S')
            except:
                pass
        
        event_type = "UNKNOWN"
        details = {}
        
        # Проверяем все паттерны
        for name, pattern in self.patterns.items():
            match = pattern.search(line)
            if match:
                event_type = name
                details = match.groupdict()
                break
        
        return {
            "timestamp": timestamp,
            "event_type": event_type,
            "raw_line": line.strip(),
            "details": details
        }
    
    def analyze_events(self):
        """Анализирует события и генерирует алерты."""
        current_time = datetime.now()
        one_minute_ago = current_time - timedelta(minutes=1)
        
        # Фильтруем события за последнюю минуту
        recent = [e for e in self.recent_events if e["timestamp"] > one_minute_ago]
        
        # Считаем статистику
        minute_stats = defaultdict(int)
        for event in recent:
            minute_stats[event["event_type"]] += 1
        
        # Проверяем пороги
        alerts = []
        for event_type, threshold in self.alert_thresholds.items():
            if minute_stats[event_type] > threshold:
                alerts.append({
                    "type": "THRESHOLD_EXCEEDED",
                    "event": event_type,
                    "count": minute_stats[event_type],
                    "threshold": threshold,
                    "period": "1 minute"
                })
        
        # Поиск паттернов атак (пример)
        failed_logins = [e for e in recent if e["event_type"] == "FAILED_LOGIN"]
        if len(failed_logins) >= 3:
            # Проверяем, идут ли неудачные попытки с одного IP
            ips = [e["details"].get("ip") for e in failed_logins if e["details"].get("ip")]
            if len(set(ips)) == 1 and ips[0]:
                alerts.append({
                    "type": "BRUTE_FORCE_ATTEMPT",
                    "ip": ips[0],
                    "attempts": len(failed_logins),
                    "period": "1 minute"
                })
        
        return alerts
    
    def run(self):
        """Основной цикл мониторинга."""
        print(f"🔍 Начинаем мониторинг лога {self.log_path}")
        
        for line in self.tail_log():
            event = self.parse_line(line)
            self.recent_events.append(event)
            
            # Обновляем общую статистику
            self.stats[event["event_type"]] += 1
            
            # Немедленный алерт для критических ошибок
            if event["event_type"] in ["CRITICAL", "FATAL"]:
                print(f"🚨 НЕМЕДЛЕННЫЙ АЛЕРТ: {event['raw_line']}")
            
            # Периодический анализ
            if len(self.recent_events) % 100 == 0:  # Каждые 100 событий
                alerts = self.analyze_events()
                if alerts:
                    for alert in alerts:
                        print(f"⚠️  АЛЕРТ: {json.dumps(alert, indent=2, default=str)}")

# Паттерны для поиска в логах
LOG_PATTERNS = {
    "ERROR": r'ERROR\s+(?P<message>.*)',
    "WARNING": r'WARNING\s+(?P<message>.*)',
    "FAILED_LOGIN": r'Failed login for user (?P<user>\w+) from (?P<ip>[\d.]+)',
    "SUCCESSFUL_LOGIN": r'Successful login for user (?P<user>\w+) from (?P<ip>[\d.]+)',
    "SQL_INJECTION": r'(SELECT|INSERT|UPDATE|DELETE|DROP|UNION).*FROM.*WHERE.*=.*[\'"]',
    "CRITICAL": r'CRITICAL\s+(?P<component>\w+):(?P<message>.*)',
}

if __name__ == "__main__":
    monitor = LogMonitor(
        log_path=Path("/var/log/myapp/app.log"),
        patterns=LOG_PATTERNS
    )
    monitor.run()

Вывод

Python предоставляет DevOps-инженерам и разработчикам мощный инструментарий для автоматизации рутинных задач.