Современный Python в 2026: от итераторов до асинхронных контекстных менеджеров

Введение

В 2026 году Python продолжает доминировать как универсальный язык программирования, но его ландшафт кардинально изменился. Если раньше Python выбирали за простоту и читаемость, то сегодня — за мощную экосистему, производительность в production-среде и элегантные абстракции для сложных систем. Современный Python — это не просто for-циклы и списки, а сложная система итераторов, контекстных менеджеров, декораторов и асинхронных паттернов, которые позволяют писать код уровня enterprise.


Итераторы и генераторы: сердце Python-памяти

1.1. От простых итераций до кастомных итераторов

from typing import Iterator, Protocol
import time

class DataStreamProtocol(Protocol):
    """Протокол для потоков данных."""
    def read_chunk(self) -> bytes: ...

class BufferedIterator:
    """Итератор с буферизацией для работы с большими данными."""
    
    def __init__(self, stream: DataStreamProtocol, chunk_size: int = 1024):
        self.stream = stream
        self.chunk_size = chunk_size
        self._buffer = b''
        self._exhausted = False
    
    def __iter__(self) -> Iterator[bytes]:
        return self
    
    def __next__(self) -> bytes:
        if self._exhausted and not self._buffer:
            raise StopIteration
        
        while len(self._buffer) < self.chunk_size and not self._exhausted:
            try:
                chunk = self.stream.read_chunk()
                if not chunk:
                    self._exhausted = True
                    break
                self._buffer += chunk
            except IOError:
                self._exhausted = True
                break
        
        if not self._buffer:
            raise StopIteration
        
        result = self._buffer[:self.chunk_size]
        self._buffer = self._buffer[self.chunk_size:]
        return result

# Практический пример: обработка логов в реальном времени
def process_log_stream(log_file_path: str) -> Iterator[dict]:
    """Построчно читает и парсит логи без загрузки в память."""
    with open(log_file_path, 'r', encoding='utf-8') as f:
        for line in f:
            if not line.strip():
                continue
            # Парсинг JSON-лога
            try:
                yield json.loads(line)
            except json.JSONDecodeError:
                continue

# Использование
for log_entry in process_log_stream('/var/log/app.log'):
    if log_entry.get('level') == 'ERROR':
        send_alert(log_entry)

Ключевые моменты:

  • Итераторы позволяют обрабатывать терабайты данных без загрузки в память

  • Поддержка протоколов (Protocol) для статической проверки типов

  • Ленивые вычисления — данные обрабатываются только когда нужны

1.2. Генераторы с состоянием и корутины

from typing import Generator, Any
from dataclasses import dataclass
from enum import Enum

class ProcessorState(Enum):
    IDLE = "idle"
    PROCESSING = "processing"
    PAUSED = "paused"
    COMPLETED = "completed"

@dataclass
class ProcessingResult:
    item_id: str
    status: str
    metrics: dict[str, float]

def data_pipeline() -> Generator[ProcessingResult | None, Any, list[str]]:
    """Продвинутый генератор с двухсторонней коммуникацией."""
    processed_ids = []
    state = ProcessorState.IDLE
    
    try:
        while True:
            # Получение данных от вызывающей стороны
            item = yield None  # type: ignore
            
            if item is None:
                continue
            
            if isinstance(item, str) and item == "PAUSE":
                state = ProcessorState.PAUSED
                yield ProcessingResult("", "paused", {})
                continue
            
            state = ProcessorState.PROCESSING
            result = process_item(item)
            processed_ids.append(result.item_id)
            
            # Отправка результата обратно
            yield result
            
    except GeneratorExit:
        state = ProcessorState.COMPLETED
        return processed_ids

def process_item(item: Any) -> ProcessingResult:
    """Обработка одного элемента."""
    time.sleep(0.01)  # Имитация работы
    return ProcessingResult(
        item_id=f"item_{hash(item)}",
        status="processed",
        metrics={"processing_time": 0.01, "size": len(str(item))}
    )

