Class Parallelizer


  • public final class Parallelizer
    extends java.lang.Object
    An Executor-friendly, interruptible alternative to parallel streams.

    Designed for IO-bound (as opposed to CPU-bound) use cases, this utility runs a (possibly large) stream of sub-tasks in parallel while capping the maximum concurrency.

    For example, the following code saves a stream of UserData in parallel, with at most 3 RPC calls in flight at any time:

      
       new Parallelizer(executor, 3)
           .parallelize(userDataStream.filter(UserData::isModified), userService::save);
     

    Like parallel streams (and unlike executors), these sub-tasks are considered integral parts of one logical task. Failure of any sub-task automatically aborts the entire task. If an exception isn't fatal, the sub-task should catch and handle it itself.
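    The bounded, fail-fast behavior described above can be sketched with plain JDK primitives. This is a minimal sketch under stated assumptions, not Parallelizer's actual implementation: the class name BoundedParallelizer is hypothetical, a Semaphore with maxInFlight permits caps the number of submitted tasks, and the first failure is wrapped in a RuntimeException (the real class may use a different exception type). Because the calling thread blocks before pulling the next input, at most maxInFlight inputs are held in memory at any time.

```java
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical sketch of bounded, fail-fast parallelization; not Parallelizer's actual code.
final class BoundedParallelizer {
  static <T> void parallelize(ExecutorService executor, int maxInFlight,
      Iterator<? extends T> inputs, Consumer<? super T> consumer) throws InterruptedException {
    Semaphore permits = new Semaphore(maxInFlight);
    Set<Future<?>> inFlight = ConcurrentHashMap.newKeySet();
    AtomicReference<Throwable> failure = new AtomicReference<>();
    try {
      while (failure.get() == null && inputs.hasNext()) {
        permits.acquire(); // blocks the caller while maxInFlight tasks are still in flight
        if (failure.get() != null) {
          permits.release();
          break; // fail fast: stop pulling inputs
        }
        T input = inputs.next(); // inputs are pulled lazily, only in the calling thread
        FutureTask<Void> task = new FutureTask<Void>(() -> consumer.accept(input), null) {
          @Override protected void setException(Throwable t) {
            failure.compareAndSet(null, t); // remember the first failure only
            super.setException(t);
          }
          @Override protected void done() { // called on success, failure or cancellation
            inFlight.remove(this);
            permits.release();
          }
        };
        inFlight.add(task);
        executor.execute(task);
      }
      // Getting all permits back means nothing is in flight; a failure ends the wait early.
      for (int i = 0; i < maxInFlight && failure.get() == null; i++) {
        permits.acquire();
      }
      if (failure.get() != null) {
        // Assumption: wrap in RuntimeException; the real exception type may differ.
        throw new RuntimeException("sub-task failed", failure.get());
      }
    } finally {
      // On failure or interruption, cancel (and interrupt) whatever is still in flight.
      for (Future<?> f : inFlight) {
        f.cancel(true);
      }
    }
  }
}
```

The semaphore is what keeps memory bounded: unlike submitting everything to an executor's queue, no input is pulled until a permit frees up.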

    How does it stack up against parallel streams? The parallel stream counterpart to the example above may look like:

      
       userDataStream.filter(UserData::isModified).parallel().forEach(userService::save);
     
    A few key differences:
    • A parallel stream can't run on an arbitrary ExecutorService. By default, it uses either the enclosing ForkJoinPool or the common ForkJoinPool instance.
    • By running in a dedicated ForkJoinPool, a parallel stream can take a custom target concurrency, but that target isn't guaranteed to be the maximum concurrency.
    • Parallel streams are designed for CPU-bound computations, while Parallelizer is designed for IO-bound operations.
    • parallelize() can be interrupted and can time out; parallel streams are uninterruptible.
    • When a task throws, Parallelizer dismisses pending tasks and cancels all in-flight tasks (it's up to the user code to properly handle thread interruptions). So if a worker thread is waiting on some resource, it'll be interrupted instead of hanging forever.
    • Parallelizer wraps exceptions thrown by the worker threads, making stack traces clearer.
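    For comparison, the dedicated-ForkJoinPool approach mentioned in the list above can be sketched as follows. It assumes that a parallel stream started from inside a ForkJoinPool task runs in that pool, which is how current JDKs behave but is not a documented guarantee; DedicatedPoolDemo is a hypothetical name. The parallelism argument of ForkJoinPool is a target level rather than a hard cap, so the sketch only reports the peak concurrency it observed.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

final class DedicatedPoolDemo {
  // Runs `tasks` simulated IO calls as a parallel stream inside a dedicated ForkJoinPool
  // and returns the peak number of calls observed running at the same time.
  static int peakConcurrency(int parallelism, int tasks)
      throws InterruptedException, ExecutionException {
    ForkJoinPool pool = new ForkJoinPool(parallelism); // a target level, not a hard cap
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    try {
      // Relies on the stream running in the pool of the submitting task (observed behavior).
      pool.submit(() -> IntStream.range(0, tasks).parallel().forEach(i -> {
        peak.accumulateAndGet(running.incrementAndGet(), Math::max);
        try {
          Thread.sleep(10); // stand-in for a blocking IO call
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          running.decrementAndGet();
        }
      })).get(); // interrupting this get() does not stop the stream itself
      return peak.get();
    } finally {
      pool.shutdown();
    }
  }
}
```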

    And how do you choose between Parallelizer and ExecutorService? Could you use something like the following instead?

      
       ExecutorService pool = Executors.newFixedThreadPool(3);
       try {
         List<Future<?>> futures = userData
             .filter(...)
             .<Future<?>>map(data -> pool.submit(() -> userService.save(data)))
             .collect(toList());
         // Waits in submission order, so a failure in a later task
         // goes unnoticed while an earlier task is still blocked.
         for (Future<?> future : futures) {
           future.get();
         }
       } finally {
         pool.shutdownNow();
       }
     
    Some differences for consideration:
    • Memory Concern
      • The thread pool queues all pending tasks. For large streams (like reading hundreds of thousands of task inputs from a file), it can run out of memory quickly.
      • Storing all the future objects in a list may also use up too much memory for a large number of sub-tasks.
    • Exception Handling (and fail fast)
      • Executors treat submitted tasks as independent. One task may fail and the other tasks won't be affected.

        But for co-dependent sub-tasks that were parallelized only for performance reasons (as in parallel streams), you'll want to abort the whole parallel pipeline upon any critical exception, the same as if they were run sequentially.

        Aborting a parallel pipeline requires complex concurrent logic to coordinate between the sub-tasks and the executor in order to dismiss pending sub-tasks and also to cancel sub-tasks that are already running. Otherwise, when an exception is thrown from a sub-task, the other left-over sub-tasks will continue to run; some may even hang indefinitely.

      • Automatic cancellation propagation. When parallelize() is interrupted, all running tasks are automatically canceled and all pending tasks are automatically dismissed.
      • You may resort to shutting down the executor to achieve a similar result (canceling the left-over sub-tasks). But even knowing whether a sub-task has failed isn't trivial: the code example above uses Future.get(), which won't help if a sub-task submitted earlier is still running or blocked while a later-submitted sub-task has already failed.
      • Also, ExecutorServices are often set up centrally and shared among different classes and components in the application. You may not have the option to create and shut down a thread pool of your own.
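    The Future.get() problem described above comes from waiting in submission order. One common JDK alternative, sketched below, is ExecutorCompletionService, which hands back futures in completion order so that a later-submitted failure surfaces immediately. FirstFailure.runAll is a hypothetical helper, not part of Parallelizer, and it still submits every task up front, so it only addresses fail-fast detection, not the memory concern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

final class FirstFailure {
  // Waits for tasks in completion order, so a failure is noticed as soon as it happens,
  // even if earlier-submitted tasks are still blocked. Cancels the rest on failure.
  static void runAll(ExecutorService executor, List<Callable<Void>> tasks)
      throws InterruptedException, ExecutionException {
    CompletionService<Void> completion = new ExecutorCompletionService<>(executor);
    List<Future<Void>> futures = new ArrayList<>();
    try {
      for (Callable<Void> task : tasks) {
        futures.add(completion.submit(task));
      }
      for (int i = 0; i < futures.size(); i++) {
        completion.take().get(); // next *completed* task; throws if that task failed
      }
    } finally {
      for (Future<Void> f : futures) {
        f.cancel(true); // no-op for finished tasks; interrupts the stragglers
      }
    }
  }
}
```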

    Stream parameters used in this class are always consumed in the calling thread and don't have to be thread safe.

    Since:
    1.1
    • Constructor Summary

      Constructors 
      Constructor Description
      Parallelizer(java.util.concurrent.ExecutorService executor, int maxInFlight)
      Constructs a Parallelizer that runs tasks with executor.
    • Method Summary

      Modifier and Type Method Description
      <T> void parallelize(java.util.Iterator<? extends T> inputs, java.util.function.Consumer<? super T> consumer)
      Runs consumer for inputs in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
      <T> void parallelize(java.util.Iterator<? extends T> inputs, java.util.function.Consumer<? super T> consumer, long heartbeatTimeout, java.util.concurrent.TimeUnit timeUnit)
      Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
      <T> void parallelize(java.util.Iterator<? extends T> inputs, java.util.function.Consumer<? super T> consumer, java.time.Duration heartbeatTimeout)
      Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
      void parallelize(java.util.stream.Stream<? extends java.lang.Runnable> tasks)
      Runs tasks in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
      void parallelize(java.util.stream.Stream<? extends java.lang.Runnable> tasks, long heartbeatTimeout, java.util.concurrent.TimeUnit timeUnit)
      Runs tasks in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
      void parallelize(java.util.stream.Stream<? extends java.lang.Runnable> tasks, java.time.Duration heartbeatTimeout)
      Runs tasks in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
      <T> void parallelize(java.util.stream.Stream<? extends T> inputs, java.util.function.Consumer<? super T> consumer)
      Runs consumer for inputs in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
      <T> void parallelize(java.util.stream.Stream<? extends T> inputs, java.util.function.Consumer<? super T> consumer, long heartbeatTimeout, java.util.concurrent.TimeUnit timeUnit)
      Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
      <T> void parallelize(java.util.stream.Stream<? extends T> inputs, java.util.function.Consumer<? super T> consumer, java.time.Duration heartbeatTimeout)
      Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
      <T> void parallelizeUninterruptibly(java.util.Iterator<? extends T> inputs, java.util.function.Consumer<? super T> consumer)
      Runs consumer for inputs in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
      void parallelizeUninterruptibly(java.util.stream.Stream<? extends java.lang.Runnable> tasks)
      Runs tasks in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
      <T> void parallelizeUninterruptibly(java.util.stream.Stream<? extends T> inputs, java.util.function.Consumer<? super T> consumer)
      Runs consumer for inputs in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Constructor Detail

      • Parallelizer

        public Parallelizer(java.util.concurrent.ExecutorService executor,
                            int maxInFlight)
        Constructs a Parallelizer that runs tasks with executor. At any given time, at most maxInFlight tasks are allowed to be submitted to executor.

        Note that a task being submitted to executor doesn't guarantee immediate execution, if for example all worker threads in executor are busy.
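        To illustrate the note above: submitting a task only hands it to the executor, which may queue it until a worker thread is free. In the sketch below (QueuedSubmission is a hypothetical name), three tasks submitted to a single-thread pool never overlap. By the same reasoning, the effective concurrency of a Parallelizer is presumably bounded by the executor's free threads as well as by maxInFlight.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class QueuedSubmission {
  // Submits `submitted` short tasks to a fixed pool of `threads` threads
  // and returns the peak number of tasks observed running at once.
  static int peakConcurrency(int threads, int submitted) throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(threads);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    CountDownLatch finished = new CountDownLatch(submitted);
    try {
      for (int i = 0; i < submitted; i++) {
        executor.execute(() -> { // queued if all worker threads are busy
          peak.accumulateAndGet(running.incrementAndGet(), Math::max);
          try {
            Thread.sleep(20); // simulated IO
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            running.decrementAndGet();
            finished.countDown();
          }
        });
      }
      finished.await(10, TimeUnit.SECONDS);
      return peak.get();
    } finally {
      executor.shutdownNow();
    }
  }
}
```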

    • Method Detail

      • parallelize

        public <T> void parallelize(java.util.stream.Stream<? extends T> inputs,
                                    java.util.function.Consumer<? super T> consumer)
                             throws java.lang.InterruptedException
        Runs consumer for inputs in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

        The inputs stream is consumed only in the calling thread in iteration order.

        Parameters:
        inputs - the inputs to be passed to consumer
        consumer - to be parallelized
        Throws:
        java.lang.InterruptedException - if the thread is interrupted while waiting.
      • parallelize

        public <T> void parallelize(java.util.Iterator<? extends T> inputs,
                                    java.util.function.Consumer<? super T> consumer)
                             throws java.lang.InterruptedException
        Runs consumer for inputs in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

        The inputs iterator is consumed only in the calling thread, in iteration order.

        Parameters:
        inputs - the inputs to be passed to consumer
        consumer - to be parallelized
        Throws:
        java.lang.InterruptedException - if the thread is interrupted while waiting.
      • parallelize

        public <T> void parallelize(java.util.stream.Stream<? extends T> inputs,
                                    java.util.function.Consumer<? super T> consumer,
                                    java.time.Duration heartbeatTimeout)
                             throws java.util.concurrent.TimeoutException,
                                    java.lang.InterruptedException
        Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

        The inputs stream is consumed only in the calling thread in iteration order.

        Parameters:
        inputs - the inputs to be passed to consumer
        consumer - to be parallelized
        heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
        Throws:
        java.lang.InterruptedException - if the thread is interrupted while waiting.
        java.util.concurrent.TimeoutException - if the configured timeout is exceeded while waiting.
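        A heartbeat timeout bounds the gap between task completions rather than the total running time, so a long run keeps going as long as progress is made. The following is a minimal sketch of that semantics using ExecutorCompletionService.poll(timeout, unit); the Heartbeat class and runWithHeartbeat method are hypothetical, and Parallelizer may measure the heartbeat differently (for example, where the clock starts).

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class Heartbeat {
  // Waits for all tasks, requiring at least one completion within every `heartbeat` interval.
  static void runWithHeartbeat(ExecutorService executor, List<Callable<Void>> tasks, Duration heartbeat)
      throws InterruptedException, ExecutionException, TimeoutException {
    CompletionService<Void> completion = new ExecutorCompletionService<>(executor);
    List<Future<Void>> futures = new ArrayList<>();
    try {
      for (Callable<Void> task : tasks) {
        futures.add(completion.submit(task));
      }
      for (int i = 0; i < futures.size(); i++) {
        // The clock restarts after every completion: this is a heartbeat, not a deadline.
        Future<Void> next = completion.poll(heartbeat.toNanos(), TimeUnit.NANOSECONDS);
        if (next == null) {
          throw new TimeoutException("no task completed within " + heartbeat);
        }
        next.get(); // rethrows a sub-task failure as ExecutionException
      }
    } finally {
      for (Future<Void> f : futures) {
        f.cancel(true); // cancel whatever is left after a timeout or failure
      }
    }
  }
}
```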
      • parallelize

        public <T> void parallelize(java.util.stream.Stream<? extends T> inputs,
                                    java.util.function.Consumer<? super T> consumer,
                                    long heartbeatTimeout,
                                    java.util.concurrent.TimeUnit timeUnit)
                             throws java.util.concurrent.TimeoutException,
                                    java.lang.InterruptedException
        Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

        The inputs stream is consumed only in the calling thread in iteration order.

        Parameters:
        inputs - the inputs to be passed to consumer
        consumer - to be parallelized
        heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
        timeUnit - the unit of heartbeatTimeout
        Throws:
        java.lang.InterruptedException - if the thread is interrupted while waiting.
        java.util.concurrent.TimeoutException - if the configured timeout is exceeded while waiting.
      • parallelize

        public <T> void parallelize(java.util.Iterator<? extends T> inputs,
                                    java.util.function.Consumer<? super T> consumer,
                                    java.time.Duration heartbeatTimeout)
                             throws java.util.concurrent.TimeoutException,
                                    java.lang.InterruptedException
        Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

        The inputs iterator is consumed only in the calling thread, in iteration order.

        Parameters:
        inputs - the inputs to be passed to consumer
        consumer - to be parallelized
        heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
        Throws:
        java.lang.InterruptedException - if the thread is interrupted while waiting.
        java.util.concurrent.TimeoutException - if the configured timeout is exceeded while waiting.
      • parallelize

        public <T> void parallelize(java.util.Iterator<? extends T> inputs,
                                    java.util.function.Consumer<? super T> consumer,
                                    long heartbeatTimeout,
                                    java.util.concurrent.TimeUnit timeUnit)
                             throws java.util.concurrent.TimeoutException,
                                    java.lang.InterruptedException
        Runs consumer for inputs in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

        The inputs iterator is consumed only in the calling thread, in iteration order.

        Parameters:
        inputs - the inputs to be passed to consumer
        consumer - to be parallelized
        heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
        timeUnit - the unit of heartbeatTimeout
        Throws:
        java.lang.InterruptedException - if the thread is interrupted while waiting.
        java.util.concurrent.TimeoutException - if the configured timeout is exceeded while waiting.
      • parallelizeUninterruptibly

        public <T> void parallelizeUninterruptibly(java.util.stream.Stream<? extends T> inputs,
                                                   java.util.function.Consumer<? super T> consumer)
        Runs consumer for inputs in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

        The inputs stream is consumed only in the calling thread in iteration order.

        Parameters:
        inputs - the inputs to be passed to consumer
        consumer - to be parallelized
      • parallelizeUninterruptibly

        public <T> void parallelizeUninterruptibly(java.util.Iterator<? extends T> inputs,
                                                   java.util.function.Consumer<? super T> consumer)
        Runs consumer for inputs in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

        The inputs iterator is consumed only in the calling thread, in iteration order.

        Parameters:
        inputs - the inputs to be passed to consumer
        consumer - to be parallelized
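        Blocking "uninterruptibly" typically means remembering an interrupt, continuing to wait, and restoring the interrupt flag before returning. A minimal sketch of that general pattern follows (similar in spirit to Guava's Uninterruptibles.awaitUninterruptibly); the Uninterruptibly class is hypothetical, and this is not necessarily how parallelizeUninterruptibly is implemented.

```java
import java.util.concurrent.CountDownLatch;

final class Uninterruptibly {
  // Waits for the latch even if the calling thread is interrupted, then restores
  // the interrupt flag so code further up the stack can still observe it.
  static void await(CountDownLatch latch) {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          latch.await();
          return;
        } catch (InterruptedException e) {
          interrupted = true; // remember the interrupt and keep waiting
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
```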
      • parallelize

        public void parallelize(java.util.stream.Stream<? extends java.lang.Runnable> tasks)
                         throws java.lang.InterruptedException
        Runs tasks in parallel and blocks until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

        The tasks stream is consumed only in the calling thread in iteration order.

        Throws:
        java.lang.InterruptedException - if the thread is interrupted while waiting.
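        The inputs-plus-consumer overloads and this Runnable overload express the same work in two shapes. One way to see the relationship is to map each input to a Runnable that calls the consumer, as sketched below; RunnableAdapter.asTasks is a hypothetical helper, and the library may not implement its overloads this way.

```java
import java.util.function.Consumer;
import java.util.stream.Stream;

final class RunnableAdapter {
  // Turns each input into a Runnable that feeds it to the consumer.
  // Lazy: nothing runs until the returned stream is consumed.
  static <T> Stream<Runnable> asTasks(Stream<? extends T> inputs, Consumer<? super T> consumer) {
    return inputs.<Runnable>map(input -> () -> consumer.accept(input));
  }
}
```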
      • parallelize

        public void parallelize(java.util.stream.Stream<? extends java.lang.Runnable> tasks,
                                java.time.Duration heartbeatTimeout)
                         throws java.util.concurrent.TimeoutException,
                                java.lang.InterruptedException
        Runs tasks in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

        The tasks stream is consumed only in the calling thread in iteration order.

        Parameters:
        tasks - the tasks to be parallelized
        heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
        Throws:
        java.lang.InterruptedException - if the thread is interrupted while waiting.
        java.util.concurrent.TimeoutException - if the configured timeout is exceeded while waiting.
      • parallelize

        public void parallelize(java.util.stream.Stream<? extends java.lang.Runnable> tasks,
                                long heartbeatTimeout,
                                java.util.concurrent.TimeUnit timeUnit)
                         throws java.util.concurrent.TimeoutException,
                                java.lang.InterruptedException
        Runs tasks in parallel and blocks until either all tasks have finished, timeout is triggered, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).

        The tasks stream is consumed only in the calling thread in iteration order.

        Parameters:
        tasks - the tasks to be parallelized
        heartbeatTimeout - at least one task needs to complete every heartbeatTimeout.
        timeUnit - the unit of heartbeatTimeout
        Throws:
        java.lang.InterruptedException - if the thread is interrupted while waiting.
        java.util.concurrent.TimeoutException - if the configured timeout is exceeded while waiting.
      • parallelizeUninterruptibly

        public void parallelizeUninterruptibly(java.util.stream.Stream<? extends java.lang.Runnable> tasks)
        Runs tasks in parallel and blocks uninterruptibly until either all tasks have finished, or any exception is thrown upon which all pending tasks are canceled (but the method returns without waiting for the tasks to respond to cancellation).