import functools
import inspect
from typing import Callable, Any, TypeVar, ParamSpec
from datetime import datetime, timedelta
import hashlib
import pickle
P = ParamSpec('P')
R = TypeVar('R')
class CacheConfig:
"""Конфигурация кеширования."""
def __init__(self, ttl: int = 300, max_size: int = 1000):
self.ttl = ttl
self.max_size = max_size
self._cache: dict[str, tuple[Any, datetime]] = {}
def get_key(self, func: Callable, *args, **kwargs) -> str:
"""Генерация ключа кеша на основе функции и аргументов."""
args_bytes = pickle.dumps((args, kwargs))
func_sig = inspect.signature(func)
key_data = f"{func.__module__}.{func.__name__}:{func_sig}:{args_bytes}"
return hashlib.sha256(key_data.encode()).hexdigest()
def get(self, key: str) -> Any:
"""Получение значения из кеша."""
if key not in self._cache:
return None
value, timestamp = self._cache[key]
if datetime.now() - timestamp > timedelta(seconds=self.ttl):
del self._cache[key]
return None
return value
def set(self, key: str, value: Any):
"""Установка значения в кеш."""
if len(self._cache) >= self.max_size:
oldest_key = min(self._cache.items(), key=lambda x: x[1][1])[0]
del self._cache[oldest_key]
self._cache[key] = (value, datetime.now())
def cached(cache_config: CacheConfig | None = None):
"""Декоратор кеширования с конфигурацией."""
if cache_config is None:
cache_config = CacheConfig()
def decorator(func: Callable[P, R]) -> Callable[P, R]:
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
key = cache_config.get_key(func, *args, **kwargs)
cached_value = cache_config.get(key)
if cached_value is not None:
print(f"Cache hit for {func.__name__}")
return cached_value
result = func(*args, **kwargs)
cache_config.set(key, result)
print(f"Cache set for {func.__name__}")
return result
return wrapper
return decorator
def retry(
max_attempts: int = 3,
delays: list[float] = None,
exceptions: tuple = (Exception,)
):
"""Декоратор повторных попыток с экспоненциальной задержкой."""
if delays is None:
delays = [1, 2, 4, 8, 16]
def decorator(func: Callable[P, R]) -> Callable[P, R]:
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
last_exception = None
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt == max_attempts - 1:
break
delay = delays[attempt] if attempt < len(delays) else delays[-1]
print(f"Attempt {attempt + 1} failed for {func.__name__}, "
f"retrying in {delay}s: {e}")
time.sleep(delay)
raise RuntimeError(
f"Function {func.__name__} failed after {max_attempts} attempts"
) from last_exception
return wrapper
return decorator
def validate_input(
validators: dict[str, Callable[[Any], bool]] = None
):
"""Декоратор валидации входных параметров."""
if validators is None:
validators = {}
def decorator(func: Callable[P, R]) -> Callable[P, R]:
sig = inspect.signature(func)
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
bound_args = sig.bind(*args, **kwargs)
bound_args.apply_defaults()
for param_name, value in bound_args.arguments.items():
if param_name in validators:
validator = validators[param_name]
if not validator(value):
raise ValueError(
f"Invalid value for parameter '{param_name}': {value}"
)
return func(*args, **kwargs)
return wrapper
return decorator
@cached(CacheConfig(ttl=600, max_size=100))
@retry(max_attempts=3, exceptions=(ConnectionError, TimeoutError))
@validate_input({
'user_id': lambda x: isinstance(x, int) and x > 0,
'amount': lambda x: isinstance(x, (int, float)) and x > 0
})
def process_payment(user_id: int, amount: float) -> dict:
"""Обработка платежа с кешированием, повторными попытками и валидацией."""
print(f"Processing payment: user_id={user_id}, amount={amount}")
time.sleep(0.5)
return {
"transaction_id": f"tx_{int(time.time())}_{user_id}",
"status": "completed",
"timestamp": datetime.now().isoformat()
}
from typing import ClassVar, get_type_hints
from dataclasses import dataclass, field
import json
class ModelMeta(type):
"""Мета-класс для автоматической регистрации моделей."""
_registry: ClassVar[dict[str, type]] = {}
def __new__(mcs, name, bases, namespace):
if 'id' not in namespace:
namespace['id'] = field(default=None)
cls = super().__new__(mcs, name, bases, namespace)
if name != 'BaseModel':
mcs._registry[name] = cls
if 'validate' not in namespace:
def validate(self) -> list[str]:
errors = []
type_hints = get_type_hints(self.__class__)
for field_name, field_type in type_hints.items():
if field_name == 'return':
continue
value = getattr(self, field_name)
if value is not None and not isinstance(value, field_type):
errors.append(
f"Field '{field_name}' must be {field_type}, "
f"got {type(value)}"
)
return errors
cls.validate = validate
return cls
@classmethod
def get_model(cls, name: str) -> type:
"""Получение модели по имени."""
return cls._registry.get(name)
@classmethod
def all_models(cls) -> dict[str, type]:
"""Все зарегистрированные модели."""
return cls._registry.copy()
@dataclass
class BaseModel(metaclass=ModelMeta):
"""Базовый класс для всех моделей."""
def to_dict(self) -> dict:
"""Конвертация в словарь."""
return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}
def to_json(self) -> str:
"""Конвертация в JSON."""
return json.dumps(self.to_dict(), default=str)
class User(BaseModel):
username: str
email: str
age: int = field(default=0)
is_active: bool = field(default=True)
class Product(BaseModel):
name: str
price: float
category: str = field(default="uncategorized")
in_stock: bool = field(default=True)
user = User(username="john_doe", email="john@example.com", age=25)
print(user.validate())
print(user.to_json())
print(f"Registered models: {list(ModelMeta.all_models().keys())}")
import asyncio
from asyncio import Queue, Semaphore
from typing import Callable, Awaitable, Any
from dataclasses import dataclass
from datetime import datetime
import random
@dataclass
class Task:
id: str
data: Any
priority: int = 0
created_at: datetime = field(default_factory=datetime.now)
def __lt__(self, other: "Task") -> bool:
return self.priority > other.priority
class AsyncWorkerPool:
"""Пул асинхронных воркеров с приоритетной очередью."""
def __init__(
self,
worker_count: int,
handler: Callable[[Task], Awaitable[Any]],
max_queue_size: int = 1000
):
self.worker_count = worker_count
self.handler = handler
self.task_queue: Queue = Queue(maxsize=max_queue_size)
self.workers: list[asyncio.Task] = []
self.is_running = False
self._task_counter = 0
self._semaphore = Semaphore(worker_count)
self._metrics = {
"tasks_processed": 0,
"tasks_failed": 0,
"avg_processing_time": 0.0
}
async def start(self):
"""Запуск воркеров."""
self.is_running = True
for i in range(self.worker_count):
worker = asyncio.create_task(
self._worker_loop(f"worker-{i}"),
name=f"worker-{i}"
)
self.workers.append(worker)
print(f"Started {self.worker_count} workers")
async def stop(self):
"""Остановка воркеров."""
self.is_running = False
await self.task_queue.join()
for worker in self.workers:
worker.cancel()
await asyncio.gather(*self.workers, return_exceptions=True)
print("All workers stopped")
async def submit_task(self, data: Any, priority: int = 0) -> str:
"""Добавление задачи в очередь."""
task_id = f"task-{self._task_counter}"
self._task_counter += 1
task = Task(id=task_id, data=data, priority=priority)
await self.task_queue.put(task)
return task_id
async def _worker_loop(self, worker_name: str):
"""Цикл обработки задач воркером."""
while self.is_running:
try:
task = await self.task_queue.get()
start_time = datetime.now()
try:
async with self._semaphore:
result = await self.handler(task)
processing_time = (datetime.now() - start_time).total_seconds()
self._metrics["tasks_processed"] += 1
total_time = self._metrics["avg_processing_time"] * \
(self._metrics["tasks_processed"] - 1)
self._metrics["avg_processing_time"] = \
(total_time + processing_time) / self._metrics["tasks_processed"]
print(f"{worker_name} processed {task.id} in {processing_time:.3f}s")
except Exception as e:
self._metrics["tasks_failed"] += 1
print(f"{worker_name} failed to process {task.id}: {e}")
finally:
self.task_queue.task_done()
except asyncio.CancelledError:
break
def get_metrics(self) -> dict:
"""Получение текущих метрик."""
return self._metrics.copy()
async def example_handler(task: Task) -> str:
"""Пример обработчика задач."""
await asyncio.sleep(random.uniform(0.1, 0.5))
return f"Processed {task.id} with data: {task.data}"
async def main_worker_pool():
"""Демонстрация работы пула воркеров."""
pool = AsyncWorkerPool(
worker_count=5,
handler=example_handler,
max_queue_size=100
)
await pool.start()
tasks = []
for i in range(20):
task_id = await pool.submit_task(
data=f"payload-{i}",
priority=random.randint(1, 10)
)
tasks.append(task_id)
await asyncio.sleep(2)
print(f"Metrics: {pool.get_metrics()}")
await pool.stop()
4.2. Асинхронный Event Bus для микросервисов
from typing import Type, TypeVar, Generic, Callable, Set
from enum import Enum
import asyncio
from dataclasses import dataclass
from abc import ABC, abstractmethod
T = TypeVar('T')
class EventType(Enum):
USER_CREATED = "user.created"
ORDER_PLACED = "order.placed"
PAYMENT_PROCESSED = "payment.processed"
SYSTEM_ALERT = "system.alert"
@dataclass
class Event(Generic[T]):
"""Базовый класс события."""
event_type: EventType
data: T
timestamp: float = None
correlation_id: str = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = asyncio.get_event_loop().time()
class EventHandler(ABC, Generic[T]):
"""Абстрактный обработчик событий."""
@abstractmethod
async def handle(self, event: Event[T]) -> None:
pass
class EventBus:
"""Асинхронная шина событий."""
def __init__(self):
self._handlers: dict[EventType, Set[EventHandler]] = {}
self._queue: asyncio.Queue = Queue(maxsize=10000)
self._dispatcher_task: asyncio.Task = None
def subscribe(self, event_type: EventType, handler: EventHandler):
"""Подписка на события."""
if event_type not in self._handlers:
self._handlers[event_type] = set()
self._handlers[event_type].add(handler)
def unsubscribe(self, event_type: EventType, handler: EventHandler):
"""Отписка от событий."""
if event_type in self._handlers:
self._handlers[event_type].discard(handler)
async def publish(self, event: Event):
"""Публикация события."""
await self._queue.put(event)
async def start(self):
"""Запуск диспетчера событий."""
self._dispatcher_task = asyncio.create_task(self._dispatch_events())
async def stop(self):
"""Остановка диспетчера."""
if self._dispatcher_task:
self._dispatcher_task.cancel()
try:
await self._dispatcher_task
except asyncio.CancelledError:
pass
async def _dispatch_events(self):
"""Диспетчер событий."""
while True:
try:
event = await self._queue.get()
if event.event_type in self._handlers:
handlers = self._handlers[event.event_type].copy()
tasks = []
for handler in handlers:
task = asyncio.create_task(
handler.handle(event),
name=f"handler-{handler.__class__.__name__}"
)
tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True)
self._queue.task_done()
except asyncio.CancelledError:
break
except Exception as e:
print(f"Error in event dispatcher: {e}")
@dataclass
class UserData:
user_id: int
username: str
email: str
class UserCreatedHandler(EventHandler[UserData]):
"""Обработчик создания пользователя."""
async def handle(self, event: Event[UserData]) -> None:
print(f"Handling user created: {event.data.username}")
await asyncio.sleep(0.1)
await asyncio.sleep(0.1)
print(f"User {event.data.user_id} setup completed")
class AnalyticsHandler(EventHandler[UserData]):
"""Обработчик для аналитики."""
async def handle(self, event: Event[UserData]) -> None:
print(f"Analytics: new user {event.data.user_id}")
await asyncio.sleep(0.05)
async def main_event_bus():
"""Демонстрация работы шины событий."""
bus = EventBus()
user_handler = UserCreatedHandler()
analytics_handler = AnalyticsHandler()
bus.subscribe(EventType.USER_CREATED, user_handler)
bus.subscribe(EventType.USER_CREATED, analytics_handler)
await bus.start()
for i in range(5):
user_event = Event[UserData](
event_type=EventType.USER_CREATED,
data=UserData(
user_id=i + 1,
username=f"user_{i}",
email=f"user_{i}@example.com"
),
correlation_id=f"corr-{i}"
)
await bus.publish(user_event)
await asyncio.sleep(0.1)
await asyncio.sleep(1)
await bus.stop()
Заключение
Современный Python в 2026 году — это мощный инструмент для построения сложных, высоконагруженных систем. Ключевые тренды, которые мы рассмотрели:
✅ Итераторы и генераторы перешли от простых циклов к сложным stateful-паттернам обработки потоков данных
✅ Контекстные менеджеры эволюционировали в полноценные системы управления ресурсами с поддержкой транзакций, circuit breakers и health checks
✅ Декораторы стали инструментом для внедрения cross-cutting concerns: кеширования, валидации, retry-логики
✅ Асинхронные паттерны (worker pools, event buses) позволяют строить системы, обрабатывающие миллионы событий в секунду
✅ Метапрограммирование используется для создания DSL, ORM и автоматической кодогенерации
Главная сила Python сегодня — в балансе между производительностью и выразительностью. Вы можете начать с простого скрипта и постепенно, используя те же языковые конструкции, развить его до распределенной системы уровня enterprise.