# Использование
pipeline = data_pipeline()
next(pipeline)  # Инициализация

for data in ["data1", "data2", "PAUSE", "data3"]:
    result = pipeline.send(data)
    if result:
        print(f"Обработано: {result.item_id}")

try:
    pipeline.close()
except StopIteration as e:
    print(f"Всего обработано: {len(e.value)} элементов")

Контекстные менеджеры: управление ресурсами на уровне предприятия

2.1. Асинхронные контекстные менеджеры для распределенных систем

import asyncio
from typing import Optional, AsyncContextManager
from contextlib import asynccontextmanager
import aiohttp
from datetime import datetime, timedelta
import ssl
import certifi

class CircuitBreaker:
    """Circuit Breaker для обработки сбоев во внешних сервисах."""
    
    def __init__(self, failure_threshold: int = 5, reset_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
        self.last_failure_time: Optional[datetime] = None
    
    async def __aenter__(self) -> "CircuitBreaker":
        if self.state == "OPEN":
            if self.last_failure_time and \
               datetime.now() - self.last_failure_time > timedelta(seconds=self.reset_timeout):
                self.state = "HALF_OPEN"
            else:
                raise CircuitOpenError("Circuit breaker is OPEN")
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            if self.state == "HALF_OPEN":
                self.state = "CLOSED"
                self.failures = 0
        else:
            self.failures += 1
            self.last_failure_time = datetime.now()
            if self.failures >= self.failure_threshold:
                self.state = "OPEN"
        return False

class CircuitOpenError(Exception):
    pass

@asynccontextmanager
async def create_http_client(
    base_url: str,
    timeout: int = 30,
    max_connections: int = 100,
    enable_ssl: bool = True
) -> AsyncContextManager[aiohttp.ClientSession]:
    """Создание HTTP-клиента с поддержкой SSL и пулом соединений."""
    ssl_context = None
    if enable_ssl:
        ssl_context = ssl.create_default_context(cafile=certifi.where())
        ssl_context.check_hostname = True
        ssl_context.verify_mode = ssl.CERT_REQUIRED
    
    connector = aiohttp.TCPConnector(
        limit=max_connections,
        ssl=ssl_context,
        enable_cleanup_closed=True
    )
    
    timeout_config = aiohttp.ClientTimeout(total=timeout)
    
    session = aiohttp.ClientSession(
        base_url=base_url,
        connector=connector,
        timeout=timeout_config,
        headers={
            "User-Agent": "AdvancedPythonClient/2026",
            "Accept": "application/json"
        }
    )
    
    try:
        yield session
    finally:
        await session.close()
        await connector.close()

async def fetch_with_retry(
    url: str,
    max_retries: int = 3,
    circuit_breaker: Optional[CircuitBreaker] = None
) -> dict[str, Any]:
    """Запрос с повторными попытками и circuit breaker."""
    retry_delays = [1, 2, 4]  # Exponential backoff
    
    async with create_http_client("https://api.example.com") as session:
        for attempt in range(max_retries):
            try:
                # Использование circuit breaker если предоставлен
                if circuit_breaker:
                    async with circuit_breaker:
                        async with session.get(url) as response:
                            if response.status == 200:
                                return await response.json()
                            else:
                                raise aiohttp.ClientError(f"HTTP {response.status}")
                else:
                    async with session.get(url) as response:
                        if response.status == 200:
                            return await response.json()
                        else:
                            raise aiohttp.ClientError(f"HTTP {response.status}")
                            
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                if attempt == max_retries - 1:
                    raise
                await asyncio.sleep(retry_delays[attempt])
                continue
    
    raise RuntimeError("Max retries exceeded")

# Практический пример: загрузка данных с резервированием
async def load_data_with_fallback(
    primary_url: str,
    fallback_urls: list[str]
) -> dict[str, Any]:
    """Загрузка данных с основного и запасных источников."""
    circuit_breaker = CircuitBreaker(failure_threshold=3, reset_timeout=30)
    
    try:
        return await fetch_with_retry(primary_url, circuit_breaker=circuit_breaker)
    except Exception as e:
        print(f"Primary failed: {e}")
    
    # Пробуем запасные источники
    for fallback_url in fallback_urls:
        try:
            return await fetch_with_retry(fallback_url)
        except Exception as e:
            print(f"Fallback {fallback_url} failed: {e}")
            continue
    
    raise RuntimeError("All data sources failed")

2.2. Транзакционные контекстные менеджеры для БД

from typing import TypeVar, Generic
from contextlib import AbstractContextManager
import asyncpg
from asyncio import Lock

T = TypeVar('T')

class DatabaseTransaction(Generic[T]):
    """Транзакционный контекстный менеджер для PostgreSQL."""
    
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.conn: Optional[asyncpg.Connection] = None
        self.transaction: Optional[asyncpg.transaction.Transaction] = None
        self._lock = Lock()
    
    async def __aenter__(self) -> asyncpg.Connection:
        await self._lock.acquire()
        self.conn = await asyncpg.connect(self.dsn)
        self.transaction = self.conn.transaction()
        await self.transaction.start()
        return self.conn
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            if self.transaction:
                if exc_type is None:
                    await self.transaction.commit()
                    print("Transaction committed successfully")
                else:
                    await self.transaction.rollback()
                    print(f"Transaction rolled back due to: {exc_val}")
        finally:
            if self.conn:
                await self.conn.close()
            self._lock.release()
        
        # Не подавляем исключения
        return False

class ConnectionPoolManager:
    """Менеджер пула соединений с health checks."""
    
    def __init__(self, dsn: str, min_size: int = 10, max_size: int = 100):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self.pool: Optional[asyncpg.Pool] = None
    
    async def __aenter__(self) -> asyncpg.Pool:
        self.pool = await asyncpg.create_pool(
            self.dsn,
            min_size=self.min_size,
            max_size=self.max_size,
            command_timeout=60,
            max_queries=50000,
            max_inactive_connection_lifetime=300,
            setup=self._setup_connection
        )
        return self.pool
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.pool:
            await self.pool.close()
    
    async def _setup_connection(self, conn: asyncpg.Connection):
        """Настройка соединения при создании."""
        await conn.execute("SET TIME ZONE 'UTC'")
        await conn.execute("SET statement_timeout = '60s'")

# Использование в production
async def process_user_order(user_id: int, order_data: dict) -> dict:
    """Обработка заказа пользователя в транзакции."""
    async with DatabaseTransaction("postgresql://user:pass@localhost/db") as conn:
        # 1. Проверяем наличие пользователя
        user = await conn.fetchrow(
            "SELECT id, balance FROM users WHERE id = $1 FOR UPDATE",
            user_id
        )
        
        if not user:
            raise ValueError("User not found")
        
        # 2. Списываем средства
        new_balance = user['balance'] - order_data['amount']
        if new_balance < 0:
            raise ValueError("Insufficient funds")
        
        await conn.execute(
            "UPDATE users SET balance = $1 WHERE id = $2",
            new_balance, user_id
        )
        
        # 3. Создаем запись о заказе
        order_id = await conn.fetchval("""
            INSERT INTO orders (user_id, amount, status)
            VALUES ($1, $2, 'pending')
            RETURNING id
        """, user_id, order_data['amount'])
        
        # 4. Логируем операцию
        await conn.execute("""
            INSERT INTO audit_log (user_id, action, details)
            VALUES ($1, 'order_created', $2)
        """, user_id, {'order_id': order_id, 'amount': order_data['amount']})
        
        return {'order_id': order_id, 'new_balance': new_balance}

Продвинутые декораторы и метапрограммирование

3.1. Декораторы с зависимостями и кешированием

import functools
import inspect
from typing import Callable, Any, TypeVar, ParamSpec
from datetime import datetime, timedelta
import hashlib
import pickle

P = ParamSpec('P')
R = TypeVar('R')

class CacheConfig:
    """Конфигурация кеширования."""
    
    def __init__(self, ttl: int = 300, max_size: int = 1000):
        self.ttl = ttl
        self.max_size = max_size
        self._cache: dict[str, tuple[Any, datetime]] = {}
    
    def get_key(self, func: Callable, *args, **kwargs) -> str:
        """Генерация ключа кеша на основе функции и аргументов."""
        # Сериализуем аргументы
        args_bytes = pickle.dumps((args, kwargs))
        # Добавляем сигнатуру функции
        func_sig = inspect.signature(func)
        key_data = f"{func.__module__}.{func.__name__}:{func_sig}:{args_bytes}"
        return hashlib.sha256(key_data.encode()).hexdigest()
    
    def get(self, key: str) -> Any:
        """Получение значения из кеша."""
        if key not in self._cache:
            return None
        
        value, timestamp = self._cache[key]
        if datetime.now() - timestamp > timedelta(seconds=self.ttl):
            del self._cache[key]
            return None
        
        return value
    
    def set(self, key: str, value: Any):
        """Установка значения в кеш."""
        # LRU-логика при превышении размера
        if len(self._cache) >= self.max_size:
            # Удаляем самый старый элемент
            oldest_key = min(self._cache.items(), key=lambda x: x[1][1])[0]
            del self._cache[oldest_key]
        
        self._cache[key] = (value, datetime.now())

def cached(cache_config: CacheConfig | None = None):
    """Декоратор кеширования с конфигурацией."""
    if cache_config is None:
        cache_config = CacheConfig()
    
    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            key = cache_config.get_key(func, *args, **kwargs)
            cached_value = cache_config.get(key)
            
            if cached_value is not None:
                print(f"Cache hit for {func.__name__}")
                return cached_value
            
            result = func(*args, **kwargs)
            cache_config.set(key, result)
            print(f"Cache set for {func.__name__}")
            return result
        
        return wrapper
    return decorator

def retry(
    max_attempts: int = 3,
    delays: list[float] = None,
    exceptions: tuple = (Exception,)
):
    """Декоратор повторных попыток с экспоненциальной задержкой."""
    if delays is None:
        delays = [1, 2, 4, 8, 16]
    
    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            last_exception = None
            
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exception = e
                    if attempt == max_attempts - 1:
                        break
                    
                    delay = delays[attempt] if attempt < len(delays) else delays[-1]
                    print(f"Attempt {attempt + 1} failed for {func.__name__}, "
                          f"retrying in {delay}s: {e}")
                    time.sleep(delay)
            
            raise RuntimeError(
                f"Function {func.__name__} failed after {max_attempts} attempts"
            ) from last_exception
        
        return wrapper
    return decorator

def validate_input(
    validators: dict[str, Callable[[Any], bool]] = None
):
    """Декоратор валидации входных параметров."""
    if validators is None:
        validators = {}
    
    def decorator(func: Callable[P, R]) -> Callable[P, R]:
        sig = inspect.signature(func)
        
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            bound_args = sig.bind(*args, **kwargs)
            bound_args.apply_defaults()
            
            for param_name, value in bound_args.arguments.items():
                if param_name in validators:
                    validator = validators[param_name]
                    if not validator(value):
                        raise ValueError(
                            f"Invalid value for parameter '{param_name}': {value}"
                        )
            
            return func(*args, **kwargs)
        
        return wrapper
    return decorator

# Комбинированный декоратор
@cached(CacheConfig(ttl=600, max_size=100))
@retry(max_attempts=3, exceptions=(ConnectionError, TimeoutError))
@validate_input({
    'user_id': lambda x: isinstance(x, int) and x > 0,
    'amount': lambda x: isinstance(x, (int, float)) and x > 0
})
def process_payment(user_id: int, amount: float) -> dict:
    """Обработка платежа с кешированием, повторными попытками и валидацией."""
    print(f"Processing payment: user_id={user_id}, amount={amount}")
    # Имитация обработки платежа
    time.sleep(0.5)
    return {
        "transaction_id": f"tx_{int(time.time())}_{user_id}",
        "status": "completed",
        "timestamp": datetime.now().isoformat()
    }

3.2. Метапрограммирование: динамическое создание классов

from typing import ClassVar, get_type_hints
from dataclasses import dataclass, field
import json

class ModelMeta(type):
    """Мета-класс для автоматической регистрации моделей."""
    
    _registry: ClassVar[dict[str, type]] = {}
    
    def __new__(mcs, name, bases, namespace):
        # Добавляем поле id если его нет
        if 'id' not in namespace:
            namespace['id'] = field(default=None)
        
        # Создаем класс
        cls = super().__new__(mcs, name, bases, namespace)
        
        # Регистрируем если это не базовый класс
        if name != 'BaseModel':
            mcs._registry[name] = cls
        
        # Добавляем метод валидации
        if 'validate' not in namespace:
            def validate(self) -> list[str]:
                errors = []
                type_hints = get_type_hints(self.__class__)
                for field_name, field_type in type_hints.items():
                    if field_name == 'return':
                        continue
                    value = getattr(self, field_name)
                    if value is not None and not isinstance(value, field_type):
                        errors.append(
                            f"Field '{field_name}' must be {field_type}, "
                            f"got {type(value)}"
                        )
                return errors
            
            cls.validate = validate
        
        return cls
    
    @classmethod
    def get_model(cls, name: str) -> type:
        """Получение модели по имени."""
        return cls._registry.get(name)
    
    @classmethod
    def all_models(cls) -> dict[str, type]:
        """Все зарегистрированные модели."""
        return cls._registry.copy()

@dataclass
class BaseModel(metaclass=ModelMeta):
    """Базовый класс для всех моделей."""
    
    def to_dict(self) -> dict:
        """Конвертация в словарь."""
        return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}
    
    def to_json(self) -> str:
        """Конвертация в JSON."""
        return json.dumps(self.to_dict(), default=str)

