Оптимизация производительности Python-приложений: профайлеры и техники

Введение

Python ценят за скорость разработки и читаемость, но его производительность иногда становится узким местом. Однако современный Python — это не медленный язык. С правильными инструментами анализа и оптимизации можно ускорить приложения в десятки раз. В этом посте мы разберем, как найти «узкие места» с помощью профайлеров и какие техники оптимизации действительно работают на практике.


Инструменты профилирования: что, где и почему тормозит

Прежде чем оптимизировать, нужно точно измерить. Профилирование покажет, какая часть кода отнимает больше всего времени или памяти.

1.1. Профилирование времени выполнения

cProfile — стандартный профайлер:

import cProfile
import pstats
from io import StringIO

def slow_function():
    total = 0
    for i in range(1000000):
        total += i * i
    return total

def fast_function():
    # Более эффективная реализация
    n = 1000000
    return n * (n + 1) * (2 * n + 1) // 6

# Запуск профилирования
pr = cProfile.Profile()
pr.enable()

# Код для профилирования
for _ in range(10):
    slow_function()
    fast_function()

pr.disable()

# Анализ результатов
s = StringIO()
ps = pstats.Stats(pr, stream=s).sort_stats('cumulative')
ps.print_stats(20)  # Показать топ-20 функций
print(s.getvalue())

# Вывод покажет:
# ncalls  tottime  percall  cumtime  percall filename:lineno(function)
# 10     0.380    0.038    0.380    0.038   test.py:4(slow_function)
# 10     0.000    0.000    0.000    0.000   test.py:11(fast_function)

line_profiler — построчное профилирование:

# Установка: pip install line_profiler
from line_profiler import LineProfiler

def process_data(data):
    result = []
    for item in data:          # Строка 1
        if item % 2 == 0:      # Строка 2
            result.append(item ** 2)  # Строка 3
        else:                  # Строка 4
            result.append(item ** 3)  # Строка 5
    return result              # Строка 6

# Создаем профайлер
profiler = LineProfiler()
profiler.add_function(process_data)  # Указываем функцию для профилирования

# Обертываем функцию
wrapped_func = profiler(process_data)

# Запускаем и получаем отчет
result = wrapped_func(range(100000))
profiler.print_stats()

# Вывод:
# Line #   Hits    Time  Per Hit   % Time  Line Contents
# ======================================================
#      1                                def process_data(data):
#      2    100001  25000    0.3     13.2      result = []
#      3  10000000  750000    7.5    39.5      for item in data:
#      4  10000000  600000    6.0    31.6          if item % 2 == 0:
#      5   5000000  300000    6.0    15.8              result.append(item ** 2)
#      6                                             else:
#      7   5000000  200000    4.0    10.5              result.append(item ** 3)
#      8      10000    5000    0.5      2.6      return result

Вывод: Видим, что 39.5% времени тратится на итерацию по списку, а проверка условия — 31.6%. Оптимизируем.

1.2. Профилирование памяти

memory_profiler — отслеживание использования памяти:

# Установка: pip install memory_profiler
# Запуск: python -m memory_profiler script.py

from memory_profiler import profile
import numpy as np

@profile
def memory_intensive_operation():
    # Плохой вариант: список Python
    data_list = [float(i) for i in range(1000000)]  # ~8.0 MB
    result_list = [x * 2 for x in data_list]        # Еще ~8.0 MB
    
    # Хороший вариант: NumPy array
    data_np = np.arange(1000000, dtype=np.float64)  # ~8.0 MB
    result_np = data_np * 2                         # ~8.0 MB (но вычисления в C)
    
    # Очень плохой вариант: копирование
    copied_data = data_list[:]                      # Еще ~8.0 MB
    copied_data.extend(result_list)                 # Еще ~8.0 MB
    
    return result_np

if __name__ == "__main__":
    memory_intensive_operation()

