Многопоточность в Java: полное практическое руководство

Введение

Многопоточность в Java — это не просто «фича» языка, а фундаментальная технология, определяющая производительность, отзывчивость и масштабируемость современных приложений.


Создание потоков: Thread и ExecutorService

1.1. Базовые способы создания потоков

// Способ 1: Наследование от Thread
class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("Поток " + Thread.currentThread().getName() + " выполняется");
    }
}
MyThread thread1 = new MyThread();
thread1.start();

// Способ 2: Реализация Runnable
Runnable task = () -> {
    System.out.println("Задача выполняется в потоке: " + Thread.currentThread().getName());
};
Thread thread2 = new Thread(task);
thread2.start();

// Способ 3: Callable с возвращаемым значением
Callable<Integer> callableTask = () -> {
    Thread.sleep(1000);
    return 42;
};
Future<Integer> future = executor.submit(callableTask);
Integer result = future.get(); // Блокирующий вызов
1.2. Executor Framework (Java 5+)
// Виды пулов потоков
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);

// Кастомный ThreadFactory
ThreadFactory customFactory = r -> {
    Thread thread = new Thread(r);
    thread.setDaemon(true);
    thread.setPriority(Thread.MAX_PRIORITY);
    thread.setName("CustomThread-" + thread.getId());
    return thread;
};
ExecutorService customExecutor = Executors.newFixedThreadPool(4, customFactory);

// Запуск задач
List<Callable<String>> tasks = Arrays.asList(
    () -> "Task1",
    () -> "Task2",
    () -> "Task3"
);
List<Future<String>> futures = executor.invokeAll(tasks);

Синхронизация и блокировки

2.1. Synchronized блоки

// Синхронизация на объекте
public class Counter {
    private int count = 0;
    
    // Синхронизированный метод
    public synchronized void increment() {
        count++;
    }
    
    // Синхронизированный блок
    public void decrement() {
        synchronized (this) {
            count--;
        }
    }
    
    // Статическая синхронизация
    public static synchronized void staticMethod() {
        // Синхронизация на классе (Counter.class)
    }
}

2.2. ReentrantLock и ReadWriteLock

public class ThreadSafeCache {
    private final Map<String, Object> cache = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();
    
    public Object get(String key) {
        readLock.lock();
        try {
            return cache.get(key);
        } finally {
            readLock.unlock();
        }
    }
    
    public void put(String key, Object value) {
        writeLock.lock();
        try {
            cache.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }
    
    // TryLock с таймаутом
    public boolean tryPerformOperation() {
        if (writeLock.tryLock(100, TimeUnit.MILLISECONDS)) {
            try {
                // Критическая секция
                return true;
            } finally {
                writeLock.unlock();
            }
        }
        return false;
    }
}

2.3. StampedLock (Java 8+)

public class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();
    