# Автоматическое создание моделей
class User(BaseModel):
    username: str
    email: str
    age: int = field(default=0)
    is_active: bool = field(default=True)

class Product(BaseModel):
    name: str
    price: float
    category: str = field(default="uncategorized")
    in_stock: bool = field(default=True)

# Использование
user = User(username="john_doe", email="john@example.com", age=25)
print(user.validate())  # []
print(user.to_json())

# Получение всех моделей
print(f"Registered models: {list(ModelMeta.all_models().keys())}")

Асинхронные паттерны для высоконагруженных систем

4.1. Асинхронные очереди и worker pool

import asyncio
from asyncio import Queue, Semaphore
from typing import Callable, Awaitable, Any
from dataclasses import dataclass
from datetime import datetime
import random

@dataclass
class Task:
    id: str
    data: Any
    priority: int = 0
    created_at: datetime = field(default_factory=datetime.now)
    
    def __lt__(self, other: "Task") -> bool:
        # Для приоритетной очереди
        return self.priority > other.priority

class AsyncWorkerPool:
    """Пул асинхронных воркеров с приоритетной очередью."""
    
    def __init__(
        self,
        worker_count: int,
        handler: Callable[[Task], Awaitable[Any]],
        max_queue_size: int = 1000
    ):
        self.worker_count = worker_count
        self.handler = handler
        self.task_queue: Queue = Queue(maxsize=max_queue_size)
        self.workers: list[asyncio.Task] = []
        self.is_running = False
        self._task_counter = 0
        self._semaphore = Semaphore(worker_count)
        self._metrics = {
            "tasks_processed": 0,
            "tasks_failed": 0,
            "avg_processing_time": 0.0
        }
    
    async def start(self):
        """Запуск воркеров."""
        self.is_running = True
        for i in range(self.worker_count):
            worker = asyncio.create_task(
                self._worker_loop(f"worker-{i}"),
                name=f"worker-{i}"
            )
            self.workers.append(worker)
        print(f"Started {self.worker_count} workers")
    
    async def stop(self):
        """Остановка воркеров."""
        self.is_running = False
        # Ожидаем завершения всех задач
        await self.task_queue.join()
        # Отменяем воркеров
        for worker in self.workers:
            worker.cancel()
        await asyncio.gather(*self.workers, return_exceptions=True)
        print("All workers stopped")
    
    async def submit_task(self, data: Any, priority: int = 0) -> str:
        """Добавление задачи в очередь."""
        task_id = f"task-{self._task_counter}"
        self._task_counter += 1
        
        task = Task(id=task_id, data=data, priority=priority)
        await self.task_queue.put(task)
        return task_id
    
    async def _worker_loop(self, worker_name: str):
        """Цикл обработки задач воркером."""
        while self.is_running:
            try:
                task = await self.task_queue.get()
                
                start_time = datetime.now()
                try:
                    async with self._semaphore:
                        result = await self.handler(task)
                    processing_time = (datetime.now() - start_time).total_seconds()
                    
                    # Обновляем метрики
                    self._metrics["tasks_processed"] += 1
                    total_time = self._metrics["avg_processing_time"] * \
                                (self._metrics["tasks_processed"] - 1)
                    self._metrics["avg_processing_time"] = \
                        (total_time + processing_time) / self._metrics["tasks_processed"]
                    
                    print(f"{worker_name} processed {task.id} in {processing_time:.3f}s")
                    
                except Exception as e:
                    self._metrics["tasks_failed"] += 1
                    print(f"{worker_name} failed to process {task.id}: {e}")
                
                finally:
                    self.task_queue.task_done()
            
            except asyncio.CancelledError:
                break
    
    def get_metrics(self) -> dict:
        """Получение текущих метрик."""
        return self._metrics.copy()

