Class Fanout

java.lang.Object
com.google.mu.util.concurrent.Fanout

public final class Fanout extends Object
Supports structured concurrency for the common case where all concurrent operations are required (as if you are running them sequentially).

You can use one of the concurrently() methods to fan out a few concurrent operations, with a lambda to combine the results after the concurrent operations have completed.

Any exception thrown by any of the concurrent operations will cancel all the other pending operations and propagate back to the main thread.

If the main thread is interrupted, pending and currently running operations are canceled and the main thread will throw an unchecked exception (with the thread's interrupted bit set).

For example:


 import static com.google.mu.util.concurrent.Fanout.concurrently;

 return concurrently(() -> fetchArm(...), () -> fetchLeg(...), (arm, leg) -> ...);
 

Memory consistency effects: Actions before starting the concurrent operations (including unsynchronized side effects) happen-before the concurrent operations running in the virtual threads, which happen-before the join functions, which then happen-before the concurrently() method returns. As a result, thread-safe or concurrent data structure isn't required to pass data between the caller and callee: they will work the same way as if running in a single thread (except obviously there is no happens-before relationship between the concurrent operations themselves).

By default, the JDK Executors.newVirtualThreadPerTaskExecutor() is used to run all structured concurrency tasks (thus requires Java 21 and virtual threads). To use an alternative executor (say, you need a custom ThreadFactory, or don't want to use virtual threads at all), implement a StructuredConcurrencyExecutorPlugin and package it up for ServiceLoader. You could also use Google @AutoService to help automate the generation of the META-INF/services files.

Since:
8.1
  • Method Details

    • concurrently

      public static <A, B, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Fanout.Join2<? super A,? super B,R,X> join) throws X
      Runs a and b concurrently in their own virtual threads. After all of the concurrent operations return successfully, invoke the join function on the results in the caller's thread.

      For example:

      
       Result result = concurrently(
         () -> fetchArm(),
         () -> fetchLeg(),
         (arm, leg) -> new Result(arm, leg));
       
      Throws:
      StructuredConcurrencyInterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • concurrently

      public static <A, B, C, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Supplier<C> c, Fanout.Join3<? super A,? super B,? super C,R,X> join) throws X
      Runs a, b and c concurrently in their own virtual threads. After all of the concurrent operations return successfully, invoke the join function on the results in the caller's thread.

      For example:

      
       Result result = concurrently(
         () -> fetchHead(),
         () -> fetchArm(),
         () -> fetchLeg(),
         (head, arm, leg) -> new Result(head, arm, leg));
       
      Throws:
      StructuredConcurrencyInterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • concurrently

      public static <A, B, C, D, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Fanout.Join4<? super A,? super B,? super C,? super D,R,X> join) throws X
      Runs a, b, c and d concurrently in their own virtual threads. After all of the concurrent operations return successfully, invoke the join function on the results in the caller's thread.

      For example:

      
       Result result = concurrently(
         () -> fetchHead(),
         () -> fetchShoulder(),
         () -> fetchArm(),
         () -> fetchLeg(),
         (head, shoulder, arm, leg) -> new Result(head, shoulder, arm, leg));
       
      Throws:
      StructuredConcurrencyInterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • concurrently

      public static <A, B, C, D, E, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Supplier<E> e, Fanout.Join5<? super A,? super B,? super C,? super D,? super E,R,X> join) throws X
      Runs a, b, c, d and e concurrently in their own virtual threads. After all of the concurrent operations return successfully, invoke the join function on the results in the caller's thread.

      For example:

      
       Result result = concurrently(
         () -> fetchHead(),
         () -> fetchShoulder(),
         () -> fetchArm(),
         () -> fetchLeg(),
         () -> fetchFeet(),
         (head, shoulder, arm, leg, feet) -> new Result(head, shoulder, arm, leg, feet));
       
      Throws:
      StructuredConcurrencyInterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • concurrently

      public static void concurrently(Runnable task1, Runnable task2, Runnable... moreTasks)
      Runs task1, task2 and moreTasks concurrently in their own virtual threads.

      For example:

      
       concurrently(() -> uploadFile(), () -> sendMessageToQueue());
       
      Throws:
      StructuredConcurrencyInterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      Since:
      8.3
    • uninterruptibly

      @Deprecated public static <A, B, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Fanout.Join2<? super A,? super B,R,X> join) throws X
      Deprecated.
      prefer using the concurrently() overloads that allow cancellation
      Runs a and b concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invoke the join function on the results in the caller's thread.

      For example:

      
       Result result = uninterruptibly(
         () -> fetchArm(),
         () -> fetchLeg(),
         (arm, leg) -> new Result(arm, leg));
       
      Throws:
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • uninterruptibly

      @Deprecated public static <A, B, C, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Supplier<C> c, Fanout.Join3<? super A,? super B,? super C,R,X> join) throws X
      Deprecated.
      prefer using the concurrently() overloads that allow cancellation
      Runs a, b and c concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invoke the join function on the results in the caller's thread.

      For example:

      
       Result result = uninterruptibly(
         () -> fetchHead(),
         () -> fetchArm(),
         () -> fetchLeg(),
         (head, arm, leg) -> new Result(head, arm, leg));
       
      Throws:
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • uninterruptibly

      @Deprecated public static <A, B, C, D, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Fanout.Join4<? super A,? super B,? super C,? super D,R,X> join) throws X
      Deprecated.
      prefer using the concurrently() overloads that allow cancellation
      Runs a, b, c and d concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invoke the join function on the results in the caller's thread.

      For example:

      
       Result result = uninterruptibly(
         () -> fetchHead(),
         () -> fetchShoulder(),
         () -> fetchArm(),
         () -> fetchLeg(),
         (head, shoulder, arm, leg) -> new Result(head, shoulder, arm, leg));
       
      Throws:
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • uninterruptibly

      @Deprecated public static <A, B, C, D, E, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Supplier<E> e, Fanout.Join5<? super A,? super B,? super C,? super D,? super E,R,X> join) throws X
      Deprecated.
      prefer using the concurrently() overloads that allow cancellation
      Runs a, b, c, d and e concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invoke the join function on the results in the caller's thread.

      For example:

      
       Result result = uninterruptibly(
         () -> fetchHead(),
         () -> fetchShoulder(),
         () -> fetchArm(),
         () -> fetchLeg(),
         () -> fetchFeet(),
         (head, shoulder, arm, leg, feet) -> new Result(head, shoulder, arm, leg, feet));
       
      Throws:
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • uninterruptibly

      @Deprecated public static void uninterruptibly(Runnable task1, Runnable task2, Runnable... moreTasks)
      Deprecated.
      prefer using the concurrently() overloads that allow cancellation
      Runs task1, task2 and moreTasks concurrently and uninterruptibly in their own virtual threads.

      For example:

      
       uninterruptibly(() -> uploadFile(), () -> sendMessageToQueue());
       
      Throws:
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      Since:
      8.3
    • withMaxConcurrency

      public static Parallelizer withMaxConcurrency(int maxConcurrency)
      Returns a concurrency-limited Parallelizer that can be used to run a potentially large number of fanout concurrent tasks using the currently configured standard (virtual thread) executor.

      For example, if you have a list of user ids to fetch from UserService, and you want to avoid DOSing it, the following code performs at most 10 queries at the same time:

      
       List<UserId> userIds = ;
       Map<UserId, User> users = userIds.stream()
           .collect(withMaxConcurrency(10).inParallel(userService::fetchUser))
           .toMap();
       
      Functionally, the Parallelizer.inParallel(java.util.function.Function<? super I, ? extends O>) collector is very similar to the Java 25 mapConcurrent() gatherer:
      
       List<UserId> userIds = ;
       List<User> users = userIds.stream()
           .gather(mapConcurrent(userService::fetchUser, 10))
           .toList();
       
      Except it supports extra flexibility: you can customize the executor or the ThreadFactory used for the concurrent operations by plugging in a StructuredConcurrencyExecutorPlugin. This in turn makes it possible to hook in org-specific monitoring, debugging and/or context propagation etc.
    • withUnlimitedConcurrency

      public static Parallelizer withUnlimitedConcurrency()
      Returns a Parallelizer that can be used to run any number of fanout concurrent tasks using the currently configured standard (virtual thread) executor.
      Since:
      8.3