    // Оптимистичная блокировка чтения
    public double distanceFromOrigin() {
        long stamp = sl.tryOptimisticRead();
        double currentX = x, currentY = y;
        if (!sl.validate(stamp)) {
            stamp = sl.readLock();
            try {
                currentX = x;
                currentY = y;
            } finally {
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
    
    // Апгрейд блокировки
    public void moveIfAtOrigin(double newX, double newY) {
        long stamp = sl.readLock();
        try {
            while (x == 0.0 && y == 0.0) {
                long ws = sl.tryConvertToWriteLock(stamp);
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                } else {
                    sl.unlockRead(stamp);
                    stamp = sl.writeLock();
                }
            }
        } finally {
            sl.unlock(stamp);
        }
    }
}

Concurrent коллекции

3.1. Thread-safe коллекции

// ConcurrentHashMap
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key1", 1);
map.compute("key1", (k, v) -> v == null ? 1 : v + 1);
map.computeIfAbsent("key2", k -> 42);
map.forEach(2, (k, v) -> System.out.println(k + "=" + v)); // Параллельное выполнение

// ConcurrentLinkedQueue
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("element1");
queue.offer("element2");
String element = queue.poll();

// CopyOnWriteArrayList
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("item1");
list.addIfAbsent("item1"); // Не добавит, так как уже есть

// BlockingQueue варианты
BlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(100);
BlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>();
BlockingQueue<String> priorityBlockingQueue = new PriorityBlockingQueue<>();
BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();
BlockingQueue<String> delayQueue = new DelayQueue<>();

3.2. Продвинутые паттерны с ConcurrentHashMap

public class ConcurrentCache<K, V> {
    private final ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<K, Long> timestamps = new ConcurrentHashMap<>();
    private final ScheduledExecutorService cleaner = Executors.newScheduledThreadPool(1);
    
    public ConcurrentCache(long ttlMillis) {
        cleaner.scheduleAtFixedRate(this::evictExpired, 
            ttlMillis, ttlMillis, TimeUnit.MILLISECONDS);
    }
    
    public V get(K key) {
        V value = map.get(key);
        if (value != null) {
            timestamps.put(key, System.currentTimeMillis());
        }
        return value;
    }
    
    public void put(K key, V value) {
        map.put(key, value);
        timestamps.put(key, System.currentTimeMillis());
    }
    
    private void evictExpired() {
        long now = System.currentTimeMillis();
        timestamps.forEach((key, timestamp) -> {
            if (now - timestamp > 60000) { // 60 секунд TTL
                map.remove(key);
                timestamps.remove(key);
            }
        });
    }
}

Atomic переменные и CAS операции

4.1. Классы java.util.concurrent.atomic

AtomicInteger atomicInt = new AtomicInteger(0);
atomicInt.incrementAndGet();
atomicInt.addAndGet(5);
atomicInt.updateAndGet(x -> x * 2);

AtomicLong atomicLong = new AtomicLong();
AtomicBoolean atomicBool = new AtomicBoolean(true);

// Atomic массивы
AtomicIntegerArray atomicArray = new AtomicIntegerArray(10);
atomicArray.set(0, 42);
atomicArray.getAndAdd(0, 10);

// AtomicReference
AtomicReference<String> atomicRef = new AtomicReference<>("initial");
atomicRef.compareAndSet("initial", "updated"); // CAS операция

// AtomicFieldUpdater
public class Account {
    private volatile int balance;
    private static final AtomicIntegerFieldUpdater<Account> updater =
        AtomicIntegerFieldUpdater.newUpdater(Account.class, "balance");
    
    public boolean deposit(int amount) {
        return updater.compareAndSet(this, balance, balance + amount);
    }
}

// Accumulator и Adder для высококонкурентных сценариев
LongAdder adder = new LongAdder(); // Эффективнее AtomicLong при высокой конкуренции
IntAccumulator accumulator = new IntAccumulator(Integer::max, Integer.MIN_VALUE);

4.2. Пользовательские CAS операции

public class ConcurrentStack<T> {
    private static class Node<T> {
        final T value;
        Node<T> next;
        Node(T value) { this.value = value; }
    }
    
    private AtomicReference<Node<T>> top = new AtomicReference<>();
    
    public void push(T value) {
        Node<T> newHead = new Node<>(value);
        Node<T> oldHead;
        do {
            oldHead = top.get();
            newHead.next = oldHead;
        } while (!top.compareAndSet(oldHead, newHead));
    }
    
    public T pop() {
        Node<T> oldHead;
        Node<T> newHead;
        do {
            oldHead = top.get();
            if (oldHead == null) return null;
            newHead = oldHead.next;
        } while (!top.compareAndSet(oldHead, newHead));
        return oldHead.value;
    }
}

Паттерны и фьючерсы

5.1. CompletableFuture (Java 8+)

// Создание CompletableFuture
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(1000);
    return "Результат 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    Thread.sleep(500);
    return "Результат 2";
});

// Комбинация фьючерсов
CompletableFuture<String> combined = future1
    .thenApply(result -> result + " обработан")
    .thenCombine(future2, (r1, r2) -> r1 + " + " + r2)
    .thenCompose(result -> CompletableFuture.supplyAsync(() -> result.toUpperCase()))
    .exceptionally(ex -> "Ошибка: " + ex.getMessage());

// Параллельное выполнение
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);

// Таймауты (Java 9+)
CompletableFuture<String> withTimeout = future1
    .completeOnTimeout("Значение по умолчанию", 2, TimeUnit.SECONDS)
    .orTimeout(3, TimeUnit.SECONDS);

5.2. ForkJoinPool и RecursiveTask

public class ArraySumTask extends RecursiveTask<Long> {
    private final int[] array;
    private final int start;
    private final int end;
    private static final int THRESHOLD = 10000;
    
    public ArraySumTask(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }
    
    @Override
    protected Long compute() {
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        } else {
            int middle = (start + end) / 2;
            ArraySumTask leftTask = new ArraySumTask(array, start, middle);
            ArraySumTask rightTask = new ArraySumTask(array, middle, end);
            
            leftTask.fork(); // Асинхронное выполнение
            long rightResult = rightTask.compute();
            long leftResult = leftTask.join(); // Ожидание результата
            
            return leftResult + rightResult;
        }
    }
}

// Использование
int[] largeArray = new int[1_000_000];
ForkJoinPool pool = new ForkJoinPool();
ArraySumTask task = new ArraySumTask(largeArray, 0, largeArray.length);
long sum = pool.invoke(task);