async def example_handler(task: Task) -> str:
    """Пример обработчика задач."""
    await asyncio.sleep(random.uniform(0.1, 0.5))  # Имитация работы
    return f"Processed {task.id} with data: {task.data}"

async def main_worker_pool():
    """Демонстрация работы пула воркеров."""
    pool = AsyncWorkerPool(
        worker_count=5,
        handler=example_handler,
        max_queue_size=100
    )
    
    await pool.start()
    
    # Добавляем задачи
    tasks = []
    for i in range(20):
        task_id = await pool.submit_task(
            data=f"payload-{i}",
            priority=random.randint(1, 10)
        )
        tasks.append(task_id)
    
    # Даем время на обработку
    await asyncio.sleep(2)
    
    # Смотрим метрики
    print(f"Metrics: {pool.get_metrics()}")
    
    await pool.stop()

4.2. Асинхронный Event Bus для микросервисов

from typing import Type, TypeVar, Generic, Callable, Set
from enum import Enum
import asyncio
from dataclasses import dataclass
from abc import ABC, abstractmethod

T = TypeVar('T')

class EventType(Enum):
    USER_CREATED = "user.created"
    ORDER_PLACED = "order.placed"
    PAYMENT_PROCESSED = "payment.processed"
    SYSTEM_ALERT = "system.alert"

