Class Fanout

java.lang.Object
com.google.mu.util.concurrent.Fanout

public final class Fanout extends Object
Supports structured concurrency for the common case where all concurrent operations are required (as if you are running them sequentially).

You can use either concurrently() or uninterruptibly() to fan out a few concurrent operations, with a lambda to combine the results after the concurrent operations have completed.

An exception thrown by any of the concurrent operations cancels all the other pending operations and propagates back to the main thread.

If the main thread is interrupted (which is possible when you use concurrently(), because it allows interruption), pending and currently running operations are canceled and the main thread throws InterruptedException.

For example:


 import static com.google.mu.util.concurrent.Fanout.concurrently;

 return concurrently(
     () -> getProjectAncestry(...),
     () -> readJobTimeline(),
     (ancestry, timeline) -> ...);
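
The behavior described above (fan out, propagate any failure, cancel on interruption, join in the caller's thread) can be approximated with plain JDK classes. The following is a minimal sketch under stated assumptions, not Fanout's implementation: FanoutSketch and fanOut are hypothetical names, a fixed thread pool stands in for virtual threads so that the code compiles on older JDKs, and results are awaited in argument order, so a failure in b is only noticed after a finishes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical illustration only; not part of the library.
final class FanoutSketch {
  /** Runs a and b concurrently, then combines their results in the calling thread. */
  static <A, B, R> R fanOut(
      Supplier<A> a, Supplier<B> b, BiFunction<? super A, ? super B, R> join)
      throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    Callable<A> taskA = a::get;
    Callable<B> taskB = b::get;
    Future<A> futureA = executor.submit(taskA);
    Future<B> futureB = executor.submit(taskB);
    try {
      A resultA = futureA.get(); // may throw InterruptedException
      B resultB = futureB.get();
      return join.apply(resultA, resultB); // join runs in the caller's thread
    } catch (ExecutionException e) {
      // Wrap the original exception thrown by the failed operation.
      throw new RuntimeException(e.getCause());
    } finally {
      // No-ops after success; on failure or interruption they cancel unfinished work.
      futureA.cancel(true);
      futureB.cancel(true);
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    String combined = fanOut(() -> "arm", () -> "leg", (arm, leg) -> arm + "+" + leg);
    System.out.println(combined);
  }
}
```

The finally block is what gives the sketch its all-or-nothing flavor: whichever way the method exits, no task is left running behind the caller's back.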
 

Memory consistency effects: Actions before starting the concurrent operations (including unsynchronized side effects) happen-before the concurrent operations running in the virtual threads, which happen-before the join function, which in turn happens-before the concurrently() or uninterruptibly() method returns. As a result, a thread-safe or concurrent data structure isn't required to pass data between the caller and the concurrent operations: plain data structures work the same way as if everything ran in a single thread. The exception is that there is no happens-before relationship between the concurrent operations themselves.
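
To make the happens-before chain concrete, here is a small sketch that uses only JDK classes, which document the same kind of guarantee for ExecutorService.submit() and Future.get(). It is an analogy rather than Fanout code: HappensBeforeSketch and sumAndCount are hypothetical names, and a fixed thread pool stands in for virtual threads. The caller fills a plain ArrayList, two tasks read it and each writes to its own plain array slot, and the caller reads both slots after waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only; not part of the library.
final class HappensBeforeSketch {
  static String sumAndCount() throws InterruptedException, ExecutionException {
    // Plain, non-thread-safe list written by the caller before the fan-out.
    List<Integer> input = new ArrayList<>();
    input.add(1);
    input.add(2);
    input.add(3);
    // Unsynchronized output slots, one per task, so the tasks never share a write target.
    int[] sum = new int[1];
    int[] count = new int[1];

    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      // The writes to `input` above happen-before each submitted task starts.
      Future<?> summing = executor.submit(() -> {
        for (int n : input) {
          sum[0] += n;
        }
      });
      Future<?> counting = executor.submit(() -> {
        count[0] = input.size();
      });
      // Each task's writes happen-before the corresponding get() returns.
      summing.get();
      counting.get();
      return sum[0] + "/" + count[0];
    } finally {
      executor.shutdown();
    }
  }
}
```

Note that the two tasks only read the shared list and write to separate slots. Having both tasks write to the same unsynchronized object would be a data race, because nothing orders the concurrent operations relative to each other.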

By default, the JDK Executors.newVirtualThreadPerTaskExecutor() is used to run all structured concurrency tasks, which requires Java 21 or later for virtual threads. To use an alternative executor (say, because you don't want to use virtual threads), implement a StructuredConcurrencyExecutorPlugin and package it for discovery by ServiceLoader. You can also use Google's @AutoService annotation to automate generation of the META-INF/services files.
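
The general ServiceLoader mechanism behind such a plugin can be sketched as follows. This is an illustration of the discovery-with-fallback pattern only: the ExecutorPlugin interface and its createExecutor() method are hypothetical and are not the API of StructuredConcurrencyExecutorPlugin, whose members are not shown on this page. The fallback here is a cached thread pool so that the sketch compiles on JDKs without virtual threads.

```java
import java.util.Iterator;
import java.util.ServiceLoader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration only; not part of the library.
final class ExecutorDiscoverySketch {
  /** Hypothetical plugin contract, assumed for this sketch. */
  public interface ExecutorPlugin {
    ExecutorService createExecutor();
  }

  /** Uses the first registered plugin if there is one, otherwise a default executor. */
  static ExecutorService resolveExecutor() {
    // ServiceLoader reads provider class names from a META-INF/services file
    // named after the binary name of the service interface.
    Iterator<ExecutorPlugin> plugins = ServiceLoader.load(ExecutorPlugin.class).iterator();
    if (plugins.hasNext()) {
      return plugins.next().createExecutor();
    }
    // Fallback chosen for this sketch; the library's documented default is a
    // virtual-thread-per-task executor.
    return Executors.newCachedThreadPool();
  }
}
```

With no provider file on the classpath, resolveExecutor() returns the fallback executor. Registering a provider means adding a META-INF/services entry that lists the implementing class, which is the file that @AutoService generates.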

Since:
8.1
  • Method Details

    • concurrently

      public static <A, B, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Fanout.Join2<? super A,? super B,R,X> join) throws InterruptedException, X
      Runs a and b concurrently in their own virtual threads. After all of the concurrent operations return successfully, invokes the join function on the results in the caller's thread.

      For example:

      
       Result result = concurrently(
         () -> fetchArm(),
         () -> fetchLeg(),
         (arm, leg) -> new Result(arm, leg));
       
      Throws:
      InterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
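
      The X type parameter lets the join function throw a checked exception that the caller then has to handle. The sketch below shows only that generic pattern: the Join2 interface declared here is a hypothetical stand-in whose method name is an assumption, and combine() applies the function directly rather than running anything concurrently.

```java
import java.io.IOException;

// Hypothetical illustration only; not part of the library.
final class JoinExceptionSketch {
  /** Stand-in for a two-argument join function; the method name `join` is assumed. */
  @FunctionalInterface
  interface Join2<A, B, R, X extends Throwable> {
    R join(A a, B b) throws X;
  }

  /** Applies the join function; its exception type X surfaces in this throws clause. */
  static <A, B, R, X extends Throwable> R combine(
      A a, B b, Join2<? super A, ? super B, R, X> join) throws X {
    return join.join(a, b);
  }

  /** A combining step that can fail with a checked exception. */
  static String describe(String arm, String leg) throws IOException {
    if (arm.isEmpty()) {
      throw new IOException("missing arm");
    }
    return arm + " and " + leg;
  }

  static String safeDescribe(String arm, String leg) {
    try {
      // X is inferred as IOException, so the compiler requires this try/catch.
      return combine(arm, leg, (a, b) -> describe(a, b));
    } catch (IOException e) {
      return "failed: " + e.getMessage();
    }
  }
}
```

      When the join lambda throws no checked exception, the compiler infers an unchecked type for X and the caller needs no extra catch block.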
    • concurrently

      public static <A, B, C, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Supplier<C> c, Fanout.Join3<? super A,? super B,? super C,R,X> join) throws InterruptedException, X
      Runs a, b and c concurrently in their own virtual threads. After all of the concurrent operations return successfully, invokes the join function on the results in the caller's thread.

      For example:

      
       Result result = concurrently(
         () -> fetchHead(),
         () -> fetchArm(),
         () -> fetchLeg(),
         (head, arm, leg) -> new Result(head, arm, leg));
       
      Throws:
      InterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • concurrently

      public static <A, B, C, D, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Fanout.Join4<? super A,? super B,? super C,? super D,R,X> join) throws InterruptedException, X
      Runs a, b, c and d concurrently in their own virtual threads. After all of the concurrent operations return successfully, invokes the join function on the results in the caller's thread.

      For example:

      
       Result result = concurrently(
         () -> fetchHead(),
         () -> fetchShoulder(),
         () -> fetchArm(),
         () -> fetchLeg(),
         (head, shoulder, arm, leg) -> new Result(head, shoulder, arm, leg));
       
      Throws:
      InterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • concurrently

      public static <A, B, C, D, E, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Supplier<E> e, Fanout.Join5<? super A,? super B,? super C,? super D,? super E,R,X> join) throws InterruptedException, X
      Runs a, b, c, d and e concurrently in their own virtual threads. After all of the concurrent operations return successfully, invokes the join function on the results in the caller's thread.

      For example:

      
       Result result = concurrently(
         () -> fetchHead(),
         () -> fetchShoulder(),
         () -> fetchArm(),
         () -> fetchLeg(),
         () -> fetchFeet(),
         (head, shoulder, arm, leg, feet) -> new Result(head, shoulder, arm, leg, feet));
       
      Throws:
      InterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • uninterruptibly

      public static <A, B, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Fanout.Join2<? super A,? super B,R,X> join) throws X
      Runs a and b concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invokes the join function on the results in the caller's thread.

      For example:

      
       Result result = uninterruptibly(
         () -> fetchArm(),
         () -> fetchLeg(),
         (arm, leg) -> new Result(arm, leg));
       
      Throws:
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
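
      Waiting uninterruptibly is commonly implemented with the JDK idiom sketched below: retry the blocking call when interrupted, then restore the thread's interrupt status before returning. This is an assumption about the general technique, not a description of how uninterruptibly() is implemented, and this page does not state whether the interrupt status is restored. UninterruptibleWaitSketch, getUninterruptibly and demo are hypothetical names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only; not part of the library.
final class UninterruptibleWaitSketch {
  /** Waits for the future to complete, ignoring interrupts until it does. */
  static <T> T getUninterruptibly(Future<T> future) throws ExecutionException {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return future.get();
        } catch (InterruptedException e) {
          interrupted = true; // remember the interrupt and keep waiting
        }
      }
    } finally {
      if (interrupted) {
        // Restore the flag so that code further up the stack can still see it.
        Thread.currentThread().interrupt();
      }
    }
  }

  static String demo() throws ExecutionException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<String> arm = executor.submit(() -> {
        Thread.sleep(50);
        return "arm";
      });
      // Simulate an interrupt arriving while the caller is about to wait.
      Thread.currentThread().interrupt();
      return getUninterruptibly(arm);
    } finally {
      executor.shutdown();
    }
  }
}
```

      In demo(), the result is still returned despite the interrupt, and the calling thread's interrupt status is set afterwards.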
    • uninterruptibly

      public static <A, B, C, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Supplier<C> c, Fanout.Join3<? super A,? super B,? super C,R,X> join) throws X
      Runs a, b and c concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invokes the join function on the results in the caller's thread.

      For example:

      
       Result result = uninterruptibly(
         () -> fetchHead(),
         () -> fetchArm(),
         () -> fetchLeg(),
         (head, arm, leg) -> new Result(head, arm, leg));
       
      Throws:
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • uninterruptibly

      public static <A, B, C, D, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Fanout.Join4<? super A,? super B,? super C,? super D,R,X> join) throws X
      Runs a, b, c and d concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invokes the join function on the results in the caller's thread.

      For example:

      
       Result result = uninterruptibly(
         () -> fetchHead(),
         () -> fetchShoulder(),
         () -> fetchArm(),
         () -> fetchLeg(),
         (head, shoulder, arm, leg) -> new Result(head, shoulder, arm, leg));
       
      Throws:
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • uninterruptibly

      public static <A, B, C, D, E, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Supplier<E> e, Fanout.Join5<? super A,? super B,? super C,? super D,? super E,R,X> join) throws X
      Runs a, b, c, d and e concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invokes the join function on the results in the caller's thread.

      For example:

      
       Result result = uninterruptibly(
         () -> fetchHead(),
         () -> fetchShoulder(),
         () -> fetchArm(),
         () -> fetchLeg(),
         () -> fetchFeet(),
         (head, shoulder, arm, leg, feet) -> new Result(head, shoulder, arm, leg, feet));
       
      Throws:
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • withMaxConcurrency

      public static Parallelizer withMaxConcurrency(int maxConcurrency)
      Returns a concurrency-limited Parallelizer that can be used to run a potentially large number of concurrent fan-out tasks using the currently configured standard (virtual thread) executor.

      For example, if you have a list of user ids to fetch from UserService and you want to avoid overloading the service, the following code performs at most 10 queries at the same time:

      
       Parallelizer parallelizer = Fanout.withMaxConcurrency(10);
       Map<UserId, User> users =
           userIds.stream()
               .collect(parallelizer.inParallel(userService::fetchUser))
               .toMap();
       
      Compared to parallel streams, you can control the concurrency. Exceptions also retain the full stack trace from both the main thread and the virtual thread.
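
      The idea of capping concurrency can be illustrated with plain JDK classes. The sketch below is not how Parallelizer is implemented: BoundedFanoutSketch and fetchAll are hypothetical names, a cached thread pool stands in for the virtual thread executor, and a Semaphore with maxConcurrency permits ensures that no more than that many fetch calls run at once.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical illustration only; not part of the library.
final class BoundedFanoutSketch {
  /** Fetches a value for every key, running at most maxConcurrency fetches at a time. */
  static <K, V> Map<K, V> fetchAll(
      List<K> keys, Function<? super K, ? extends V> fetch, int maxConcurrency)
      throws InterruptedException, ExecutionException {
    Semaphore permits = new Semaphore(maxConcurrency);
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
      List<Future<V>> futures = new ArrayList<>();
      for (K key : keys) {
        Callable<V> task = () -> {
          permits.acquire(); // blocks while maxConcurrency fetches are in flight
          try {
            return fetch.apply(key);
          } finally {
            permits.release();
          }
        };
        futures.add(executor.submit(task));
      }
      // Collect results in key order; get() rethrows a failed fetch as ExecutionException.
      Map<K, V> results = new LinkedHashMap<>();
      for (int i = 0; i < keys.size(); i++) {
        results.put(keys.get(i), futures.get(i).get());
      }
      return results;
    } finally {
      executor.shutdownNow(); // cancels fetches that are still queued or running
    }
  }
}
```

      A call such as fetchAll(userIds, userService::fetchUser, 10) would mirror the example above, with userIds and userService being whatever the calling code already has.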