Java Basics: Concurrency

Concurrency

Немного терминов, которые будут использоваться: поток (Thread) и процесс (process) - группа потоков. Shared Environment - окружение, доступное всем потокам в рамках процесса. Задача (Task) - единица работы, выполняемая потоком. В единицу времени, поток (Thread) может выполнять только одну задачу (Task).

Виды потоков

  • Системные - создаются JVM и работают в фоне. Как правило, не видны для разработчика приложения, а в случае возникновения внутренних проблем могут возникать java.lang.Error;
  • Пользовательские - создаются и управляются пользователем.

Очередность выполнения

Для определения очередности, ОС использует расписание потоков для определения того, какой будет выполняться в моменте. Принцип работы для одного ядра, может быть, например, следующим:

Java: Concurrency: OS threads cycle

Переходы между потоками (Thread) называются Context Switch. Они сохраняют состояние потока до передачи управления другому потоку и восстанавливают свое прежнее (последнее) состояние перед получением управления. Данная операция тоже занимает определенное время.

ОС определяет сколько времени поток Thread будет находиться под управлением (в примере выше, 100ms). Thread могут иметь разный приоритет и принудительно перехватывать управление на себя (числовое, int значение).

Данный (упрощенный) алгоритм создавал иллюзию многопоточности на машинах с 1 CPU - передавая управление разным потокам. Это актуально и сейчас, просто максимально допустимое количество одновременно выполняющихся потоков (а следовательно, и Process) увеличилось.

Runnable

Функциональный интерфейс, реализующий единицу работы, Task. Не принимает аргументов и ничего не возвращает. Может записаться как лямбда:

1
Runnable r = () -> System.out.println("do something");

, или как реализовано в отдельном классе:

1
2
3
4
5
6
7
8
9
public class ArguedRun implements Runnable {
private final String argument;

public ArguedRun(String argument) {
this.argument = argument;
}

public void run() { ... }
}

Thread

Поток, запускаемый для процесса. Запуск выполняется посредством метода start():

1
2
3
4
5
6
7
(new Thread(() -> System.out.println("thread started")).start());
(new Thread(() -> {
@Override
public void run() {
System.out.println("thread started");
}
}).start());

Таким способом создается асинхронный поток, т.е. тот результат (окончание) которого не будет ожидаться.

ExecutorService

Все потоки рекомендовано запускать с его помощью, даже если это одиночный поток. Доступ к сервису можно получить через фабрику Executors:

1
2
3
4
5
6
7
8
9
10
11
ExecutService svc = null;
Runnable task1 = () -> out.println("task 1");
Runnable task2 = () -> for(...) out.println("task 2");
try {
svc = Executor.newSingleThreadExecutor();
svc.execute(task1);
svc.execute(task2);
svc.execute(task1);
} finally {
if(svc != null) scv.shutdown();
}

newSingleThreadExecutor() создает экземпляр сервиса, который гарантирует последовательное выполнение всех передаваемых в него потоки (Thread) (или задачи (Task)).

Вызов метода shutdown()) обязателен, т.к. его отсутствие приведет к бесконечной работе приложения. Метод не прерывает ранее запущенные задачи (Task), но новые принимать уже не будет, указывая на это ошибкой RejectedExecutionException. Сразу после shutdown() метод isShutdown() будет возвращать true, но isTerminated() будет возвращать false до тех пор, пока последняя задача (Task) не будет завершена.

Похожий метод shutdownNow() попытается(!) остановить все задачи (Task), но гарантии в успешном закрытии не дает. Вернет List<Runnable> - список задач, которые не были даже начаты.

Запуск задачи (Task)

Допускается несколько способов вызова задачи (Task):

  • void execute(Runnable) - асинхронный “вызвал и забыл”;
  • Future<?> submit(Runnable) - Асинхронный с возможностью получения результата в виде Future<?>;
  • <T> Future<T> submit(Callable<T>) - аналог предыдущего, но уже типизированный;
  • <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>>) - ожидание всех задач (Task) с возможностью получения результатов. Список результатов в том же порядке, что и в коллекции на входе;
  • <T> T invokeAny(Collection<? extends Callable<T>>) - ожидание первого выполнившегося результата.