@dataclass
class Event(Generic[T]):
    """Базовый класс события."""
    event_type: EventType
    data: T
    timestamp: float = None
    correlation_id: str = None
    
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = asyncio.get_event_loop().time()

class EventHandler(ABC, Generic[T]):
    """Абстрактный обработчик событий."""
    
    @abstractmethod
    async def handle(self, event: Event[T]) -> None:
        pass

class EventBus:
    """Асинхронная шина событий."""
    
    def __init__(self):
        self._handlers: dict[EventType, Set[EventHandler]] = {}
        self._queue: asyncio.Queue = Queue(maxsize=10000)
        self._dispatcher_task: asyncio.Task = None
    
    def subscribe(self, event_type: EventType, handler: EventHandler):
        """Подписка на события."""
        if event_type not in self._handlers:
            self._handlers[event_type] = set()
        self._handlers[event_type].add(handler)
    
    def unsubscribe(self, event_type: EventType, handler: EventHandler):
        """Отписка от событий."""
        if event_type in self._handlers:
            self._handlers[event_type].discard(handler)
    
    async def publish(self, event: Event):
        """Публикация события."""
        await self._queue.put(event)
    
    async def start(self):
        """Запуск диспетчера событий."""
        self._dispatcher_task = asyncio.create_task(self._dispatch_events())
    
    async def stop(self):
        """Остановка диспетчера."""
        if self._dispatcher_task:
            self._dispatcher_task.cancel()
            try:
                await self._dispatcher_task
            except asyncio.CancelledError:
                pass
    
    async def _dispatch_events(self):
        """Диспетчер событий."""
        while True:
            try:
                event = await self._queue.get()
                
                if event.event_type in self._handlers:
                    handlers = self._handlers[event.event_type].copy()
                    
                    # Запускаем обработчики конкурентно
                    tasks = []
                    for handler in handlers:
                        task = asyncio.create_task(
                            handler.handle(event),
                            name=f"handler-{handler.__class__.__name__}"
                        )
                        tasks.append(task)
                    
                    # Ожидаем завершения всех обработчиков
                    await asyncio.gather(*tasks, return_exceptions=True)
                
                self._queue.task_done()
                
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"Error in event dispatcher: {e}")

