Class Parallelizer
For example, the following code saves a stream of UserData in parallel, with at most 3 concurrent RPC calls:
new Parallelizer(executor, 3)
.parallelize(userDataStream.filter(UserData::isModified), userService::save);
Similar to parallel streams (and unlike executors), these sub-tasks are considered integral parts of one unit of work. Failure of any sub-task aborts the entire work, automatically. If an exception isn't fatal, the sub-task should catch and handle it.
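Because any exception escaping a sub-task aborts the whole unit of work, a non-fatal failure has to be handled inside the sub-task itself. A minimal JDK-only sketch, assuming a hypothetical `tolerating()` helper (not part of Parallelizer) that wraps a Consumer so that one chosen exception type goes to a handler instead of propagating:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

final class Tolerating {
  /**
   * Hypothetical helper (not part of Parallelizer): wraps consumer so that exceptions of type
   * nonFatal are handed to handler, while any other exception still escapes and would abort
   * the whole unit of work.
   */
  static <T, E extends RuntimeException> Consumer<T> tolerating(
      Consumer<? super T> consumer,
      Class<E> nonFatal,
      BiConsumer<? super T, ? super E> handler) {
    return input -> {
      try {
        consumer.accept(input);
      } catch (RuntimeException e) {
        if (!nonFatal.isInstance(e)) {
          throw e; // fatal: propagate and abort
        }
        handler.accept(input, nonFatal.cast(e)); // non-fatal: record and move on
      }
    };
  }

  public static void main(String[] args) {
    // Thread-safe list, since the handler may run on several worker threads at once.
    List<String> skipped = new CopyOnWriteArrayList<>();
    Consumer<String> save = name -> {
      if (name.isEmpty()) {
        throw new IllegalArgumentException("empty name");
      }
    };
    Consumer<String> safeSave =
        tolerating(save, IllegalArgumentException.class, (name, e) -> skipped.add(name));
    // With Parallelizer, safeSave would be the consumer passed to parallelize().
    List.of("alice", "", "bob").forEach(safeSave);
    System.out.println(skipped.size()); // prints 1
  }
}
```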
How does it stack up against a parallel stream itself? The parallel-stream counterpart to the example above may look like:
userDataStream.filter(UserData::isModified).parallel().forEach(userService::save);
A few key differences:
- A parallel stream doesn't use an arbitrary ExecutorService. By default it uses either the enclosing ForkJoinPool or the common ForkJoinPool instance.
- By running in a dedicated ForkJoinPool, a parallel stream can take a custom target concurrency, but that target isn't guaranteed to be the maximum concurrency.
- Parallel streams are meant for CPU-bound computations, while Parallelizer deals with IO-bound operations.
- parallelize() can be interrupted and can time out; parallel streams are uninterruptible.
- When a task throws, Parallelizer dismisses pending tasks and cancels all in-flight tasks (it's up to the user code to handle thread interruptions properly). So a worker thread waiting on some resource is interrupted instead of hanging forever.
- Parallelizer wraps exceptions thrown by the worker threads, making stack traces clearer.
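For comparison, the dedicated-ForkJoinPool approach mentioned above is commonly written as in the sketch below. It relies on widely used but unspecified behavior: a parallel stream whose terminal operation runs inside a ForkJoinPool task forks its sub-tasks into that pool. ForkJoinPool documents its parallelism as a target level (the pool may add threads, for example when tasks block waiting to join others), which is why it isn't a hard cap. The class and method names are illustrative only.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class DedicatedPoolStream {
  /** Sums the squares of values with a parallel stream running inside a dedicated pool. */
  static long sumOfSquares(List<Integer> values, int parallelism)
      throws InterruptedException, ExecutionException {
    ForkJoinPool pool = new ForkJoinPool(parallelism); // target parallelism, not a hard cap
    try {
      // The terminal operation runs in a task of 'pool', so the stream's sub-tasks are
      // forked into 'pool' instead of the common pool (unspecified, long-standing behavior).
      return pool.submit(
              () -> values.parallelStream().mapToLong(v -> v.longValue() * v).sum())
          .get(); // get() responds to interruption, but the stream itself keeps running
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    List<Integer> values = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
    System.out.println(sumOfSquares(values, 3)); // prints 385
  }
}
```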
And how do you choose between Parallelizer and ExecutorService? Could you use something like the following instead?
ExecutorService pool = Executors.newFixedThreadPool(3);
try {
List<Future<?>> futures = userData
.filter(...)
.map(data -> pool.submit(() -> userService.save(data)))
.collect(toList());
for (Future<?> future : futures) {
future.get();
}
} finally {
pool.shutdownNow();
}
Some differences for consideration:
- Memory concern
  - The thread pool queues all pending tasks. For large streams (such as hundreds of thousands of task inputs read from a file), it can run out of memory quickly.
  - Storing all the future objects in a list may also use up too much memory for a large number of sub tasks.
- Exception handling (and fail fast)
  - Executors treat submitted tasks as independent. One task may fail and the other tasks won't be affected.
    But for co-dependent sub tasks that were parallelized only for performance reasons (as in parallel streams), you'll want to abort the whole parallel pipeline upon any critical exception, the same as if they were run sequentially.
    Aborting a parallel pipeline requires complex concurrent logic to coordinate between the sub tasks and the executor, in order to dismiss pending sub tasks and also to cancel sub tasks that are already running. Otherwise, when an exception is thrown from a sub task, the other left-over sub tasks will continue to run, and some may even hang indefinitely.
  - Automatic cancellation propagation. When parallelize() is interrupted, all running tasks are automatically canceled and all pending tasks are automatically dismissed.
  - You may resort to shutting down the executor to achieve a similar result (cancelling the left-over sub tasks), although even knowing whether a sub task has failed isn't trivial. The code example above uses Future.get(), but that won't help if a sub task submitted earlier is still running or blocked while a later-submitted sub task has already failed.
  - And ExecutorServices are often set up centrally and shared among different classes and components in the application. You may not have the option to create and shut down a thread pool of your own.
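To illustrate the point about Future.get(), here is a sketch of fail-fast waiting on a plain ExecutorService. The hypothetical `runAllOrFail()` helper uses ExecutorCompletionService to observe tasks in completion order, so a failure in a later-submitted task is noticed even while an earlier task is still blocked, and then cancels everything else. It is a simplification, not Parallelizer's implementation: it still submits all tasks and holds all futures up front, so the memory concern remains.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class FailFast {
  /**
   * Hypothetical helper: runs tasks on executor and waits for them in completion order, so the
   * first failure is seen no matter which task it came from. On failure or interruption, every
   * other task is canceled before the exception propagates.
   */
  static void runAllOrFail(ExecutorService executor, List<? extends Runnable> tasks)
      throws InterruptedException, ExecutionException {
    CompletionService<Void> completion = new ExecutorCompletionService<>(executor);
    List<Future<Void>> futures = new ArrayList<>();
    try {
      for (Runnable task : tasks) {
        futures.add(completion.submit(task, null));
      }
      for (int i = 0; i < futures.size(); i++) {
        completion.take().get(); // throws ExecutionException if this task threw
      }
    } finally {
      for (Future<Void> future : futures) {
        future.cancel(true); // no-op if done; interrupts running tasks, skips queued ones
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    CountDownLatch neverOpened = new CountDownLatch(1);
    Runnable hangs = () -> {
      try {
        neverOpened.await(); // simulates a sub task blocked on some resource
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // canceled by runAllOrFail
      }
    };
    Runnable fails = () -> {
      throw new IllegalStateException("save failed");
    };
    try {
      runAllOrFail(executor, List.of(hangs, fails));
    } catch (ExecutionException e) {
      System.out.println("aborted: " + e.getCause().getMessage()); // aborted: save failed
    } finally {
      executor.shutdown();
    }
  }
}
```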
Stream parameters used in this class are always consumed in the calling thread and don't have to be thread safe.
Since: 1.1
Constructor Summary
- Parallelizer(ExecutorService executor, int maxInFlight)
  Constructs a Parallelizer that runs tasks with executor.
Method Summary
- <I,O> Collector<I,?,BiStream<I,O>> inParallel(Function<? super I, ? extends O> concurrentFunction)
  Returns a Collector that runs concurrentFunction in parallel using this Parallelizer and returns the inputs and outputs in a BiStream, in encounter order of the input elements.
- static Parallelizer newDaemonParallelizer(int maxInFlight)
  Returns a new Parallelizer based on an ExecutorService that exits when the application is complete.
- <T> void parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer)
  Runs consumer for inputs in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- <T> void parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer, long heartbeatTimeout, TimeUnit timeUnit)
  Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- <T> void parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer, Duration heartbeatTimeout)
  Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- void parallelize(Stream<? extends Runnable> tasks)
  Runs tasks in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- void parallelize(Stream<? extends Runnable> tasks, long heartbeatTimeout, TimeUnit timeUnit)
  Runs tasks in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- void parallelize(Stream<? extends Runnable> tasks, Duration heartbeatTimeout)
  Runs tasks in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- <T> void parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer)
  Runs consumer for inputs in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- <T> void parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer, long heartbeatTimeout, TimeUnit timeUnit)
  Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- <T> void parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer, Duration heartbeatTimeout)
  Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- <T> void parallelizeUninterruptibly(Iterator<? extends T> inputs, Consumer<? super T> consumer)
  Runs consumer for inputs in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- void parallelizeUninterruptibly(Stream<? extends Runnable> tasks)
  Runs tasks in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- <T> void parallelizeUninterruptibly(Stream<? extends T> inputs, Consumer<? super T> consumer)
  Runs consumer for inputs in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- static Parallelizer virtualThreadParallelizer(int maxInFlight)
  Returns a Parallelizer using virtual threads for running tasks, with at most maxInFlight tasks running concurrently.
Constructor Details
Parallelizer
public Parallelizer(ExecutorService executor, int maxInFlight)
Constructs a Parallelizer that runs tasks with executor. At any given time, at most maxInFlight tasks are allowed to be submitted to executor.
Note that a task being submitted to executor doesn't guarantee immediate execution, for example if all worker threads in executor are busy.
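The submission limit described above can be sketched with a Semaphore. This is a minimal sketch under stated assumptions, not Parallelizer's actual implementation: the hypothetical `forEachBounded()` pulls each input in the calling thread and acquires a permit before submitting it, so at most maxInFlight tasks are queued or running at a time and the executor's queue never holds the whole stream. Fail-fast and cancellation are deliberately left out.

```java
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class BoundedSubmitter {
  /**
   * Hypothetical sketch: runs consumer for each input, with at most maxInFlight tasks submitted
   * to executor at any time, then waits for all of them. No fail-fast or cancellation.
   */
  static <T> void forEachBounded(
      ExecutorService executor,
      int maxInFlight,
      Stream<? extends T> inputs,
      Consumer<? super T> consumer)
      throws InterruptedException {
    Semaphore permits = new Semaphore(maxInFlight);
    Iterator<? extends T> iterator = inputs.iterator(); // pulled only by the calling thread
    while (iterator.hasNext()) {
      T input = iterator.next();
      permits.acquire(); // blocks the caller rather than queueing unbounded work
      try {
        executor.execute(() -> {
          try {
            consumer.accept(input);
          } finally {
            permits.release(); // frees a slot for the next input
          }
        });
      } catch (RuntimeException e) { // e.g. RejectedExecutionException
        permits.release();
        throw e;
      }
    }
    permits.acquire(maxInFlight); // all permits back means every task has finished
    permits.release(maxInFlight);
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(8);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger maxSeen = new AtomicInteger();
    try {
      forEachBounded(executor, 3, IntStream.range(0, 20).boxed(), i -> {
        maxSeen.accumulateAndGet(running.incrementAndGet(), Math::max);
        try {
          Thread.sleep(10); // simulates an IO-bound call
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          running.decrementAndGet();
        }
      });
    } finally {
      executor.shutdown();
    }
    System.out.println(maxSeen.get() <= 3); // prints true
  }
}
```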
Method Details
virtualThreadParallelizer
public static Parallelizer virtualThreadParallelizer(int maxInFlight)
Returns a Parallelizer using virtual threads for running tasks, with at most maxInFlight tasks running concurrently.
Only applicable in JDK 21 or later (throws if running below JDK 21).
Since: 7.2
newDaemonParallelizer
public static Parallelizer newDaemonParallelizer(int maxInFlight)
Returns a new Parallelizer based on an ExecutorService that exits when the application is complete. It does so by using daemon threads.
Typically used by the main() method or as a static final field.
Since: 6.5
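How newDaemonParallelizer() builds its executor isn't specified here beyond "daemon threads". As an illustration only, assuming a cached thread pool, an executor of that kind can be created with a custom ThreadFactory; the helper name is hypothetical:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class DaemonExecutors {
  /** Hypothetical helper: a thread pool whose threads don't keep the JVM alive. */
  static ExecutorService newDaemonThreadPool() {
    ThreadFactory defaults = Executors.defaultThreadFactory();
    return Executors.newCachedThreadPool(runnable -> {
      Thread thread = defaults.newThread(runnable);
      thread.setDaemon(true); // the JVM may exit while these threads are still alive
      return thread;
    });
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = newDaemonThreadPool();
    AtomicBoolean daemon = new AtomicBoolean();
    executor.execute(() -> daemon.set(Thread.currentThread().isDaemon()));
    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
    System.out.println(daemon.get()); // prints true
  }
}
```

Such an executor could then be passed to the documented constructor, as in new Parallelizer(executor, maxInFlight).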
parallelize
public <T> void parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer) throws InterruptedException
Runs consumer for inputs in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The inputs stream is consumed only in the calling thread in iteration order.
Parameters:
  inputs - the inputs to be passed to consumer
  consumer - to be parallelized
Throws:
  InterruptedException - if the thread is interrupted while waiting.
parallelize
public <T> void parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer) throws InterruptedException
Runs consumer for inputs in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The inputs iterator is consumed only in the calling thread in iteration order.
Parameters:
  inputs - the inputs to be passed to consumer
  consumer - to be parallelized
Throws:
  InterruptedException - if the thread is interrupted while waiting.
parallelize
public <T> void parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer, Duration heartbeatTimeout) throws TimeoutException, InterruptedException
Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The inputs stream is consumed only in the calling thread in iteration order.
Parameters:
  inputs - the inputs to be passed to consumer
  consumer - to be parallelized
  heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
Throws:
  InterruptedException - if the thread is interrupted while waiting.
  TimeoutException - if the configured timeout is exceeded while waiting.
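A heartbeat timeout differs from an overall deadline: the clock restarts whenever any task completes. The sketch below shows one way to express that with ExecutorCompletionService.poll(); the helper name `awaitWithHeartbeat()` is hypothetical, and unlike Parallelizer it neither limits in-flight tasks nor cancels the remaining ones when it gives up.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class Heartbeat {
  /**
   * Hypothetical helper: submits all tasks, then requires that some task completes within every
   * heartbeatTimeout window. The window restarts after each completion.
   */
  static void awaitWithHeartbeat(
      ExecutorService executor, List<? extends Runnable> tasks, Duration heartbeatTimeout)
      throws InterruptedException, ExecutionException, TimeoutException {
    CompletionService<Void> completion = new ExecutorCompletionService<>(executor);
    for (Runnable task : tasks) {
      completion.submit(task, null);
    }
    for (int remaining = tasks.size(); remaining > 0; remaining--) {
      // Wait at most one heartbeat for the next completion, whichever task it is.
      Future<Void> done = completion.poll(heartbeatTimeout.toNanos(), TimeUnit.NANOSECONDS);
      if (done == null) {
        throw new TimeoutException("no task completed within " + heartbeatTimeout);
      }
      done.get(); // surfaces the task's exception, if any
    }
  }
}
```

For example, awaitWithHeartbeat(executor, tasks, Duration.ofSeconds(30)) tolerates a long overall run as long as some task finishes at least every 30 seconds.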
parallelize
public <T> void parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer, long heartbeatTimeout, TimeUnit timeUnit) throws TimeoutException, InterruptedException
Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The inputs stream is consumed only in the calling thread in iteration order.
Parameters:
  inputs - the inputs to be passed to consumer
  consumer - to be parallelized
  heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
  timeUnit - the unit of heartbeatTimeout
Throws:
  InterruptedException - if the thread is interrupted while waiting.
  TimeoutException - if the configured timeout is exceeded while waiting.
parallelize
public <T> void parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer, Duration heartbeatTimeout) throws TimeoutException, InterruptedException
Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The inputs iterator is consumed only in the calling thread in iteration order.
Parameters:
  inputs - the inputs to be passed to consumer
  consumer - to be parallelized
  heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
Throws:
  InterruptedException - if the thread is interrupted while waiting.
  TimeoutException - if the configured timeout is exceeded while waiting.
parallelize
public <T> void parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer, long heartbeatTimeout, TimeUnit timeUnit) throws TimeoutException, InterruptedException
Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The inputs iterator is consumed only in the calling thread in iteration order.
Parameters:
  inputs - the inputs to be passed to consumer
  consumer - to be parallelized
  heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
  timeUnit - the unit of heartbeatTimeout
Throws:
  InterruptedException - if the thread is interrupted while waiting.
  TimeoutException - if the configured timeout is exceeded while waiting.
parallelizeUninterruptibly
public <T> void parallelizeUninterruptibly(Stream<? extends T> inputs, Consumer<? super T> consumer)
Runs consumer for inputs in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The inputs stream is consumed only in the calling thread in iteration order.
Parameters:
  inputs - the inputs to be passed to consumer
  consumer - to be parallelized
parallelizeUninterruptibly
public <T> void parallelizeUninterruptibly(Iterator<? extends T> inputs, Consumer<? super T> consumer)
Runs consumer for inputs in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The inputs iterator is consumed only in the calling thread in iteration order.
Parameters:
  inputs - the inputs to be passed to consumer
  consumer - to be parallelized
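"Blocks uninterruptibly" usually refers to a common idiom: keep waiting when interrupted, remember the interruption, and restore the thread's interrupt status before returning so that callers further up can still observe it. A minimal sketch of that idiom on a CountDownLatch; the class and method names are hypothetical (Guava's Uninterruptibles class offers ready-made helpers in the same spirit), and this isn't necessarily how Parallelizer implements it:

```java
import java.util.concurrent.CountDownLatch;

final class UninterruptibleWait {
  /** Waits for latch, ignoring interrupts until done, then restores the interrupt flag. */
  static void awaitUninterruptibly(CountDownLatch latch) {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          latch.await();
          return;
        } catch (InterruptedException e) {
          interrupted = true; // remember it, but keep waiting
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt(); // let callers observe the interruption
      }
    }
  }

  public static void main(String[] args) {
    CountDownLatch latch = new CountDownLatch(1);
    new Thread(() -> {
      try {
        Thread.sleep(50);
      } catch (InterruptedException ignored) {
        // not expected in this demo
      }
      latch.countDown();
    }).start();
    Thread.currentThread().interrupt(); // an interrupt arrives before waiting starts
    awaitUninterruptibly(latch);
    System.out.println(Thread.interrupted()); // prints true (and clears the flag)
  }
}
```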
parallelize
public void parallelize(Stream<? extends Runnable> tasks) throws InterruptedException
Runs tasks in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The tasks stream is consumed only in the calling thread in iteration order.
Throws:
  InterruptedException - if the thread is interrupted while waiting.
parallelize
public void parallelize(Stream<? extends Runnable> tasks, Duration heartbeatTimeout) throws TimeoutException, InterruptedException
Runs tasks in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The tasks stream is consumed only in the calling thread in iteration order.
Parameters:
  tasks - the tasks to be parallelized
  heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
Throws:
  InterruptedException - if the thread is interrupted while waiting.
  TimeoutException - if the configured timeout is exceeded while waiting.
parallelize
public void parallelize(Stream<? extends Runnable> tasks, long heartbeatTimeout, TimeUnit timeUnit) throws TimeoutException, InterruptedException
Runs tasks in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The tasks stream is consumed only in the calling thread in iteration order.
Parameters:
  tasks - the tasks to be parallelized
  heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
  timeUnit - the unit of heartbeatTimeout
Throws:
  InterruptedException - if the thread is interrupted while waiting.
  TimeoutException - if the configured timeout is exceeded while waiting.
parallelizeUninterruptibly
public void parallelizeUninterruptibly(Stream<? extends Runnable> tasks)
Runs tasks in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
inParallel
public <I,O> Collector<I,?,BiStream<I,O>> inParallel(Function<? super I, ? extends O> concurrentFunction)
Returns a Collector that runs concurrentFunction in parallel using this Parallelizer and returns the inputs and outputs in a BiStream, in encounter order of the input elements.
For example:
ImmutableListMultimap<String, Asset> resourceAssets =
    resources.stream()
        .collect(parallelizer.inParallel(this::listAssets))
        .collect(flatteningToImmutableListMultimap(List::stream));
In Java 20, using structured concurrency (still an incubating API in that release), it can be implemented equivalently as:
ImmutableListMultimap<String, Asset> resourceAssets;
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
  ImmutableList<Future<List<Asset>>> results =
      resources.stream()
          .map(resource -> scope.fork(() -> listAssets(resource)))
          .collect(toImmutableList());
  scope.join();
  scope.throwIfFailed(); // rethrows the first failure, if any
  resourceAssets =
      BiStream.zip(resources, results)
          .mapValues(Future::resultNow)
          .collect(flatteningToImmutableListMultimap(List::stream));
}
Parameters:
  concurrentFunction - a function that's safe to be run concurrently, and is usually IO-intensive (such as an outgoing RPC or reading distributed storage).
Since: 6.5
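The collector shape can be approximated with the JDK alone. This is a sketch under stated assumptions, not the library's implementation: the hypothetical `inParallel()` below takes an explicit Executor, returns a `List<Map.Entry<I, O>>` instead of a BiStream, and, unlike Parallelizer, collects all inputs before starting, bounds concurrency only by the executor itself, and doesn't cancel other calls when one fails.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ParallelCollectors {
  /** Hypothetical collector: applies concurrentFunction concurrently, keeping encounter order. */
  static <I, O> Collector<I, ?, List<Map.Entry<I, O>>> inParallel(
      Executor executor, Function<? super I, ? extends O> concurrentFunction) {
    return Collectors.collectingAndThen(
        Collectors.<I>toList(),
        inputs -> {
          // Start every call first so that they overlap...
          List<CompletableFuture<O>> futures = new ArrayList<>();
          for (I input : inputs) {
            futures.add(
                CompletableFuture.supplyAsync(() -> concurrentFunction.apply(input), executor));
          }
          // ...then join in input order; join() throws CompletionException on failure.
          List<Map.Entry<I, O>> results = new ArrayList<>();
          for (int i = 0; i < inputs.size(); i++) {
            results.add(new SimpleImmutableEntry<>(inputs.get(i), futures.get(i).join()));
          }
          return results;
        });
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    try {
      List<Map.Entry<String, Integer>> lengths =
          Stream.of("a", "bb", "ccc").collect(inParallel(executor, String::length));
      System.out.println(lengths); // prints [a=1, bb=2, ccc=3]
    } finally {
      executor.shutdown();
    }
  }
}
```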