Class Parallelizer
For example, the following code saves a stream of UserData in parallel with at most 3
concurrent RPC calls:
new Parallelizer(executor, 3)
.parallelize(userDataStream.filter(UserData::isModified), userService::save);
Similar to parallel streams (and unlike executors), these sub-tasks are considered integral parts of one unit of work. Failure of any sub-task aborts the entire work, automatically. If an exception isn't fatal, the sub-task should catch and handle it.
How does it stack up against parallel streams? The parallel stream counterpart to the above example may look like:
userDataStream.filter(UserData::isModified).parallel().forEach(userService::save);
A few key differences:
- A parallel stream doesn't use an arbitrary ExecutorService. By default it uses either the enclosing ForkJoinPool or the common ForkJoinPool instance.
- By running in a dedicated ForkJoinPool, a parallel stream can take a custom target concurrency, but it's not guaranteed to be max concurrency.
- Parallel streams are for CPU-bound computations, while Parallelizer deals with IO-bound operations.
- parallelize() can be interrupted, and can time out; parallel streams are uninterruptible.
- When a task throws, Parallelizer dismisses pending tasks and cancels all in-flight tasks (it's up to the user code to properly handle thread interruptions). So if a worker thread is waiting on some resource, it'll be interrupted instead of hanging forever.
- Parallelizer wraps exceptions thrown by the worker threads, making stack traces clearer.
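To make the "dedicated ForkJoinPool" point concrete, here is a minimal JDK-only sketch of the commonly used idiom of starting a parallel stream from inside a pool of your own. The class and method names (DedicatedPoolStreamDemo, squareAll) are hypothetical and not part of this library. The pool size only acts as a target parallelism; the idiom relies on fork/join implementation behavior rather than on a documented guarantee of the Stream API.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class DedicatedPoolStreamDemo {

  /** Squares every input with a parallel stream that is started inside its own ForkJoinPool. */
  static Set<Integer> squareAll(List<Integer> inputs, int targetParallelism) {
    Set<Integer> squares = ConcurrentHashMap.newKeySet();
    ForkJoinPool pool = new ForkJoinPool(targetParallelism);
    try {
      // A parallel stream started from a ForkJoinPool worker thread forks its
      // sub-tasks into that pool instead of the common pool.
      pool.submit(() -> inputs.parallelStream().forEach(n -> squares.add(n * n))).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for the stream", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("stream pipeline failed", e.getCause());
    } finally {
      pool.shutdown();
    }
    return squares;
  }

  public static void main(String[] args) {
    List<Integer> inputs = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
    System.out.println(squareAll(inputs, 3));
  }
}
```

Note that only the caller's wait on get() is interruptible here; the stream pipeline itself keeps running until it finishes or throws.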
And how do you choose between Parallelizer and ExecutorService?
Could you use something like the following instead?
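ExecutorService pool = Executors.newFixedThreadPool(3);
try {
  // Submit every sub task up front; the pool queues whatever it cannot run yet.
  List<Future<?>> futures = userDataStream
      .filter(...)
      .map(data -> pool.submit(() -> userService.save(data)))
      .collect(toList());
  // Wait for the sub tasks one by one, in submission order.
  for (Future<?> future : futures) {
    future.get();
  }
} finally {
  pool.shutdownNow();
}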
Some differences for consideration:
- Memory Concern
  - The thread pool queues all pending tasks. For large streams (like reading hundreds of thousands of task input data from a file), it can run out of memory quickly.
  - Storing all the future objects in a list may also use up too much memory for a large number of sub tasks.
- Exception Handling (and fail fast)
  - Executors treat submitted tasks as independent. One task may fail and the other tasks won't be affected.
    But for co-dependent sub tasks that were parallelized only for performance reasons (as in parallel streams), you'll want to abort the whole parallel pipeline upon any critical exception, the same as if they were run sequentially.
    Aborting a parallel pipeline requires complex concurrent logic to coordinate between the sub tasks and the executor in order to dismiss pending sub tasks and also to cancel sub tasks that are already running. Otherwise, when an exception is thrown from a sub task, the other left-over sub tasks will continue to run, and some may even hang indefinitely.
  - Automatic cancellation propagation. When parallelize() is interrupted, all running tasks will be automatically canceled; all pending tasks automatically dismissed.
  - You may resort to shutting down the executor to achieve a similar result (cancelling the left-over sub tasks), although even knowing whether a sub task has failed isn't trivial. The above code example uses Future.get(), but it won't help if a sub task submitted earlier is still running or being blocked, while a later-submitted sub task has failed.
  - And, ExecutorServices are often set up centrally and shared among different classes and components in the application. You may not have the option to create and shut down a thread pool of your own.
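The coordination described above can be sketched with JDK classes alone. The following is a minimal illustration under stated assumptions, not this library's implementation: FailFastRunner and runAll are hypothetical names, the first failure is rethrown wrapped in an IllegalStateException, and a Semaphore keeps the caller from submitting more than maxConcurrency tasks, so neither the executor queue nor a list of futures grows with the input size.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class FailFastRunner {

  /**
   * Runs consumer for each input with at most maxConcurrency tasks handed to executor at a time.
   * On the first failure it stops submitting, cancels in-flight tasks and throws.
   */
  static <T> void runAll(
      ExecutorService executor, int maxConcurrency,
      Iterator<? extends T> inputs, Consumer<? super T> consumer)
      throws InterruptedException {
    Semaphore permits = new Semaphore(maxConcurrency);
    AtomicReference<Throwable> failure = new AtomicReference<>();
    Set<Future<?>> inFlight = ConcurrentHashMap.newKeySet();
    boolean succeeded = false;
    try {
      // Submission phase: the caller blocks while maxConcurrency tasks are in flight.
      while (failure.get() == null && inputs.hasNext()) {
        permits.acquire();
        if (failure.get() != null) break; // woken up by a failed task
        T input = inputs.next();
        FutureTask<Void> task = new FutureTask<>(() -> {
          try {
            consumer.accept(input);
          } catch (Throwable t) {
            failure.compareAndSet(null, t); // remember only the first failure
          }
        }, null);
        inFlight.add(task);
        executor.execute(() -> {
          try {
            task.run(); // no-op if the task was cancelled before it started
          } finally {
            inFlight.remove(task);
            permits.release(); // wakes the caller, which re-checks failure
          }
        });
      }
      // Completion phase: every finished task returns one permit.
      for (int i = 0; i < maxConcurrency && failure.get() == null; i++) {
        permits.acquire();
      }
      succeeded = failure.get() == null;
    } finally {
      if (!succeeded) {
        // Failure or interruption: interrupt whatever is still running, without waiting for it.
        inFlight.forEach(task -> task.cancel(true));
      }
    }
    if (!succeeded) {
      throw new IllegalStateException("sub-task failed", failure.get());
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    try {
      runAll(executor, 3, Arrays.asList("a", "b", "c").iterator(),
          s -> System.out.println("saved " + s));
    } finally {
      executor.shutdownNow();
    }
  }
}
```

Even this simplified version needs a permit hand-off, a shared failure slot and a live set of futures, which is the kind of bookkeeping the comparison above refers to.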
Stream parameters used in this class are always consumed in the calling thread and don't have to be thread safe.
- Since:
- 1.1
-
Constructor Summary
Constructors
- Parallelizer(ExecutorService executor, int maxConcurrency)
  Constructs a Parallelizer that runs tasks with executor.
-
Method Summary
Modifier and Type, Method, Description
- <I,O> Collector<I, ?, BiStream<I,O>> inParallel(Function<? super I, ? extends O> concurrentFunction)
  Returns a Collector that runs concurrentFunction in parallel using this Parallelizer and returns the inputs and outputs in a BiStream, in encounter order of the input elements.
- <T> void parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer)
  Runs consumer for inputs in parallel and blocks until either all tasks have finished, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- <T> void parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer, long heartbeatTimeout, TimeUnit timeUnit)
  Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- <T> void parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer, Duration heartbeatTimeout)
  Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- void parallelize(Stream<? extends Runnable> tasks)
  Runs tasks in parallel and blocks until either all tasks have finished, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- void parallelize(Stream<? extends Runnable> tasks, long heartbeatTimeout, TimeUnit timeUnit)
  Runs tasks in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- void parallelize(Stream<? extends Runnable> tasks, Duration heartbeatTimeout)
  Runs tasks in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- <T> void parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer)
  Runs consumer for inputs in parallel and blocks until either all tasks have finished, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- <T> void parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer, long heartbeatTimeout, TimeUnit timeUnit)
  Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- <T> void parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer, Duration heartbeatTimeout)
  Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- <T> void parallelizeUninterruptibly(Iterator<? extends T> inputs, Consumer<? super T> consumer)
  Runs consumer for inputs in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- void parallelizeUninterruptibly(Stream<? extends Runnable> tasks)
  Runs tasks in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
- <T> void parallelizeUninterruptibly(Stream<? extends T> inputs, Consumer<? super T> consumer)
  Runs consumer for inputs in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
-
Constructor Details
-
Parallelizer
public Parallelizer(ExecutorService executor, int maxConcurrency)
Constructs a Parallelizer that runs tasks with executor. At any given time, at most maxConcurrency tasks are allowed to be submitted to executor.
Note that a task being submitted to executor doesn't guarantee immediate execution, if for example all worker threads in executor are busy.
-
-
Method Details
-
parallelize
public <T> void parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer) throws InterruptedException
Runs consumer for inputs in parallel and blocks until either all tasks have finished, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The inputs stream is consumed only in the calling thread in iteration order.
- Parameters:
  inputs - the inputs to be passed to consumer
  consumer - to be parallelized
- Throws:
  InterruptedException - if the thread is interrupted while waiting.
-
parallelize
public <T> void parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer) throws InterruptedException
Runs consumer for inputs in parallel and blocks until either all tasks have finished, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The inputs iterator is consumed only in the calling thread in iteration order.
- Parameters:
  inputs - the inputs to be passed to consumer
  consumer - to be parallelized
- Throws:
  InterruptedException - if the thread is interrupted while waiting.
-
parallelize
public <T> void parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer, Duration heartbeatTimeout) throws TimeoutException, InterruptedException
Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The inputs stream is consumed only in the calling thread in iteration order.
- Parameters:
  inputs - the inputs to be passed to consumer
  consumer - to be parallelized
  heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
- Throws:
  InterruptedException - if the thread is interrupted while waiting.
  TimeoutException - if the configured timeout is exceeded while waiting.
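One way to read "at least one task needs to complete every heartbeatTimeout" is that the caller never waits longer than that for a free slot. The JDK-only sketch below illustrates that reading with Semaphore.tryAcquire; it is an assumption about the semantics, not this class's implementation. HeartbeatRunner, runAll and awaitSlot are hypothetical names, and failure handling and cancellation are left out to keep the timeout logic visible.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class HeartbeatRunner {

  /** Runs tasks with at most maxConcurrency in flight; fails if no slot frees up in time. */
  static void runAll(
      ExecutorService executor, int maxConcurrency,
      List<? extends Runnable> tasks, Duration heartbeatTimeout)
      throws InterruptedException, TimeoutException {
    Semaphore slots = new Semaphore(maxConcurrency);
    for (Runnable task : tasks) {
      awaitSlot(slots, heartbeatTimeout); // waits for some in-flight task to finish
      executor.execute(() -> {
        try {
          task.run();
        } finally {
          slots.release(); // one "heartbeat": a task has completed
        }
      });
    }
    // Drain phase: reclaim every slot, again waiting at most heartbeatTimeout for each.
    for (int i = 0; i < maxConcurrency; i++) {
      awaitSlot(slots, heartbeatTimeout);
    }
  }

  private static void awaitSlot(Semaphore slots, Duration heartbeatTimeout)
      throws InterruptedException, TimeoutException {
    if (!slots.tryAcquire(heartbeatTimeout.toNanos(), TimeUnit.NANOSECONDS)) {
      throw new TimeoutException("no task completed within " + heartbeatTimeout);
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      Runnable hello = () -> System.out.println("hello from " + Thread.currentThread().getName());
      runAll(executor, 2, Arrays.asList(hello, hello, hello), Duration.ofSeconds(1));
    } finally {
      executor.shutdownNow();
    }
  }
}
```

Under this reading the timeout bounds the gap between completions, not the total running time: a long job made of many quick tasks never times out, while a single stuck task does.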
-
parallelize
public <T> void parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer, long heartbeatTimeout, TimeUnit timeUnit) throws TimeoutException, InterruptedException
Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The inputs stream is consumed only in the calling thread in iteration order.
- Parameters:
  inputs - the inputs to be passed to consumer
  consumer - to be parallelized
  heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
  timeUnit - the unit of heartbeatTimeout
- Throws:
  InterruptedException - if the thread is interrupted while waiting.
  TimeoutException - if the configured timeout is exceeded while waiting.
-
parallelize
public <T> void parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer, Duration heartbeatTimeout) throws TimeoutException, InterruptedException
Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The inputs iterator is consumed only in the calling thread in iteration order.
- Parameters:
  inputs - the inputs to be passed to consumer
  consumer - to be parallelized
  heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
- Throws:
  InterruptedException - if the thread is interrupted while waiting.
  TimeoutException - if the configured timeout is exceeded while waiting.
-
parallelize
public <T> void parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer, long heartbeatTimeout, TimeUnit timeUnit) throws TimeoutException, InterruptedException
Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The inputs iterator is consumed only in the calling thread in iteration order.
- Parameters:
  inputs - the inputs to be passed to consumer
  consumer - to be parallelized
  heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
  timeUnit - the unit of heartbeatTimeout
- Throws:
  InterruptedException - if the thread is interrupted while waiting.
  TimeoutException - if the configured timeout is exceeded while waiting.
-
parallelizeUninterruptibly
public <T> void parallelizeUninterruptibly(Stream<? extends T> inputs, Consumer<? super T> consumer)
Runs consumer for inputs in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The inputs stream is consumed only in the calling thread in iteration order.
- Parameters:
  inputs - the inputs to be passed to consumer
  consumer - to be parallelized
-
parallelizeUninterruptibly
public <T> void parallelizeUninterruptibly(Iterator<? extends T> inputs, Consumer<? super T> consumer)
Runs consumer for inputs in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The inputs iterator is consumed only in the calling thread in iteration order.
- Parameters:
  inputs - the inputs to be passed to consumer
  consumer - to be parallelized
-
parallelize
public void parallelize(Stream<? extends Runnable> tasks) throws InterruptedException
Runs tasks in parallel and blocks until either all tasks have finished, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The tasks stream is consumed only in the calling thread in iteration order.
- Throws:
  InterruptedException - if the thread is interrupted while waiting.
-
parallelize
public void parallelize(Stream<? extends Runnable> tasks, Duration heartbeatTimeout) throws TimeoutException, InterruptedException
Runs tasks in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The tasks stream is consumed only in the calling thread in iteration order.
- Parameters:
  tasks - the tasks to be parallelized
  heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
- Throws:
  InterruptedException - if the thread is interrupted while waiting.
  TimeoutException - if the timeout is exceeded while waiting.
-
parallelize
public void parallelize(Stream<? extends Runnable> tasks, long heartbeatTimeout, TimeUnit timeUnit) throws TimeoutException, InterruptedException
Runs tasks in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
The tasks stream is consumed only in the calling thread in iteration order.
- Parameters:
  tasks - the tasks to be parallelized
  heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
  timeUnit - the unit of heartbeatTimeout
- Throws:
  InterruptedException - if the thread is interrupted while waiting.
  TimeoutException - if the timeout is exceeded while waiting.
-
parallelizeUninterruptibly
public void parallelizeUninterruptibly(Stream<? extends Runnable> tasks)
Runs tasks in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown, upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
-
inParallel
public <I,O> Collector<I, ?, BiStream<I,O>> inParallel(Function<? super I, ? extends O> concurrentFunction)
Returns a Collector that runs concurrentFunction in parallel using this Parallelizer and returns the inputs and outputs in a BiStream, in encounter order of the input elements.
For example:
    int concurrency = ...;
    ImmutableListMultimap<String, Asset> resourceAssets =
        resources.stream()
            .collect(withMaxConcurrency(concurrency).inParallel(this::listAssets))
            .collect(flatteningToImmutableListMultimap(List::stream));
In Java 22 using structured concurrency, it can be implemented equivalently as in:
    ImmutableListMultimap<String, Asset> resourceAssets;
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
      ImmutableList<Future<?>> results =
          resources.stream()
              .map(resource -> scope.fork(() -> listAssets(resource)))
              .collect(toImmutableList());
      scope.join();
      resourceAssets =
          BiStream.zip(resources, results)
              .mapValues(Future::resultNow)
              .collect(flatteningToImmutableListMultimap(List::stream));
    }
- Since:
- 6.5
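For readers without BiStream or structured concurrency at hand, the "inputs and outputs in encounter order" contract can be approximated with CompletableFuture. This is a rough JDK-only sketch under stated assumptions: OrderedPairs and mapInParallel are hypothetical names, concurrency is limited only by the executor's own thread count, and all futures are held in a list, which is exactly the memory trade-off discussed in the class description.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class OrderedPairs {

  /** Applies function to every input on executor; returns (input, output) pairs in input order. */
  static <I, O> List<Map.Entry<I, O>> mapInParallel(
      Executor executor, List<? extends I> inputs, Function<? super I, ? extends O> function) {
    List<CompletableFuture<O>> outputs = new ArrayList<>();
    for (I input : inputs) {
      outputs.add(CompletableFuture.<O>supplyAsync(() -> function.apply(input), executor));
    }
    List<Map.Entry<I, O>> pairs = new ArrayList<>();
    for (int i = 0; i < inputs.size(); i++) {
      // join() waits for the i-th output, so the pairs follow encounter order
      // no matter which call happens to finish first. A failed call surfaces
      // here as a CompletionException.
      pairs.add(new SimpleImmutableEntry<I, O>(inputs.get(i), outputs.get(i).join()));
    }
    return pairs;
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    try {
      System.out.println(mapInParallel(executor, Arrays.asList("ccc", "a", "bb"), String::length));
    } finally {
      executor.shutdownNow();
    }
  }
}
```

Unlike the structured-concurrency version above, this sketch does not cancel the remaining calls when one of them fails.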
-