# Пример использования
@dataclass
class UserData:
    user_id: int
    username: str
    email: str

class UserCreatedHandler(EventHandler[UserData]):
    """Обработчик создания пользователя."""
    
    async def handle(self, event: Event[UserData]) -> None:
        print(f"Handling user created: {event.data.username}")
        # Отправка приветственного email
        await asyncio.sleep(0.1)
        # Создание профиля
        await asyncio.sleep(0.1)
        print(f"User {event.data.user_id} setup completed")

class AnalyticsHandler(EventHandler[UserData]):
    """Обработчик для аналитики."""
    
    async def handle(self, event: Event[UserData]) -> None:
        print(f"Analytics: new user {event.data.user_id}")
        await asyncio.sleep(0.05)

async def main_event_bus():
    """Демонстрация работы шины событий."""
    bus = EventBus()
    
    # Регистрируем обработчики
    user_handler = UserCreatedHandler()
    analytics_handler = AnalyticsHandler()
    
    bus.subscribe(EventType.USER_CREATED, user_handler)
    bus.subscribe(EventType.USER_CREATED, analytics_handler)
    
    # Запускаем шину
    await bus.start()
    
    # Публикуем события
    for i in range(5):
        user_event = Event[UserData](
            event_type=EventType.USER_CREATED,
            data=UserData(
                user_id=i + 1,
                username=f"user_{i}",
                email=f"user_{i}@example.com"
            ),
            correlation_id=f"corr-{i}"
        )
        await bus.publish(user_event)
        await asyncio.sleep(0.1)
    
    # Даем время на обработку
    await asyncio.sleep(1)
    
    # Останавливаем шину
    await bus.stop()

Заключение

Современный Python в 2026 году — это мощный инструмент для построения сложных, высоконагруженных систем. Ключевые тренды, которые мы рассмотрели:

✅ Итераторы и генераторы перешли от простых циклов к сложным stateful-паттернам обработки потоков данных
✅ Контекстные менеджеры эволюционировали в полноценные системы управления ресурсами с поддержкой транзакций, circuit breakers и health checks
✅ Декораторы стали инструментом для внедрения cross-cutting concerns: кеширования, валидации, retry-логики
✅ Асинхронные паттерны (worker pools, event buses) позволяют строить системы, обрабатывающие миллионы событий в секунду
✅ Метапрограммирование используется для создания DSL, ORM и автоматической кодогенерации

Главная сила Python сегодня — в балансе между производительностью и выразительностью. Вы можете начать с простого скрипта и постепенно, используя те же языковые конструкции, развить его до распределенной системы уровня enterprise.