# Вывод memory_profiler:
# Line #    Mem usage    Increment  Occurrences   Line Contents
# =============================================================
#      4     38.2 MiB     38.2 MiB           1   @profile
#      5                                         def memory_intensive_operation():
#      6     46.0 MiB      7.8 MiB           1       data_list = [float(i) for i in range(1000000)]
#      7     53.8 MiB      7.8 MiB           1       result_list = [x * 2 for x in data_list]
#      8     61.6 MiB      7.8 MiB           1       data_np = np.arange(1000000, dtype=np.float64)
#      9     69.4 MiB      7.8 MiB           1       result_np = data_np * 2
#     10     77.2 MiB      7.8 MiB           1       copied_data = data_list[:]
#     11     85.0 MiB      7.8 MiB           1       copied_data.extend(result_list)
#     12     85.0 MiB      0.0 MiB           1       return result_np

tracemalloc — встроенный трекер памяти:

import tracemalloc
import pandas as pd

def load_and_process_data():
    # Начинаем отслеживание
    tracemalloc.start()
    
    # Снимок до операции
    snapshot1 = tracemalloc.take_snapshot()
    
    # Операция, потребляющая память
    df = pd.DataFrame([{'id': i, 'value': i * 2} for i in range(100000)])
    filtered = df[df['value'] > 1000]
    grouped = filtered.groupby('id').sum()
    
    # Снимок после операции
    snapshot2 = tracemalloc.take_snapshot()
    
    # Анализ разницы
    top_stats = snapshot2.compare_to(snapshot1, 'lineno')
    
    print("[Топ-5 самых прожорливых строк]:")
    for stat in top_stats[:5]:
        print(f"{stat.traceback.format()[-1]}: {stat.size / 1024:.2f} KB")
    
    tracemalloc.stop()
    return grouped

load_and_process_data()

1.3. Визуализация результатов профилирования

SnakeViz — интерактивная визуализация cProfile:

# Установка: pip install snakeviz
# Запуск профилирования:
python -m cProfile -o profile_stats.prof my_script.py
# Визуализация:
snakeviz profile_stats.prof

py-spy — самплинг-профайлер без изменения кода:

# Установка: pip install py-spy
# Запуск для работающего процесса:
py-spy top --pid 12345
# Или запись flamegraph:
py-spy record -o profile.svg --pid 12345

Техники оптимизации производительности

2.1. Алгоритмическая оптимизация

Пример: Поиск пересечения множеств

# Медленно: O(n*m)
def slow_intersection(list1, list2):
    result = []
    for x in list1:          # O(n)
        if x in list2:       # O(m) для списка
            result.append(x) # O(1)*
    return result            # Итого: O(n*m)

# Быстро: O(n+m) с использованием множеств
def fast_intersection(list1, list2):
    set2 = set(list2)        # O(m)
    return [x for x in list1 if x in set2]  # O(n) * O(1)

# Очень быстро: используем встроенные операции
def faster_intersection(list1, list2):
    return list(set(list1) & set(list2))

# Тестирование
import time
list1 = list(range(10000))
list2 = list(range(5000, 15000))

for func in [slow_intersection, fast_intersection, faster_intersection]:
    start = time.time()
    result = func(list1, list2)
    elapsed = time.time() - start
    print(f"{func.__name__}: {len(result)} элементов, {elapsed:.4f} сек")

2.2. Использование эффективных структур данных

collections.defaultdict vs обычный dict:

from collections import defaultdict

# Неоптимально: проверка ключа
def count_words_naive(words):
    counter = {}
    for word in words:
        if word in counter:          # Проверка каждый раз
            counter[word] += 1
        else:
            counter[word] = 1
    return counter

# Оптимально: defaultdict
def count_words_fast(words):
    counter = defaultdict(int)       # Значение по умолчанию: 0
    for word in words:
        counter[word] += 1           # Не нужно проверять наличие
    return dict(counter)

# Еще лучше: Counter
from collections import Counter
def count_words_best(words):
    return dict(Counter(words))

array.array для числовых данных:

import array
import sys

# Список Python для чисел
python_list = list(range(1000000))
print(f"Размер списка: {sys.getsizeof(python_list) / 1024 / 1024:.2f} MB")