При работе с Future могут быть использованы следующие методы:

  • boolean isDone() - false в случае если задача все еще выполняется и результат не получен; в противном случае - true. Если задача была отклонена - будет ошибка;
  • boolean isCanceled() - флаг того что выполнение задачи было отменено;
  • boolean cancel() - попытка остановить задачу. Результатом будет true если успешно остановлено. Принимает boolean, который указывает на возможность остановки уже работающей задачи (но еще не завершенной);
  • <T> T get() - получение результата. Бесконечное ожидание;
  • <T> T get(long, TimeUnit) - получение результата с ожиданием определенное количество времени. TimeoutException будет выкинуто если результат не будет получен за указанное время.

awaitTermination

Ожидает завершения всех запущенных (работающих) задач (Task). Если не успевает дождаться завершения в отведенное время, получает ошибку InterruptedException. Если вызывается до метода shutdown, то будет ожидание всего времени и ошибка.

invokeAll и invokeAny

Методы синхронные - т.е. поток, запустивший их, будет ожидать их выполнения или ошибки. Имеют версии перегруженных методов с ожиданием определенного количества времени.

Запуск по расписанию

Существует много способов реализации, но, пожалуй, наиболее предпочитаемым является вызов через Executor. В Java API существует специализированная реализация ScheduledExecutorService:

1
ScheduledExecutorService svc = Executors.newSingleThreadScheduledExecutor();

Дополнительные методы:

  • ScheduledFuture<?> schedule(Runnable, long, TimeUnit)- разовый запуск задачи через указанный интервал;
  • <V> ScheduledFuture<V> schedule(Callable<V>, long, TimeUnit)- аналогичен предыдущему методу, но типизирован;
  • ScheduledFuture<?> scheduleAtFixedRate(Runnable, long*, long**, TimeUnit) - запуск задачи с промежуточным повторением каждые long** единиц. Первый запуск будет отложен на long* единиц времени;
  • ScheduledFuture<?> scheduleWithFixedDelay(Runnable, long*, long**, TimeUnit) - аналогичен scheduleAtFixedRate, но не дает запускать следующую задачу пока не завершится предыдущая попытка. После завершения предыдущей, будет ожидать long**, после чего запустит задачу еще раз.

В ответе приходит ScheduleFuture, который расширяет Future методом getDelay, который возвращает целое число той временной единицы, которая указывалась в аргументе:

1
2
3
ScheduledFuture<?> f = scheduledSvc.schdule(..., 0, 5, TimeUnit.MINUTE);
out.println(f.getDelay(TimeUnit.MINUTE)); // 4
out.println(f.getDelay(TimeUnit.SECOND)); // 299

Если сервис останавливается принудительно вызовом shutdown, то выполнение задач вообще может быть отменено если их запуск был отсрочен (по какой-то причине, например, JVM не удалось выделить дополнительные потоки).

Пулы потоков

Методы фабрик:

  • newCachedThreadPool - создает пул потоков (без ограничений по количеству), который использует свободные потоки, если они доступны, и создает новые, если свободных нет;
  • newFixedThreadPool - пул с фиксированным количеством потоков. В случае если свободных потоков для выполнения нет, задача встает в очередь Queue;
  • newScheduledThreadPool - пул с фиксированным количеством, но с возможностью запуска по расписанию.

CachedPool применим к короткоживущим задачам. Рекомендованное количество параллельных потоков - количество доступных JVM ядер, которые можно получить:

1
int feasibleThreads = Runtime.getRuntime().availableProcessors();

Несмотря на указанную выше рекомендацию, жестко ограничивать количество потоков для каждого вида задач не стоит. Данная рекомендация актуально только для вычислительных задач, которые на некоторое время полностью использую вычислительные мощности процессора. Для задач, которые работают с внешними ресурсами такими рекомендациями можно и пренебречь - большая часть времени для них будет характеризоваться ожиданием.

Thread-safety

Непредвиденное, неожидаемое поведение данных посредством изменения их двумя и более потоками называется состоянием гонки (Race Condition). Проблема состоит в Shared Memory и свободном доступе к ней различными потоками.

Atomic

В концепции Atomic лежит выполнение чтения и записи (или наоборот) как единой изолированной операции. Обращение происходит все там же - в Shared Memory, но операция уже атомарна.

