Class BoundedConcurrency

java.lang.Object
com.google.mu.util.concurrent24.BoundedConcurrency

public final class BoundedConcurrency extends Object
A fixed concurrency limit for structured concurrent IO-intensive operations.

It enables the parallel transformation of input elements, guaranteeing that all concurrent operations either complete and their results are gathered, or are fully cancelled and joined upon interruption or exception.

Since:
9.2
  • Method Details

    • withMaxConcurrency

      public static BoundedConcurrency withMaxConcurrency(int maxConcurrency)
      Returns a BoundedConcurrency using maxConcurrency. Uses virtual threads to run concurrent work.
      Throws:
      IllegalArgumentException - if maxConcurrency <= 0
    • withMaxConcurrency

      public static BoundedConcurrency withMaxConcurrency(int maxConcurrency, ThreadFactory threadFactory)
      Returns a BoundedConcurrency using maxConcurrency and threadFactor.
      Throws:
      IllegalArgumentException - if maxConcurrency <= 0
    • race

      public <T> T race(Collection<? extends Callable<? extends T>> tasks, Predicate<? super Throwable> isRecoverable)
      Races tasks and returns the first success, then cancels the remaining. Upon exception, the isRecoverable predicate is tested to check whether the exception is recoverable (thus allowing the other tasks to continue to run).

      When all tasks throw recoverable exceptions, or if any task failed with unrecoverable exception, the recoverable exceptions are propagated as suppressed.

      Parameters:
      tasks - at least one must be provided
      isRecoverable - tests whether an exception is recoverable so that the other tasks should continue running.
      Throws:
      IllegalArgumentException - if tasks is empty
      NullPointerException - if tasks or isRecoverable is null
      RuntimeException - if all tasks failed, or any task failed with unrecoverable exception.
    • concurrently

      public <I,O> Collector<I, ?, BiStream<I,O>> concurrently(Function<? super I, ? extends O> work)
      Applies work on each input element concurrently and lazily.

      At any given time, at most maxConcurrency concurrent work are running.

      To avoid leaking virtual threads caused by upstream exceptions, input elements are first computed and consumed into a List before any concurrent work is started (such that if any upstream element throws, no virtual thread is ever started). But the result BiStream is lazy: concurrent work only starts upon requested by downstream. Specifically, if you short-circuit using Stream.findAny() or BiStream.findAny(), at most maxConcurrency virtual threads will be started.

      Compared to the Gatherers.mapConcurrent(int, java.util.function.Function<? super T, ? extends R>) gatherer:

      • mapConcurrent() only interrupts and joins the on-the-fly virtual threads upon downstream exceptions, but is unable to interrupt or join when an upstream exception is thrown; whereas concurrently() will always interrupt and join. As a result, actions in the virtual threads happens-before actions after the collector returns or throws.
      • mapConcurrent() guarantees encounter-order. If an input element takes a long time or forever to process, it can potentially block or halt the program when there are more than maxConcurrency elements following it, even if maxConcurrency - 1 virtual threads have already completed with the long-running task being the only virtual thread still running. Whereas concurrently() allows maxConcurrency virtual threads to run concurrently regardless of input order. This means if you have a long-running virtual thread (for instance: a heart-beat worker or a monitoring subtask etc.), it won't reduce throughput or starve the workers after it.
      • If encounter order is important to you, consider using BiStream.sorted(java.util.Comparator<? super K>, java.util.Comparator<? super V>) or friends to re-introduce ordering.

      Compared to Parallelizer.inParallel(java.util.function.Function<? super I, ? extends O>):

      • inParallel() fails fast. When an exception is thrown, it interrupts on-the-fly threads, but doesn't wait for their completion. Thus there is no happens-before guarantee when an exception is thrown. concurrently() on the other hand always joins the virtual threads and guarantees strong happens-before relationship.
      • The inParallel() collector blocks until all parallel tasks are complete before returning; whereas concurrently() utilizes Java 24 Gatherer and will stream results to the downstream as soon as one is computed.
      • inParallel() can be used with any ExecutorService (for example you could use Executors.newCachedThreadPool() if virtual threads isn't important to you); whereas concurrently() strictly runs one task per thread, although you could use a customized ThreadFactory.