# array.array (типизированный массив)
typed_array = array.array('I', range(1000000))  # 'I' = unsigned int
print(f"Размер array: {sys.getsizeof(typed_array) / 1024 / 1024:.2f} MB")

# Операции быстрее и память экономится
sum_list = sum(python_list)          # Медленнее, обход Python объектов
sum_array = sum(typed_array)         # Быстрее, нативные типы

2.3. Векторизация с NumPy/Numba

NumPy для числовых операций:

import numpy as np
import time

# Чистый Python
def python_vector_operation(size=1000000):
    a = list(range(size))
    b = list(range(size))
    result = []
    for i in range(size):
        result.append(a[i] * 2 + b[i] ** 2)
    return result

# NumPy
def numpy_vector_operation(size=1000000):
    a = np.arange(size)
    b = np.arange(size)
    return a * 2 + b ** 2  # Векторизованная операция в C

# Сравнение
start = time.time()
python_result = python_vector_operation(100000)
python_time = time.time() - start

start = time.time()
numpy_result = numpy_vector_operation(100000)
numpy_time = time.time() - start

print(f"Python: {python_time:.4f} сек")
print(f"NumPy: {numpy_time:.4f} сек")
print(f"Ускорение: {python_time/numpy_time:.1f}x")

Numba для JIT-компиляции:

from numba import jit
import numpy as np

# Обычная функция Python
def monte_carlo_pi(n):
    count = 0
    for _ in range(n):
        x = np.random.random()
        y = np.random.random()
        if x*x + y*y <= 1.0:
            count += 1
    return 4.0 * count / n

# JIT-компилированная версия
@jit(nopython=True)
def monte_carlo_pi_numba(n):
    count = 0
    for _ in range(n):
        x = np.random.random()
        y = np.random.random()
        if x*x + y*y <= 1.0:
            count += 1
    return 4.0 * count / n

# Тестирование
n = 1000000
%timeit monte_carlo_pi(n)         # ~200 ms
%timeit monte_carlo_pi_numba(n)   # ~15 ms (в 13 раз быстрее!)

2.4. Кеширование и мемоизация

functools.lru_cache:

from functools import lru_cache
import time

# Рекурсивный расчет чисел Фибоначчи без кеширования
def fib(n):
    if n < 2:
        return n
    return fib(n-1) + fib(n-2)  # Экспоненциальная сложность

# С кешированием
@lru_cache(maxsize=None)
def fib_cached(n):
    if n < 2:
        return n
    return fib_cached(n-1) + fib_cached(n-2)

# Сравнение
start = time.time()
result1 = fib(35)
time1 = time.time() - start

start = time.time()
result2 = fib_cached(35)
time2 = time.time() - start

print(f"Без кеша: {time1:.2f} сек, результат: {result1}")
print(f"С кешем: {time2:.4f} сек, результат: {result2}")
print(f"Ускорение: {time1/time2:.0f}x")

Самодельный кеш для дорогих вычислений:

class DataProcessor:
    def __init__(self):
        self._cache = {}
        self._hits = 0
        self._misses = 0
    
    def process(self, data_id, expensive_computation):
        """Кеширует результат дорогой операции."""
        if data_id in self._cache:
            self._hits += 1
            return self._cache[data_id]
        
        self._misses += 1
        result = expensive_computation(data_id)
        self._cache[data_id] = result
        return result
    
    def cache_stats(self):
        return {"hits": self._hits, "misses": self._misses, 
                "hit_rate": self._hits/(self._hits+self._misses) if (self._hits+self._misses) > 0 else 0}

2.5. Оптимизация ввода-вывода

Буферизация операций с файлами:

# Медленно: много маленьких операций
def write_slow(filename, lines):
    with open(filename, 'w') as f:
        for line in lines:
            f.write(line + '\n')  # Каждая запись = системный вызов

# Быстро: буферизация
def write_fast(filename, lines):
    with open(filename, 'w', buffering=8192) as f:  # 8KB буфер
        # Или собираем все в одну строку
        content = '\n'.join(lines)
        f.write(content)