Реализации в Java API:

  • AtomicBoolean/Integer/Long/IntegerArray/LongArray - значительно быстрее synchronized + volatile. Использует CompareAndSwap технологию, которая связывает значения в памяти с аргументом;
  • AtomicReference - атомарный объект;
  • AtomicMarkableReference - атомарный объект с boolean флагом (для маркировки). Объект хранится в единственном числе;
  • AtomicStampedReference - аналог Markable, но хранит int значение. Инкремента нет, зато можно выполнять атомарные set() методы;
  • AtomicReferenceArray - можно сказать, атомарный список. Имеет get(int) и set(int, V);
  • AtomicIntegerFieldUpdater/LongFieldUpdater/ReferenceFieldUpdater - посредством рефлекции, обновляет или получает значение поля в объекте. Само поле атомарным не становится, вместо этого атомарность добавляется только если обращение выполняется через AtomicUpdater. Сущность создается фабрикой - newUpdater, которой передается класс и имя поля.

Synchronized

Способ синхронизации посредством объекта наблюдения - lock (monitor). Им может быть любой объект или элемент, используемый языком. Так же в качестве объекта мониторинга (lock) может выступать сам объект (this).

Объект мониторинга накладывается на объект (на конкретный экземпляр!), а следовательно, лучше будет если он будет immutable:

1
private synchronized void syncMethod() { ... }

В примере выше, объект мониторинга будет такой же, что и при:

1
2
3
private void syncMethod() {
synchronized(this) { ... }
}

Так же как и:

1
private static synchronized void staticSync() { ... }

Аналогичен объекту мониторинга при:

1
2
3
4
5
class MyClass {
private void syncMethod() {
synchronized(MyClass.class) { ... }
}
}

Статичный объект мониторинга накладывается на все вызовы статичных (синхронизированных) методов в рамках ClassLoader.

Lock

Специализированный инструмент для управления выполнения потоками. Чаще всего используется реализация ReentrantLock. Наиболее используемые методы:

  • lock() - требует наложения ограничений на объект мониторинга потоком исполнителем;
  • unlock() - высвобождает объект мониторинга;
  • boolean tryLock() - попытка наложения ограничений на объект мониторинга потоком исполнителем. Если успешно - результатом выполнения будет true; в противном случае - false. Важно, попытка будет выполнена в момент вызова метода;
  • boolean tryLock(long, TimeUnit) - расширяет tryLock, добавляя возможность таймаута ожидания.

Важно! Количество unlock должно соответствовать lock! Иначе ограничение на объект мониторинга будет висеть вечно.

При создании ReentrantLock можно для параметра fairness определить значение true - тогда последовательность выдачи будет соответствовать запросам доступа. Может (значительно) замедлить выполнение кода.

CyclicBarrier

Позволяет ограничивать выполнение последующего кода пока не будет получено требуемое количество снятия блокировок.

При создании CyclicBarrier можно указать количество высвобождений, которые необходимо получить для продолжения выполнения потоком действий:

1
CyclicBarrier barrier = new CyclicBarrier(3);

Потоки, которые ожидают выполнение какого-либо внешнего условия, вызывают await:

1
Runnable run = () -> { barrier.await(); ... }

После достижения лимита вызовов await (в примере выше, 3 раза), потоки смогут продолжить выполнение. По достижении высвобождения, барьер может выполнять дополнительное действие, которое можно указать в конструкторе:

1
var varrier = new CyclicBarrier(3, () -> print("Barrier is released!"));

Синхронизированные коллекции

Решают проблему Memory Consistency Error - непротиворечивости памяти - может возникать и при работе в одном потоке, например, при чтении данных из коллекции и удалении:

1
2
Map<..., ...> map = ...;
for(var x : map.keySet()) map.remove(x); // ConcurrentModificationException

ConcurrentSkipList/Set/Map

Потокобезопасный аналог TreeSet. По умолчанию использует стандартную сортировку (с натуральным порядком), но может быть передан Comparator для сравнения элементов.

CopyOnWriteArrayList/Set

При изменении коллекции, выполняет полную копию. Итераторы (в т.ч. через for-each), которые уже открыты не меняются, что делает коллекции потокобезопасными:

1
2
3
4
5
6
List<Integer> list = new CopyOnWriteArrayList(1, 2, 3);
for(var i : list) {
out.println("%s:%s".formatted(i, list.size())); // выводы: 1:3, 2:4, 3:5
list.add(i * -1);
}
out.println(list.size()); // 6

В примере выше, будет выполнено только 3 итерации, т.к. итератор создастся при входе в цикл for-each и не будет обновляться даже после методов add().

