Class Parallelizer

java.lang.Object
com.google.mu.util.concurrent.Parallelizer

public final class Parallelizer extends Object
Utility to support structured concurrency for IO-bound subtasks of a single unit of work, while limiting the max concurrency.

For example, the following code saves a stream of UserData in parallel with at most 3 concurrent RPC calls:


 new Parallelizer(executor, 3)
     .parallelize(userDataStream.filter(UserData::isModified), userService::save);
 

Similar to parallel streams (and unlike executors), these sub-tasks are considered integral parts of one unit of work. Failure of any sub-task aborts the entire work, automatically. If an exception isn't fatal, the sub-task should catch and handle it.

How does it stack up against parallel streams? The parallel stream counterpart to the above example may look like:

  
   userDataStream.filter(UserData::isModified).parallel().forEach(userService::save);
 
A few key differences:
  • A parallel stream can't use an arbitrary ExecutorService. By default it uses either the enclosing ForkJoinPool or the common ForkJoinPool instance.
  • By running in a dedicated ForkJoinPool, a parallel stream can take a custom target concurrency, but that target isn't guaranteed to be the maximum concurrency.
  • Parallel streams are for CPU-bound computations, while Parallelizer deals with IO-bound operations.
  • parallelize() can be interrupted, and can time out; parallel streams are uninterruptible.
  • When a task throws, Parallelizer dismisses pending tasks, and cancels all in-flight tasks (it's up to the user code to properly handle thread interruptions). So if a worker thread is waiting on some resource, it'll be interrupted without hanging the thread forever.
  • Parallelizer wraps exceptions thrown by the worker threads, making stack traces clearer.
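
To make the "dedicated ForkJoinPool" point concrete, here is a minimal sketch that uses only the JDK. The class and method names are hypothetical. It relies on the widely observed (but not formally specified) behavior that a parallel stream started from inside a ForkJoinPool task runs in that pool, and the parallelism passed to the pool is a target rather than a hard cap.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

final class DedicatedPoolStreamSketch {
  /** Runs a parallel stream inside a dedicated pool; returns how many items were processed. */
  static int processInDedicatedPool(List<Integer> items, int targetParallelism) {
    ForkJoinPool pool = new ForkJoinPool(targetParallelism);
    ConcurrentLinkedQueue<Integer> processed = new ConcurrentLinkedQueue<>();
    try {
      // Starting the terminal operation from a task of this pool makes the stream
      // use this pool instead of the common pool (observed behavior, not a documented contract).
      pool.submit(() -> items.parallelStream().forEach(processed::add)).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      pool.shutdown();
    }
    return processed.size();
  }
}
```

Note that the calling thread only blocks in get(); the stream itself does not respond to interruption of its workers, which is one of the differences listed above.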

And how do you choose between Parallelizer and ExecutorService? Could you use something like the following instead?

  
   ExecutorService pool = Executors.newFixedThreadPool(3);
   try {
     List<Future<?>> futures = userDataStream
         .filter(...)
         .map(data -> pool.submit(() -> userService.save(data)))
         .collect(toList());
     for (Future<?> future : futures) {
       future.get();
     }
   } finally {
     pool.shutdownNow();
   }
 
Some differences for consideration:
  • Memory Concern
    • The thread pool queues all pending tasks. For large streams (like reading hundreds of thousands of task input data from a file), it can run out of memory quickly.
    • Storing all the future objects in a list may also use up too much memory for a large number of sub tasks.
  • Exception Handling (and fail fast)
    • Executors treat submitted tasks as independent. One task may fail and the other tasks won't be affected.

      But for co-dependent sub tasks that were parallelized only for performance reasons (as in parallel streams), you'll want to abort the whole parallel pipeline upon any critical exception, the same as if they were run sequentially.

      Aborting a parallel pipeline requires complex concurrent logic to coordinate between the sub tasks and the executor in order to dismiss pending sub tasks and also to cancel sub tasks that are already running. Otherwise, when an exception is thrown from a sub task, the other left-over sub tasks will continue to run, and some may even hang indefinitely.

    • Automatic cancellation propagation. When parallelize() is interrupted, all running tasks will be automatically canceled; all pending tasks automatically dismissed.
    • You may resort to shutting down the executor to achieve a similar result (cancelling the left-over sub tasks), but even knowing whether a sub task has failed isn't trivial. The above code example uses Future.get(), but it won't help if a sub task submitted earlier is still running or blocked while a later-submitted sub task has failed.
    • And, ExecutorServices are often set up centrally and shared among different classes and components in the application. You may not have the option to create and shut down a thread pool of your own.
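
To give a feel for the coordination logic described above, the following is a rough sketch using only the JDK. It is not Parallelizer's actual implementation; the class name, method names and the choice of IllegalStateException as the wrapper are assumptions made for illustration. A Semaphore bounds the number of submitted tasks, only in-flight futures are retained, and the first failure dismisses unsubmitted inputs and cancels whatever is still in flight.

```java
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.IntStream;

final class BoundedFailFastSketch {
  /** Runs consumer on each input with at most maxInFlight tasks submitted at any time. */
  static <T> void runAll(
      ExecutorService executor, int maxInFlight,
      Iterator<? extends T> inputs, Consumer<? super T> consumer)
      throws InterruptedException {
    Semaphore permits = new Semaphore(maxInFlight);
    AtomicReference<Throwable> failure = new AtomicReference<>();
    Set<Future<?>> inFlight = ConcurrentHashMap.newKeySet();
    try {
      // Inputs are pulled in the calling thread only.
      while (failure.get() == null && inputs.hasNext()) {
        permits.acquire();  // blocks while maxInFlight tasks are outstanding
        if (failure.get() != null) {
          permits.release();
          break;  // fail fast: remaining inputs are never submitted
        }
        T input = inputs.next();
        FutureTask<Void> task = new FutureTask<Void>(() -> {
          try {
            consumer.accept(input);
          } catch (Throwable e) {
            failure.compareAndSet(null, e);  // remember the first failure
          }
        }, null) {
          @Override protected void done() {  // runs on completion and on cancellation
            inFlight.remove(this);
            permits.release();
          }
        };
        inFlight.add(task);
        executor.execute(task);
      }
      // Reclaim every permit; a cancelled task gives its permit back immediately,
      // so this does not wait for tasks to respond to cancellation.
      for (int i = 0; i < maxInFlight; i++) {
        permits.acquire();
        if (failure.get() != null) cancelAll(inFlight);
      }
    } catch (InterruptedException e) {
      cancelAll(inFlight);  // propagate cancellation to running tasks
      throw e;
    }
    Throwable cause = failure.get();
    if (cause != null) throw new IllegalStateException("a sub task failed", cause);
  }

  private static void cancelAll(Set<Future<?>> inFlight) {
    for (Future<?> future : inFlight) future.cancel(true);
  }

  /** Demo helper: returns the highest number of consumers seen running at once. */
  static int maxObservedConcurrency(int maxInFlight, int taskCount) {
    ExecutorService executor = Executors.newCachedThreadPool();
    AtomicInteger running = new AtomicInteger();
    AtomicInteger maxSeen = new AtomicInteger();
    try {
      runAll(executor, maxInFlight, IntStream.range(0, taskCount).iterator(), i -> {
        maxSeen.accumulateAndGet(running.incrementAndGet(), Math::max);
        try {
          Thread.sleep(5);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
        running.decrementAndGet();
      });
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      executor.shutdownNow();
    }
    return maxSeen.get();
  }
}
```

Even this simplified version needs a semaphore, a concurrent set, a shared failure slot and a FutureTask hook, and it still ignores details such as rejected submissions. That is the kind of boilerplate the bullets above argue against writing by hand.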

Stream parameters used in this class are always consumed in the calling thread and don't have to be thread safe.

Since:
1.1
  • Constructor Summary

    Constructors
    Constructor
    Description
    Parallelizer(ExecutorService executor, int maxInFlight)
    Constructs a Parallelizer that runs tasks with executor.
  • Method Summary

    Modifier and Type
    Method
    Description
    <I, O> Collector<I,?,BiStream<I,O>>
    inParallel(Function<? super I,? extends O> concurrentFunction)
    Returns a Collector that runs concurrentFunction in parallel using this Parallelizer and returns the inputs and outputs in a BiStream, in encounter order of the input elements.
    static Parallelizer
    newDaemonParallelizer(int maxInFlight)
    Returns a new Parallelizer based on an ExecutorService that exits when the application is complete.
    <T> void
    parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer)
    Runs consumer for inputs in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
    <T> void
    parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer, long heartbeatTimeout, TimeUnit timeUnit)
    Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
    <T> void
    parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer, Duration heartbeatTimeout)
    Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
    void
    parallelize(Stream<? extends Runnable> tasks)
    Runs tasks in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
    void
    parallelize(Stream<? extends Runnable> tasks, long heartbeatTimeout, TimeUnit timeUnit)
    Runs tasks in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
    void
    parallelize(Stream<? extends Runnable> tasks, Duration heartbeatTimeout)
    Runs tasks in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
    <T> void
    parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer)
    Runs consumer for inputs in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
    <T> void
    parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer, long heartbeatTimeout, TimeUnit timeUnit)
    Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
    <T> void
    parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer, Duration heartbeatTimeout)
    Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
    <T> void
    parallelizeUninterruptibly(Iterator<? extends T> inputs, Consumer<? super T> consumer)
    Runs consumer for inputs in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
    void
    parallelizeUninterruptibly(Stream<? extends Runnable> tasks)
    Runs tasks in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
    <T> void
    parallelizeUninterruptibly(Stream<? extends T> inputs, Consumer<? super T> consumer)
    Runs consumer for inputs in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
    static Parallelizer
    virtualThreadParallelizer(int maxInFlight)
    Returns a Parallelizer using virtual threads for running tasks, with at most maxInFlight tasks running concurrently.

    Methods inherited from class java.lang.Object

    clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
  • Constructor Details

    • Parallelizer

      public Parallelizer(ExecutorService executor, int maxInFlight)
      Constructs a Parallelizer that runs tasks with executor. At any given time, at most maxInFlight tasks are allowed to be submitted to executor.

      Note that a task being submitted to executor doesn't guarantee immediate execution, if for example all worker threads in executor are busy.
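
The distinction between "submitted" and "running" can be seen with plain JDK classes. The sketch below (class and method names are hypothetical) submits three blocking tasks to a pool with a single worker thread: all three count as submitted, but only one has started while the other two wait in the pool's queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SubmittedVersusRunningSketch {
  /** Returns {tasks started, tasks still queued} after submitting 3 tasks to a 1-thread pool. */
  static int[] startedAndQueued() {
    ThreadPoolExecutor executor =
        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    CountDownLatch firstStarted = new CountDownLatch(1);
    CountDownLatch release = new CountDownLatch(1);
    AtomicInteger started = new AtomicInteger();
    try {
      for (int i = 0; i < 3; i++) {
        executor.execute(() -> {
          started.incrementAndGet();
          firstStarted.countDown();
          try {
            release.await();  // keep the only worker thread busy
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });
      }
      firstStarted.await();  // wait until the single worker has picked up a task
      return new int[] {started.get(), executor.getQueue().size()};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      release.countDown();
      executor.shutdownNow();
    }
  }
}
```

With an executor like this one, a maxInFlight of 3 would still yield an effective concurrency of 1, so the executor's own capacity needs to be at least maxInFlight for the limit to be the binding constraint.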

  • Method Details

    • virtualThreadParallelizer

      public static Parallelizer virtualThreadParallelizer(int maxInFlight)
      Returns a Parallelizer using virtual threads for running tasks, with at most maxInFlight tasks running concurrently.

      Only applicable on JDK 21 or later (throws on earlier JDK versions).

      Since:
      7.2
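
One way a library can offer virtual threads while still compiling against older JDKs is a reflective lookup. The sketch below is only an illustration of that idea and is not taken from Parallelizer's source; the class name, method names and the exception type are assumptions. Executors.newVirtualThreadPerTaskExecutor() is the standard JDK 21 factory method.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class VirtualThreadExecutorSketch {
  /** Returns a virtual-thread-per-task executor, or throws if the runtime doesn't provide one. */
  static ExecutorService newVirtualThreadExecutor() {
    try {
      // Looked up reflectively so that this class still compiles on JDKs older than 21.
      return (ExecutorService)
          Executors.class.getMethod("newVirtualThreadPerTaskExecutor").invoke(null);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("virtual threads require JDK 21 or later", e);
    }
  }

  /** Returns true if a virtual thread executor could be created on this runtime. */
  static boolean isSupported() {
    try {
      newVirtualThreadExecutor().shutdown();
      return true;
    } catch (IllegalStateException e) {
      return false;
    }
  }
}
```

Because virtual threads are cheap to create, the concurrency limit in such a setup comes from maxInFlight rather than from a thread pool size.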
    • newDaemonParallelizer

      public static Parallelizer newDaemonParallelizer(int maxInFlight)
      Returns a new Parallelizer based on an ExecutorService that exits when the application is complete. It does so by using daemon threads.

      Typically used by the main() method or as a static final field.

      Since:
      6.5
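
For readers unfamiliar with daemon threads, this is roughly what a daemon-thread-backed ExecutorService looks like with plain JDK classes. It is a sketch under the assumption that a cached thread pool is an acceptable stand-in; the executor actually used by newDaemonParallelizer may differ, and the names below are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DaemonExecutorSketch {
  /** Creates an executor whose worker threads never keep the JVM alive. */
  static ExecutorService newDaemonExecutor() {
    return Executors.newCachedThreadPool(runnable -> {
      Thread thread = new Thread(runnable);
      thread.setDaemon(true);  // the JVM may exit while daemon threads are still running
      return thread;
    });
  }

  /** Demo helper: reports whether a task submitted to the executor runs on a daemon thread. */
  static boolean workerIsDaemon() {
    ExecutorService executor = newDaemonExecutor();
    try {
      return executor.submit(() -> Thread.currentThread().isDaemon()).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      executor.shutdown();
    }
  }
}
```

Since such an executor never needs an explicit shutdown to let the application exit, it suits main() methods and static final fields, as noted above.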
    • parallelize

      public <T> void parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer) throws InterruptedException
      Runs consumer for inputs in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

      The inputs stream is consumed only in the calling thread in iteration order.

      Parameters:
      inputs - the inputs to be passed to consumer
      consumer - to be parallelized
      Throws:
      InterruptedException - if the thread is interrupted while waiting.
    • parallelize

      public <T> void parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer) throws InterruptedException
      Runs consumer for inputs in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

      The inputs iterator is consumed only in the calling thread in iteration order.

      Parameters:
      inputs - the inputs to be passed to consumer
      consumer - to be parallelized
      Throws:
      InterruptedException - if the thread is interrupted while waiting.
    • parallelize

      public <T> void parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer, Duration heartbeatTimeout) throws TimeoutException, InterruptedException
      Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

      The inputs stream is consumed only in the calling thread in iteration order.

      Parameters:
      inputs - the inputs to be passed to consumer
      consumer - to be parallelized
      heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
      Throws:
      InterruptedException - if the thread is interrupted while waiting.
      TimeoutException - if the configured timeout is exceeded while waiting.
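
The heartbeat semantics differ from an overall deadline: the clock restarts each time a task completes. The following JDK-only sketch illustrates that reading of the parameter description. It is not the library's implementation, and the class name, method names and the completion queue are assumptions made for the example.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class HeartbeatTimeoutSketch {
  /** Waits for expected completion signals, allowing at most heartbeat between two of them. */
  static void awaitAll(BlockingQueue<?> completions, int expected, Duration heartbeat)
      throws TimeoutException, InterruptedException {
    for (int done = 0; done < expected; done++) {
      // The wait restarts after every completion, so this is not a total deadline.
      if (completions.poll(heartbeat.toMillis(), TimeUnit.MILLISECONDS) == null) {
        throw new TimeoutException("no task completed within " + heartbeat);
      }
    }
  }

  /** Runs taskCount sequential tasks of taskMillis each; true if no heartbeat was missed. */
  static boolean completesWithin(int taskCount, long taskMillis, Duration heartbeat) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    BlockingQueue<Integer> completions = new LinkedBlockingQueue<>();
    try {
      for (int i = 0; i < taskCount; i++) {
        int id = i;
        executor.execute(() -> {
          try {
            Thread.sleep(taskMillis);
            completions.add(id);  // signal one completed task
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // cancelled: no completion signal
          }
        });
      }
      awaitAll(completions, taskCount, heartbeat);
      return true;
    } catch (TimeoutException e) {
      return false;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      executor.shutdownNow();  // interrupts whatever is still running
    }
  }
}
```

For instance, five sequential 100 ms tasks take about 500 ms in total yet satisfy a 400 ms heartbeat, whereas a single 2-second task misses a 100 ms heartbeat.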
    • parallelize

      public <T> void parallelize(Stream<? extends T> inputs, Consumer<? super T> consumer, long heartbeatTimeout, TimeUnit timeUnit) throws TimeoutException, InterruptedException
      Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

      The inputs stream is consumed only in the calling thread in iteration order.

      Parameters:
      inputs - the inputs to be passed to consumer
      consumer - to be parallelized
      heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
      timeUnit - the unit of heartbeatTimeout
      Throws:
      InterruptedException - if the thread is interrupted while waiting.
      TimeoutException - if the configured timeout is exceeded while waiting.
    • parallelize

      public <T> void parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer, Duration heartbeatTimeout) throws TimeoutException, InterruptedException
      Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

      The inputs iterator is consumed only in the calling thread in iteration order.

      Parameters:
      inputs - the inputs to be passed to consumer
      consumer - to be parallelized
      heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
      Throws:
      InterruptedException - if the thread is interrupted while waiting.
      TimeoutException - if the configured timeout is exceeded while waiting.
    • parallelize

      public <T> void parallelize(Iterator<? extends T> inputs, Consumer<? super T> consumer, long heartbeatTimeout, TimeUnit timeUnit) throws TimeoutException, InterruptedException
      Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

      The inputs iterator is consumed only in the calling thread in iteration order.

      Parameters:
      inputs - the inputs to be passed to consumer
      consumer - to be parallelized
      heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
      timeUnit - the unit of heartbeatTimeout
      Throws:
      InterruptedException - if the thread is interrupted while waiting.
      TimeoutException - if the configured timeout is exceeded while waiting.
    • parallelizeUninterruptibly

      public <T> void parallelizeUninterruptibly(Stream<? extends T> inputs, Consumer<? super T> consumer)
      Runs consumer for inputs in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

      The inputs stream is consumed only in the calling thread in iteration order.

      Parameters:
      inputs - the inputs to be passed to consumer
      consumer - to be parallelized
    • parallelizeUninterruptibly

      public <T> void parallelizeUninterruptibly(Iterator<? extends T> inputs, Consumer<? super T> consumer)
      Runs consumer for inputs in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

      The inputs iterator is consumed only in the calling thread in iteration order.

      Parameters:
      inputs - the inputs to be passed to consumer
      consumer - to be parallelized
    • parallelize

      public void parallelize(Stream<? extends Runnable> tasks) throws InterruptedException
      Runs tasks in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

      The tasks stream is consumed only in the calling thread in iteration order.

      Throws:
      InterruptedException - if the thread is interrupted while waiting.
    • parallelize

      public void parallelize(Stream<? extends Runnable> tasks, Duration heartbeatTimeout) throws TimeoutException, InterruptedException
      Runs tasks in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

      The tasks stream is consumed only in the calling thread in iteration order.

      Parameters:
      tasks - the tasks to be parallelized
      heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
      Throws:
      InterruptedException - if the thread is interrupted while waiting.
      TimeoutException - if the configured timeout is exceeded while waiting.
    • parallelize

      public void parallelize(Stream<? extends Runnable> tasks, long heartbeatTimeout, TimeUnit timeUnit) throws TimeoutException, InterruptedException
      Runs tasks in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

      The tasks stream is consumed only in the calling thread in iteration order.

      Parameters:
      tasks - the tasks to be parallelized
      heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
      timeUnit - the unit of heartbeatTimeout
      Throws:
      InterruptedException - if the thread is interrupted while waiting.
      TimeoutException - if the configured timeout is exceeded while waiting.
    • parallelizeUninterruptibly

      public void parallelizeUninterruptibly(Stream<? extends Runnable> tasks)
      Runs tasks in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
    • inParallel

      public <I, O> Collector<I,?,BiStream<I,O>> inParallel(Function<? super I,? extends O> concurrentFunction)
      Returns a Collector that runs concurrentFunction in parallel using this Parallelizer and returns the inputs and outputs in a BiStream, in encounter order of the input elements.

      For example:

      
       ImmutableListMultimap<String, Asset> resourceAssets =
           resources.stream()
               .collect(parallelizer.inParallel(this::listAssets))
               .collect(flatteningToImmutableListMultimap(List::stream));
       

      With the structured concurrency API incubating in Java 20, an equivalent implementation may look like:

      
       ImmutableListMultimap<String, Asset> resourceAssets;
       try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
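         // Assumes listAssets() returns List<Asset>.
         ImmutableList<Future<List<Asset>>> results =
             resources.stream()
                 .map(resource -> scope.fork(() -> listAssets(resource)))
                 .collect(toImmutableList());
         // Propagate the first failure, if any, before reading results.
         scope.join().throwIfFailed();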
         resourceAssets =
             BiStream.zip(resources, results)
                 .mapValues(Future::resultNow)
                 .collect(flatteningToImmutableListMultimap(List::stream));
       }
       
      Parameters:
      concurrentFunction - a function that's safe to be run concurrently, and is usually IO-intensive (such as an outgoing RPC or reading distributed storage).
      Since:
      6.5
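
For comparison, the "inputs paired with outputs, in encounter order" contract can be approximated with JDK classes alone. The sketch below is a simplified stand-in, not inParallel's implementation: it uses a list of Map.Entry in place of BiStream, creates its own fixed thread pool, and keeps every Future in memory, which is exactly the overhead discussed in the class description. All names in it are hypothetical.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

final class OrderedParallelMapSketch {
  /** Applies function to all inputs concurrently; returns (input, output) pairs in input order. */
  static <I, O> List<Map.Entry<I, O>> mapInParallel(
      List<I> inputs, Function<? super I, ? extends O> function, int threads) {
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    try {
      List<Future<O>> futures = new ArrayList<>();
      for (I input : inputs) {
        Callable<O> call = () -> function.apply(input);
        futures.add(executor.submit(call));
      }
      List<Map.Entry<I, O>> results = new ArrayList<>();
      for (int i = 0; i < inputs.size(); i++) {
        // Reading futures by index preserves encounter order regardless of finish order.
        results.add(new SimpleImmutableEntry<>(inputs.get(i), futures.get(i).get()));
      }
      return results;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      executor.shutdownNow();  // on failure this also cancels tasks still running
    }
  }
}
```

A call such as mapInParallel(List.of(3, 1, 2), i -> "v" + i, 2) yields the pairs for 3, 1 and 2 in that order, whichever task happens to finish first.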