# Еще лучше для больших файлов: chunking
def write_large_file(filename, data_generator, chunk_size=10000):
    with open(filename, 'w', buffering=8192) as f:
        chunk = []
        for item in data_generator:
            chunk.append(str(item))
            if len(chunk) >= chunk_size:
                f.write('\n'.join(chunk) + '\n')
                chunk = []
        if chunk:
            f.write('\n'.join(chunk))

Асинхронный I/O с asyncio:

import asyncio
import aiohttp
import time

# Синхронные запросы (медленно)
def sync_fetch(urls):
    import requests
    results = []
    for url in urls:
        results.append(requests.get(url).text[:100])
    return results

# Асинхронные запросы (быстро)
async def async_fetch(session, url):
    async with session.get(url) as response:
        return (await response.text())[:100]

async def main(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [async_fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)

# Сравнение
urls = ['https://httpbin.org/delay/1'] * 10  # 10 запросов по 1 секунде

# Синхронно: ~10 секунд
# Асинхронно: ~1 секунда

2.6. Оптимизация баз данных

Пакетные операции в Django ORM:

# Медленно: N+1 запрос
def create_users_slow(usernames):
    for username in usernames:
        User.objects.create(username=username)  # Каждый create = отдельный INSERT

# Быстро: bulk_create
from django.db import transaction

def create_users_fast(usernames):
    users = [User(username=username) for username in usernames]
    User.objects.bulk_create(users)  # Один запрос с множеством VALUES
    
# select_related и prefetch_related для оптимизации связанных объектов
def get_articles_with_authors():
    # Плохо: N+1 запрос для авторов
    articles = Article.objects.all()
    for article in articles:
        print(article.author.name)  # Отдельный запрос для каждого автора
    
    # Хорошо: один JOIN
    articles = Article.objects.select_related('author').all()
    for article in articles:
        print(article.author.name)  # Автор уже загружен

# Оптимизация сложных запросов через values() и only()
def get_article_stats():
    # Загружаем только нужные поля
    return (Article.objects
            .filter(published=True)
            .values('category', 'author_id')
            .annotate(count=Count('id'))
            .order_by('-count'))

Оптимизация для веб-приложений

3.1. Кеширование в Django

# settings.py
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.redis.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/1',
        'TIMEOUT': 300,  # 5 минут
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            'COMPRESSOR': 'django_redis.compressors.zlib.ZlibCompressor',
        }
    }
}

# views.py
from django.views.decorators.cache import cache_page
from django.core.cache import cache

# Кеширование всей страницы
@cache_page(60 * 15)  # 15 минут
def expensive_view(request):
    # Сложные вычисления
    return render(...)

# Кеширование фрагментов
def product_detail(request, product_id):
    context = cache.get(f'product_{product_id}')
    if not context:
        # Дорогостоящая подготовка данных
        product = get_object_or_404(Product, id=product_id)
        reviews = product.reviews.select_related('user')
        similar = product.get_similar_products()
        
        context = {
            'product': product,
            'reviews': reviews,
            'similar': similar,
        }
        cache.set(f'product_{product_id}', context, timeout=300)
    
    return render(request, 'product_detail.html', context)

3.2. Оптимизация сериализации JSON

import json
import ujson  # pip install ujson
import orjson  # pip install orjson
import time

data = [{'id': i, 'value': 'x' * 100} for i in range(10000)]

# Стандартный json
start = time.time()
json_str = json.dumps(data)
json.loads(json_str)
print(f"Стандартный json: {time.time() - start:.3f} сек")

# ujson (быстрее)
start = time.time()
ujson_str = ujson.dumps(data)
ujson.loads(ujson_str)
print(f"ujson: {time.time() - start:.3f} сек")

# orjson (самый быстрый, поддерживает datetime)
start = time.time()
orjson_str = orjson.dumps(data)
orjson.loads(orjson_str)
print(f"orjson: {time.time() - start:.3f} сек")

Вывод

Оптимизация производительности Python-приложений — это системный процесс, а не случайные правки.

Современный Python предоставляет все инструменты для создания высокопроизводительных приложений. Часто достаточно нескольких целенаправленных оптимизаций, чтобы ускорить код в десятки раз.