Объекты CopyOnWrite* требовательны к ресурсам. Вместо них можно использовать ListIterator в сочетании с “обычной”” реализацией List. Получить его можно с вызовом метода listIterator().

LinkedBlockingQueue

Дополняет Queue специальными методами:

  • boolean offer(E, long, TimeUnit) - пытается вставить объект в очередь. Если в течении long единиц времени не удалось получить свободную позицию (для вставки объекта в очередь), результатом метода будет false;
  • E poll(long, TimeUnit) - пытается забрать объект из очереди в течении long единиц времени. Если объект получен не был, метод вернет null.

Преобразование в потокобезопасную коллекцию

Класс Collections содержит static методы для преобразования “обычных”” коллекций в синхронизированные:

  • synchronizedCollection(Collection);
  • synchronizedList(List);
  • synchronizedMap(Map);
  • synchronizedNavigableMap(NavigableMap);
  • synchronizedNavigableSet(NavigableSet);
  • synchronizedSet(Set);
  • synchronizedSortedMap(SortedMap);
  • synchronizedSortedSet(SortedSet).

Фактически, использует mutex объект, в качестве объекта мониторинга и синхронизации. Как отмечалось ранее, синхронизация на объект это дорогая операция, т.е. лучше использовать специализированные коллекции. Например, следующий код повлечет ConcurrentModificationException, т.к. выполняется параллельное изменение данных одним итератором (при ConcurrenentHashMap, и подобных синхронизированных коллекциях, такие ошибки не возникают):

1
2
3
4
5
6
var map = new HashMap();
map.put(...);
var syncMap = Collections.synchronizedMap(map);
for(var key : syncMap.keys()) {
syncMap.remove(k);
}

Поиск проблем

Deadlock - ситуация, при которой 2 или более потока находятся в статусе ожидании (причем, друг друга).
Starvation - ситуация, при которой поток не может получить доступ к объекту мониторинга или ресурсу и запрещает другим потокам выполнять связанные действия, вызывая каскадную реакцию.
Livelock - потоки (2 или более) не встали в режим ожидания, как в случае с Deadlock, а продолжают свою работу. Например: поток выполняет некое действие, но дойдя до окончания и завершения, откатывает все действия в начало и тем самым бесконечно выполняет действия, не имеющие завершения. Livelock сложнее всего обнаружить, поскольку работа все время продолжается и поток не прерывает свою работу, но это всего лишь иллюзия.

Stream.parallel

В StreamAPI предусмотрено выполнение операций через параллельные инструменты. При этом, некоторые методы не могут работать ожидаемым образом при параллельной реализации.

Так же, параллельные операции типа findFirst, limit, skip могут сильно усложнять процесс в связи с затратами на параллельное выполнение. Переходы между sequential <-> parallel так же довольно требовательны к ресурсам.

Для объекта Stream можно применить imtermediate метод unordered(), который отменит требования сортировки потока (активен для всех потоков по умолчанию) - это может существенно ускорить параллельное выполнение операций объекта Stream.

Daemon

Thread.setDaemon - метод позволяет устанавливать поток в режим демона. JVM может корректно остановиться только если действующие потоки помечены как daemon; в противном случае JVM будет ожидать завершения всех потоков.

CompetableFuture

Данный механизм удобен для заготавки в начале выполнения длительной операции, состоящей из большого количества несвязанных действий, а в конце просто собирать результаты. Данный подход прекрасно работает с получением данных из внешних источников - они отнимают небольшое количество ресурсов, но могут заставить приложение ожидать завершения длительное время. Следует учитывать что существует опасность того что результат CompletableFuture никогда не будет получен и поток выполнения заморозит работу приложения. В данном случае поможет добавление опционального таймаута.

  • complete() - метод передает объект, который станет результатом исполнения CompletableFuture;
  • completeExceptionally() - метод “прокидывает” исключение в качестве результата исполнения CompletableFuture.

Для короткоживущих операций рекомендуется использовать пул потоков cachedThreadPool, который повторно использует ранее открытые потоки (пока сам пул не будет закрыт).

CompletableFuture.supplyAsync - статичный метод, который предоставляет возможность быстрого написания многопоточных операций. Вторым аргументом метод может принимать Executor - реализацию пула потоков (по умолчанию используется системная, ForkJoinPool).