Производительность и мониторинг

6.1. Thread Dump анализ

# Получение thread dump
jstack <pid> > thread_dump.txt
jcmd <pid> Thread.print

# Анализ блокировок
# 1. Ищем BLOCKED threads
# 2. Смотрим на "waiting to lock <0x0000000715b38880>"
# 3. Ищем владельца блокировки "locked <0x0000000715b38880>"

6.2. JVM флаги для многопоточности

# Размер стека потока
-Xss1m

# Параметры сборщика мусора для многопоточных приложений
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:ParallelGCThreads=4
-XX:ConcGCThreads=2

# Параметры для уменьшения contention
-XX:+UseBiasedLocking
-XX:BiasedLockingStartupDelay=0

# Мониторинг блокировок
-XX:+PrintSafepointStatistics
-XX:PrintSafepointStatisticsCount=1

6.3. Тестирование многопоточного кода

@Test
public void testConcurrentAccess() throws InterruptedException {
    final int THREAD_COUNT = 100;
    final int ITERATIONS = 1000;
    final Counter counter = new Counter();
    
    CountDownLatch startLatch = new CountDownLatch(1);
    CountDownLatch endLatch = new CountDownLatch(THREAD_COUNT);
    
    for (int i = 0; i < THREAD_COUNT; i++) {
        new Thread(() -> {
            try {
                startLatch.await();
                for (int j = 0; j < ITERATIONS; j++) {
                    counter.increment();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                endLatch.countDown();
            }
        }).start();
    }
    
    startLatch.countDown();
    endLatch.await(10, TimeUnit.SECONDS);
    
    assertEquals(THREAD_COUNT * ITERATIONS, counter.getCount());
}

Продвинутые паттерны

7.1. Producer-Consumer с BlockingQueue

public class ProducerConsumer {
    private final BlockingQueue<Task> queue = new LinkedBlockingQueue<>(100);
    private final ExecutorService producers = Executors.newFixedThreadPool(3);
    private final ExecutorService consumers = Executors.newFixedThreadPool(5);
    private volatile boolean running = true;
    
    public void start() {
        // Producers
        for (int i = 0; i < 3; i++) {
            producers.submit(() -> {
                while (running) {
                    Task task = generateTask();
                    try {
                        queue.put(task); // Блокируется если очередь полна
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
        }
        
        // Consumers
        for (int i = 0; i < 5; i++) {
            consumers.submit(() -> {
                while (running) {
                    try {
                        Task task = queue.take(); // Блокируется если очередь пуста
                        processTask(task);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
        }
    }
}

7.2. Кэширование с асинхронным обновлением

public class AsyncLoadingCache<K, V> {
    private final ConcurrentHashMap<K, Future<V>> cache = new ConcurrentHashMap<>();
    private final Function<K, V> loader;
    private final ExecutorService executor;
    
    public V get(K key) throws ExecutionException, InterruptedException {
        while (true) {
            Future<V> future = cache.get(key);
            if (future == null) {
                Callable<V> callable = () -> loader.apply(key);
                FutureTask<V> futureTask = new FutureTask<>(callable);
                future = cache.putIfAbsent(key, futureTask);
                if (future == null) {
                    future = futureTask;
                    futureTask.run();
                }
            }
            try {
                return future.get();
            } catch (CancellationException e) {
                cache.remove(key, future);
            }
        }
    }
}

7.3. Rate Limiter на семафорах

public class RateLimiter {
    private final Semaphore semaphore;
    private final int maxPermits;
    private final ScheduledExecutorService scheduler;
    
    public RateLimiter(int permitsPerSecond) {
        this.maxPermits = permitsPerSecond;
        this.semaphore = new Semaphore(permitsPerSecond);
        this.scheduler = Executors.newScheduledThreadPool(1);
        
        scheduler.scheduleAtFixedRate(() -> {
            int currentPermits = semaphore.availablePermits();
            if (currentPermits < maxPermits) {
                semaphore.release(maxPermits - currentPermits);
            }
        }, 0, 1, TimeUnit.SECONDS);
    }
    
    public boolean tryAcquire() {
        return semaphore.tryAcquire();
    }
    
    public void acquire() throws InterruptedException {
        semaphore.acquire();
    }
}

Заключение

Многопоточность в Java эволюционировала от простых механизмов синхронизации до сложной, но мощной экосистемы java.util.concurrent, которая предоставляет разработчику инструменты для решения практически любых задач параллельной обработки. Ключевой инсайт современного подхода — не изобретать велосипеды, а выбирать правильные абстракции из богатого арсенала Java.