Class BoundedConcurrency
It enables the parallel transformation of input elements, guaranteeing that all concurrent operations either complete and their results are gathered, or are fully cancelled and joined upon interruption or exception.
- Since:
- 9.2
-
Method Summary
Modifier and TypeMethodDescriptionconcurrently
(Function<? super I, ? extends O> work) Applieswork
on each input element concurrently and lazily.<T> T
race
(Collection<? extends Callable<? extends T>> tasks, Predicate<? super Throwable> isRecoverable) Racestasks
and returns the first success, then cancels the remaining.static BoundedConcurrency
withMaxConcurrency
(int maxConcurrency) Returns aBoundedConcurrency
usingmaxConcurrency
.static BoundedConcurrency
withMaxConcurrency
(int maxConcurrency, ThreadFactory threadFactory)
-
Method Details
-
withMaxConcurrency
Returns aBoundedConcurrency
usingmaxConcurrency
. Uses virtual threads to run concurrent work.- Throws:
IllegalArgumentException
- ifmaxConcurrency <= 0
-
withMaxConcurrency
public static BoundedConcurrency withMaxConcurrency(int maxConcurrency, ThreadFactory threadFactory) - Throws:
IllegalArgumentException
- ifmaxConcurrency <= 0
-
race
public <T> T race(Collection<? extends Callable<? extends T>> tasks, Predicate<? super Throwable> isRecoverable) Racestasks
and returns the first success, then cancels the remaining. Upon exception, theisRecoverable
predicate is tested to check whether the exception is recoverable (thus allowing the other tasks to continue to run).When all tasks throw recoverable exceptions, or if any task failed with unrecoverable exception, the recoverable exceptions are propagated as
suppressed
.- Parameters:
tasks
- at least one must be providedisRecoverable
- tests whether an exception is recoverable so that the other tasks should continue running.- Throws:
IllegalArgumentException
- iftasks
is emptyNullPointerException
- iftasks
orisRecoverable
is nullRuntimeException
- if all tasks failed, or any task failed with unrecoverable exception.
-
concurrently
Applieswork
on each input element concurrently and lazily.At any given time, at most
maxConcurrency
concurrent work are running.To avoid leaking virtual threads caused by upstream exceptions, input elements are first computed and consumed into a List before any concurrent work is started (such that if any upstream element throws, no virtual thread is ever started). But the result
BiStream
is lazy: concurrent work only starts upon requested by downstream. Specifically, if you short-circuit usingStream.findAny()
orBiStream.findAny()
, at mostmaxConcurrency
virtual threads will be started.Compared to the
Gatherers.mapConcurrent(int, java.util.function.Function<? super T, ? extends R>)
gatherer:mapConcurrent()
only interrupts and joins the on-the-fly virtual threads upon downstream exceptions, but is unable to interrupt or join when an upstream exception is thrown; whereasconcurrently()
will always interrupt and join. As a result, actions in the virtual threads happens-before actions after the collector returns or throws.mapConcurrent()
guarantees encounter-order. If an input element takes a long time or forever to process, it can potentially block or halt the program when there are more thanmaxConcurrency
elements following it, even ifmaxConcurrency - 1
virtual threads have already completed with the long-running task being the only virtual thread still running. Whereasconcurrently()
allowsmaxConcurrency
virtual threads to run concurrently regardless of input order. This means if you have a long-running virtual thread (for instance: a heart-beat worker or a monitoring subtask etc.), it won't reduce throughput or starve the workers after it.- If encounter order is important to you, consider using
BiStream.sorted(java.util.Comparator<? super K>, java.util.Comparator<? super V>)
or friends to re-introduce ordering.
Compared to
Parallelizer.inParallel(java.util.function.Function<? super I, ? extends O>)
:inParallel()
fails fast. When an exception is thrown, it interrupts on-the-fly threads, but doesn't wait for their completion. Thus there is no happens-before guarantee when an exception is thrown.concurrently()
on the other hand always joins the virtual threads and guarantees strong happens-before relationship.- The
inParallel()
collector blocks until all parallel tasks are complete before returning; whereasconcurrently()
utilizes Java 24 Gatherer and will stream results to the downstream as soon as one is computed. inParallel()
can be used with anyExecutorService
(for example you could useExecutors.newCachedThreadPool()
if virtual threads isn't important to you); whereasconcurrently()
strictly runs one task per thread, although you could use a customizedThreadFactory
.
-