join - метод заставляет текущий поток ожидать завершения выполнения CompletableFuture. При использовании данного метода (или любого другого блокирующего) важно учитывать эту блокировку и то, что вступает она в силу сразу после вызова соответствующего метода. Особенно важно это учитывать при обработке в Stream:

1
2
3
4
data.stream()
.map(obj -> CompletableFuture.supplyAsync(...))
.map(CompletableFuture::join)
.collect(Collectors.toList());

Второй вызов map будет блокировать выполнение всего Stream (поскольку map - это intermediate операция). Решением может быть либо расширение размера пула, либо перевод Stream в параллельное выполнение (parallel), либо разделение одного стрима на несколько.

Оптимальное количество потоков

Nпотоков = Ncpu * Ucpu * (1 + W/C)

, где Ncpu - доступное количество ядер процессора. Получить можно используя Runtime.getRuntime().availableProcessors();
Ucpu - степень использования одного CPU (в диапазоне от 0 до 1)
W/C - предполагаемое время ожидания / время вычисления

Пример неблокируемого выполнения

1
2
3
4
5
6
7
8
9
10
11
12
ExecutorService execService = Executors.newFixedThreadPool(10, run -> {
Thread thread = new Thread("thread name");
thread.setDaemon(true);
return thread;
});

Supplier<CompletableFuture<String>> withConstructor = new CompletableFuture<String>();
Supplier<CompletableFuture<String>> withFactory = CompletableFuture.supplyAsync(...);

List<CompletableFuture> futures = IntStream.range(0, 30).mapToObject(withConstructor ).collect(Collectors.toList());
IntStream.range(0, futures.size()).forEach(i -> execService.submit(() -> doSomething(futures.get(i))));
List<String> results = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());

Можно реализовать через конструктор CompletableFuture или через фабрику.

Конвейерное выполнение

CompletableFuture допускает реализацию последовательности действий в конвейере (как Stream):

1
2
3
4
5
6
IntStream.range(0, 30)
.mapToObject(CompletableFuture.supplyAsync(/*0*/))
.map(future -> future.thenApply(/*1*/))
.map(future -> future.thenApply(/*2*/))
.map(future -> future.thenCompose(() -> /*3*/))
.collect(Collectors.toList());

В примере выше действия будут выполняться последовательно: 0 -> 1 -> 2 -> 3. Причем, операции 0, 1 и 2 будут выполняться в одном потоке, а операция 3, возможно, будет выполнена в отдельном потоке - это будет зависеть от настроек пула потоков.

1
2
3
Future<Object> result = CompletableFuture
.supplyAsync()
.thenCombine(CompletableFuture.supplyAsync(...), (result1, result2) -> result1.compareTo(result2));

Метод thenCombine используется для слияния двух CompletableFuture (а точнее, их результатов). Второй аргумент принимает функцию слияния, которая определяет какой из результатов будет итоговым; метод будет вызван только в случае получения обоих результатов, предлагаемых к слиянию.

Существует так же асинхронная версия - thenCombineAsync.

Альтернативное завершение

Метод onTimeout ограничивает время ожидания для CompletableFuture и выбрасывает исключение TimeoutException в случае невозможности завершения потока в рамках указанного интервала. Может быть установлен на любой стадии создания CompletableFuture:

1
CompletableFuture.supplyAsync(...).thenApply(...).thenCombineAsync().onTimeout(...).thenApply(...).onTimeout(...);

Метод completeOnTimeout значительно привлекательнее - он позволяет использовать значение по умолчанию в случае окончания таймаута:

1
CompletableFuture.supplyAsync(...).thenApply(...).completeOnTimeout(alternativeValue, ...).thenCombine();

Принятие и завершение

Метод thenAccept можно назвать финальной остановкой для CompletableFuture - он возвращает void, а на вход получает Consumer с типом предыдущего CompletableFuture. Есть асинхронная версия метода.

Фабричное ожидание

Если нужно дождаться всех CompletableFuture, то поможет метод CompletableFuture.allOf, который вернет скомбинированный объект, который можно отдельно подождать:

1
CompletableFuture.allOf(futuresArray).join();

Или можно дождаться первого завершившегося потока и выйти:

1
CompletableFuture.anyOf(futuresArray).getNow(alternativeValue);
 Comments
Comment plugin failed to load
Loading comment plugin