Concurrency
Немного терминов, которые будут использоваться: поток (Thread) и процесс (process) - группа потоков. Shared Environment - окружение, доступное всем потокам в рамках процесса. Задача (Task) - единица работы, выполняемая потоком. В единицу времени, поток (Thread) может выполнять только одну задачу (Task).
Виды потоков
- Системные - создаются JVM и работают в фоне. Как правило, не видны для разработчика приложения, а в случае возникновения внутренних проблем могут возникать
java.lang.Error
; - Пользовательские - создаются и управляются пользователем.
Очередность выполнения
Для определения очередности, ОС использует расписание потоков для определения того, какой будет выполняться в моменте. Принцип работы для одного ядра, может быть, например, следующим:
Переходы между потоками (Thread) называются Context Switch. Они сохраняют состояние потока до передачи управления другому потоку и восстанавливают свое прежнее (последнее) состояние перед получением управления. Данная операция тоже занимает определенное время.
ОС определяет сколько времени поток Thread будет находиться под управлением (в примере выше, 100ms). Thread могут иметь разный приоритет и принудительно перехватывать управление на себя (числовое, int значение).
Данный (упрощенный) алгоритм создавал иллюзию многопоточности на машинах с 1 CPU - передавая управление разным потокам. Это актуально и сейчас, просто максимально допустимое количество одновременно выполняющихся потоков (а следовательно, и Process) увеличилось.
Runnable
Функциональный интерфейс, реализующий единицу работы, Task. Не принимает аргументов и ничего не возвращает. Может записаться как лямбда:
1 | Runnable r = () -> System.out.println("do something"); |
, или как реализовано в отдельном классе:
1 | public class ArguedRun implements Runnable { |
Thread
Поток, запускаемый для процесса. Запуск выполняется посредством метода start()
:
1 | (new Thread(() -> System.out.println("thread started")).start()); |
Таким способом создается асинхронный поток, т.е. тот результат (окончание) которого не будет ожидаться.
ExecutorService
Все потоки рекомендовано запускать с его помощью, даже если это одиночный поток. Доступ к сервису можно получить через фабрику Executors
:
1 | ExecutService svc = null; |
newSingleThreadExecutor()
создает экземпляр сервиса, который гарантирует последовательное выполнение всех передаваемых в него потоки (Thread) (или задачи (Task)).
Вызов метода shutdown())
обязателен, т.к. его отсутствие приведет к бесконечной работе приложения. Метод не прерывает ранее запущенные задачи (Task), но новые принимать уже не будет, указывая на это ошибкой RejectedExecutionException
. Сразу после shutdown()
метод isShutdown()
будет возвращать true
, но isTerminated()
будет возвращать false
до тех пор, пока последняя задача (Task) не будет завершена.
Похожий метод shutdownNow()
попытается(!) остановить все задачи (Task), но гарантии в успешном закрытии не дает. Вернет List<Runnable>
- список задач, которые не были даже начаты.
Запуск задачи (Task)
Допускается несколько способов вызова задачи (Task):
void execute(Runnable)
- асинхронный “вызвал и забыл”;Future<?> submit(Runnable)
- Асинхронный с возможностью получения результата в видеFuture<?>
;<T> Future<T> submit(Callable<T>)
- аналог предыдущего, но уже типизированный;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>>)
- ожидание всех задач (Task) с возможностью получения результатов. Список результатов в том же порядке, что и в коллекции на входе;<T> T invokeAny(Collection<? extends Callable<T>>)
- ожидание первого выполнившегося результата.
При работе с Future
могут быть использованы следующие методы:
boolean isDone()
-false
в случае если задача все еще выполняется и результат не получен; в противном случае -true
. Если задача была отклонена - будет ошибка;boolean isCanceled()
- флаг того что выполнение задачи было отменено;boolean cancel()
- попытка остановить задачу. Результатом будетtrue
если успешно остановлено. Принимаетboolean
, который указывает на возможность остановки уже работающей задачи (но еще не завершенной);<T> T get()
- получение результата. Бесконечное ожидание;<T> T get(long, TimeUnit)
- получение результата с ожиданием определенное количество времени. TimeoutException будет выкинуто если результат не будет получен за указанное время.
awaitTermination
Ожидает завершения всех запущенных (работающих) задач (Task). Если не успевает дождаться завершения в отведенное время, получает ошибку InterruptedException
. Если вызывается до метода shutdown
, то будет ожидание всего времени и ошибка.
invokeAll и invokeAny
Методы синхронные - т.е. поток, запустивший их, будет ожидать их выполнения или ошибки. Имеют версии перегруженных методов с ожиданием определенного количества времени.
Запуск по расписанию
Существует много способов реализации, но, пожалуй, наиболее предпочитаемым является вызов через Executor
. В Java API существует специализированная реализация ScheduledExecutorService
:
1 | ScheduledExecutorService svc = Executors.newSingleThreadScheduledExecutor(); |
Дополнительные методы:
ScheduledFuture<?> schedule(Runnable, long, TimeUnit)
- разовый запуск задачи через указанный интервал;<V> ScheduledFuture<V> schedule(Callable<V>, long, TimeUnit)
- аналогичен предыдущему методу, но типизирован;ScheduledFuture<?> scheduleAtFixedRate(Runnable, long*, long**, TimeUnit)
- запуск задачи с промежуточным повторением каждыеlong**
единиц. Первый запуск будет отложен наlong*
единиц времени;ScheduledFuture<?> scheduleWithFixedDelay(Runnable, long*, long**, TimeUnit)
- аналогиченscheduleAtFixedRate
, но не дает запускать следующую задачу пока не завершится предыдущая попытка. После завершения предыдущей, будет ожидатьlong**
, после чего запустит задачу еще раз.
В ответе приходит ScheduleFuture, который расширяет Future
методом getDelay
, который возвращает целое число той временной единицы, которая указывалась в аргументе:
1 | ScheduledFuture<?> f = scheduledSvc.schdule(..., 0, 5, TimeUnit.MINUTE); |
Если сервис останавливается принудительно вызовом shutdown
, то выполнение задач вообще может быть отменено если их запуск был отсрочен (по какой-то причине, например, JVM не удалось выделить дополнительные потоки).
Пулы потоков
Методы фабрик:
newCachedThreadPool
- создает пул потоков (без ограничений по количеству), который использует свободные потоки, если они доступны, и создает новые, если свободных нет;newFixedThreadPool
- пул с фиксированным количеством потоков. В случае если свободных потоков для выполнения нет, задача встает в очередьQueue
;newScheduledThreadPool
- пул с фиксированным количеством, но с возможностью запуска по расписанию.
CachedPool применим к короткоживущим задачам. Рекомендованное количество параллельных потоков - количество доступных JVM ядер, которые можно получить:
1 | int feasibleThreads = Runtime.getRuntime().availableProcessors(); |
Несмотря на указанную выше рекомендацию, жестко ограничивать количество потоков для каждого вида задач не стоит. Данная рекомендация актуально только для вычислительных задач, которые на некоторое время полностью использую вычислительные мощности процессора. Для задач, которые работают с внешними ресурсами такими рекомендациями можно и пренебречь - большая часть времени для них будет характеризоваться ожиданием.
Thread-safety
Непредвиденное, неожидаемое поведение данных посредством изменения их двумя и более потоками называется состоянием гонки (Race Condition). Проблема состоит в Shared Memory и свободном доступе к ней различными потоками.
Atomic
В концепции Atomic лежит выполнение чтения и записи (или наоборот) как единой изолированной операции. Обращение происходит все там же - в Shared Memory, но операция уже атомарна.
Реализации в Java API:
AtomicBoolean
/Integer
/Long
/IntegerArray
/LongArray
- значительно быстрееsynchronized + volatile
. Использует CompareAndSwap технологию, которая связывает значения в памяти с аргументом;AtomicReference
- атомарный объект;AtomicMarkableReference
- атомарный объект сboolean
флагом (для маркировки). Объект хранится в единственном числе;AtomicStampedReference
- аналогMarkable
, но хранитint
значение. Инкремента нет, зато можно выполнять атомарныеset()
методы;AtomicReferenceArray
- можно сказать, атомарный список. Имеетget(int)
иset(int, V)
;AtomicIntegerFieldUpdater
/LongFieldUpdater
/ReferenceFieldUpdater
- посредством рефлекции, обновляет или получает значение поля в объекте. Само поле атомарным не становится, вместо этого атомарность добавляется только если обращение выполняется черезAtomicUpdater
. Сущность создается фабрикой -newUpdater
, которой передается класс и имя поля.
Synchronized
Способ синхронизации посредством объекта наблюдения - lock (monitor). Им может быть любой объект или элемент, используемый языком. Так же в качестве объекта мониторинга (lock) может выступать сам объект (this
).
Объект мониторинга накладывается на объект (на конкретный экземпляр!), а следовательно, лучше будет если он будет immutable:
1 | private synchronized void syncMethod() { ... } |
В примере выше, объект мониторинга будет такой же, что и при:
1 | private void syncMethod() { |
Так же как и:
1 | private static synchronized void staticSync() { ... } |
Аналогичен объекту мониторинга при:
1 | class MyClass { |
Статичный объект мониторинга накладывается на все вызовы статичных (синхронизированных) методов в рамках ClassLoader
.
Lock
Специализированный инструмент для управления выполнения потоками. Чаще всего используется реализация ReentrantLock
. Наиболее используемые методы:
lock()
- требует наложения ограничений на объект мониторинга потоком исполнителем;unlock()
- высвобождает объект мониторинга;boolean tryLock()
- попытка наложения ограничений на объект мониторинга потоком исполнителем. Если успешно - результатом выполнения будетtrue
; в противном случае -false
. Важно, попытка будет выполнена в момент вызова метода;boolean tryLock(long, TimeUnit)
- расширяетtryLock
, добавляя возможность таймаута ожидания.
Важно! Количество unlock
должно соответствовать lock
! Иначе ограничение на объект мониторинга будет висеть вечно.
При создании ReentrantLock
можно для параметра fairness
определить значение true
- тогда последовательность выдачи будет соответствовать запросам доступа. Может (значительно) замедлить выполнение кода.
CyclicBarrier
Позволяет ограничивать выполнение последующего кода пока не будет получено требуемое количество снятия блокировок.
При создании CyclicBarrier
можно указать количество высвобождений, которые необходимо получить для продолжения выполнения потоком действий:
1 | CyclicBarrier barrier = new CyclicBarrier(3); |
Потоки, которые ожидают выполнение какого-либо внешнего условия, вызывают await
:
1 | Runnable run = () -> { barrier.await(); ... } |
После достижения лимита вызовов await
(в примере выше, 3 раза), потоки смогут продолжить выполнение. По достижении высвобождения, барьер может выполнять дополнительное действие, которое можно указать в конструкторе:
1 | var varrier = new CyclicBarrier(3, () -> print("Barrier is released!")); |
Синхронизированные коллекции
Решают проблему Memory Consistency Error - непротиворечивости памяти - может возникать и при работе в одном потоке, например, при чтении данных из коллекции и удалении:
1 | Map<..., ...> map = ...; |
ConcurrentSkipList/Set/Map
Потокобезопасный аналог TreeSet
. По умолчанию использует стандартную сортировку (с натуральным порядком), но может быть передан Comparator
для сравнения элементов.
CopyOnWriteArrayList/Set
При изменении коллекции, выполняет полную копию. Итераторы (в т.ч. через for-each), которые уже открыты не меняются, что делает коллекции потокобезопасными:
1 | List<Integer> list = new CopyOnWriteArrayList(1, 2, 3); |
В примере выше, будет выполнено только 3 итерации, т.к. итератор создастся при входе в цикл for-each и не будет обновляться даже после методов add()
.
Объекты CopyOnWrite*
требовательны к ресурсам. Вместо них можно использовать ListIterator
в сочетании с “обычной”” реализацией List
. Получить его можно с вызовом метода listIterator()
.
LinkedBlockingQueue
Дополняет Queue
специальными методами:
boolean offer(E, long, TimeUnit)
- пытается вставить объект в очередь. Если в теченииlong
единиц времени не удалось получить свободную позицию (для вставки объекта в очередь), результатом метода будетfalse
;E poll(long, TimeUnit)
- пытается забрать объект из очереди в теченииlong
единиц времени. Если объект получен не был, метод вернетnull
.
Преобразование в потокобезопасную коллекцию
Класс Collections
содержит static
методы для преобразования “обычных”” коллекций в синхронизированные:
- synchronizedCollection(Collection);
- synchronizedList(List);
- synchronizedMap(Map);
- synchronizedNavigableMap(NavigableMap);
- synchronizedNavigableSet(NavigableSet);
- synchronizedSet(Set);
- synchronizedSortedMap(SortedMap);
- synchronizedSortedSet(SortedSet).
Фактически, использует mutex объект, в качестве объекта мониторинга и синхронизации. Как отмечалось ранее, синхронизация на объект это дорогая операция, т.е. лучше использовать специализированные коллекции. Например, следующий код повлечет ConcurrentModificationException
, т.к. выполняется параллельное изменение данных одним итератором (при ConcurrenentHashMap
, и подобных синхронизированных коллекциях, такие ошибки не возникают):
1 | var map = new HashMap(); |
Поиск проблем
Deadlock - ситуация, при которой 2 или более потока находятся в статусе ожидании (причем, друг друга).
Starvation - ситуация, при которой поток не может получить доступ к объекту мониторинга или ресурсу и запрещает другим потокам выполнять связанные действия, вызывая каскадную реакцию.
Livelock - потоки (2 или более) не встали в режим ожидания, как в случае с Deadlock, а продолжают свою работу. Например: поток выполняет некое действие, но дойдя до окончания и завершения, откатывает все действия в начало и тем самым бесконечно выполняет действия, не имеющие завершения. Livelock сложнее всего обнаружить, поскольку работа все время продолжается и поток не прерывает свою работу, но это всего лишь иллюзия.
Stream.parallel
В StreamAPI предусмотрено выполнение операций через параллельные инструменты. При этом, некоторые методы не могут работать ожидаемым образом при параллельной реализации.
Так же, параллельные операции типа findFirst
, limit
, skip
могут сильно усложнять процесс в связи с затратами на параллельное выполнение. Переходы между sequential
<-> parallel
так же довольно требовательны к ресурсам.
Для объекта Stream можно применить imtermediate метод unordered()
, который отменит требования сортировки потока (активен для всех потоков по умолчанию) - это может существенно ускорить параллельное выполнение операций объекта Stream
.
Daemon
Thread.setDaemon
- метод позволяет устанавливать поток в режим демона. JVM может корректно остановиться только если действующие потоки помечены как daemon
; в противном случае JVM будет ожидать завершения всех потоков.
CompetableFuture
Данный механизм удобен для заготавки в начале выполнения длительной операции, состоящей из большого количества несвязанных действий, а в конце просто собирать результаты. Данный подход прекрасно работает с получением данных из внешних источников - они отнимают небольшое количество ресурсов, но могут заставить приложение ожидать завершения длительное время. Следует учитывать что существует опасность того что результат CompletableFuture
никогда не будет получен и поток выполнения заморозит работу приложения. В данном случае поможет добавление опционального таймаута.
complete()
- метод передает объект, который станет результатом исполненияCompletableFuture
;completeExceptionally()
- метод “прокидывает” исключение в качестве результата исполненияCompletableFuture
.
Для короткоживущих операций рекомендуется использовать пул потоков cachedThreadPool
, который повторно использует ранее открытые потоки (пока сам пул не будет закрыт).
CompletableFuture.supplyAsync
- статичный метод, который предоставляет возможность быстрого написания многопоточных операций. Вторым аргументом метод может принимать Executor
- реализацию пула потоков (по умолчанию используется системная, ForkJoinPool
).
join
- метод заставляет текущий поток ожидать завершения выполнения CompletableFuture
. При использовании данного метода (или любого другого блокирующего) важно учитывать эту блокировку и то, что вступает она в силу сразу после вызова соответствующего метода. Особенно важно это учитывать при обработке в Stream
:
1 | data.stream() |
Второй вызов map
будет блокировать выполнение всего Stream
(поскольку map
- это intermediate
операция). Решением может быть либо расширение размера пула, либо перевод Stream
в параллельное выполнение (parallel
), либо разделение одного стрима на несколько.
Оптимальное количество потоков
Nпотоков = Ncpu * Ucpu * (1 + W/C)
, где Ncpu - доступное количество ядер процессора. Получить можно используя Runtime.getRuntime().availableProcessors()
;
Ucpu - степень использования одного CPU (в диапазоне от 0 до 1)
W/C - предполагаемое время ожидания / время вычисления
Пример неблокируемого выполнения
1 | ExecutorService execService = Executors.newFixedThreadPool(10, run -> { |
Можно реализовать через конструктор CompletableFuture
или через фабрику.
Конвейерное выполнение
CompletableFuture
допускает реализацию последовательности действий в конвейере (как Stream
):
1 | IntStream.range(0, 30) |
В примере выше действия будут выполняться последовательно: 0 -> 1 -> 2 -> 3
. Причем, операции 0, 1 и 2 будут выполняться в одном потоке, а операция 3, возможно, будет выполнена в отдельном потоке - это будет зависеть от настроек пула потоков.
1 | Future<Object> result = CompletableFuture |
Метод thenCombine
используется для слияния двух CompletableFuture
(а точнее, их результатов). Второй аргумент принимает функцию слияния, которая определяет какой из результатов будет итоговым; метод будет вызван только в случае получения обоих результатов, предлагаемых к слиянию.
Существует так же асинхронная версия - thenCombineAsync
.
Альтернативное завершение
Метод onTimeout
ограничивает время ожидания для CompletableFuture
и выбрасывает исключение TimeoutException
в случае невозможности завершения потока в рамках указанного интервала. Может быть установлен на любой стадии создания CompletableFuture
:
1 | CompletableFuture.supplyAsync(...).thenApply(...).thenCombineAsync().onTimeout(...).thenApply(...).onTimeout(...); |
Метод completeOnTimeout
значительно привлекательнее - он позволяет использовать значение по умолчанию в случае окончания таймаута:
1 | CompletableFuture.supplyAsync(...).thenApply(...).completeOnTimeout(alternativeValue, ...).thenCombine(); |
Принятие и завершение
Метод thenAccept
можно назвать финальной остановкой для CompletableFuture
- он возвращает void
, а на вход получает Consumer
с типом предыдущего CompletableFuture
. Есть асинхронная версия метода.
Фабричное ожидание
Если нужно дождаться всех CompletableFuture
, то поможет метод CompletableFuture.allOf
, который вернет скомбинированный объект, который можно отдельно подождать:
1 | CompletableFuture.allOf(futuresArray).join(); |
Или можно дождаться первого завершившегося потока и выйти:
1 | CompletableFuture.anyOf(futuresArray).getNow(alternativeValue); |