Class StructuredConcurrency

java.lang.Object
com.google.mu.util.concurrent.StructuredConcurrency

public final class StructuredConcurrency extends Object
Convenient utilities to help with structured concurrency on top of an ExecutorService (preferably with virtual threads). For example:

 var fanout = new StructuredConcurrency();
 Result result = fanout.concurrently(
     () -> ListFoos(),
     () -> listBars(),
     (foos, bars) -> ...);
 

If you need to customize the virtual threads or the executor, you can use any custom ExecutorService like:


 ExecutorService executor = ...;
 var fanout = new StructuredConcurrency(executor);
 Result result = fanout.concurrently(...);
 

If using a virtual thread executor, it's safe to define StructuredConcurrency as a static final constant because it's stateless as long as the executor is stateless.

Since:
8.0
  • Constructor Details

    • StructuredConcurrency

      public StructuredConcurrency()
      Constructor using the default virtual thread executor to run the concurrent operations.

      Fails if runtime is lower than Java 21.

    • StructuredConcurrency

      public StructuredConcurrency(ExecutorService executor)
      Constructor using executor to run concurrent operations. Note that if executor doesn't use virtual threads, it can cause throughput issues by blocking in one of the platform threads.

      Useful if your project needs specific setting (such as thread name) for the threads . It's strongly recommended to use Thread.ofVitual() for the thread factory because otherwise you risk blocking the platform threads.

  • Method Details

    • concurrently

      public <A, B, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, StructuredConcurrency.Join2<? super A,? super B,R,X> join) throws InterruptedException, X
      Runs a and b concurrently in their own virtual threads. After all of the concurrent operations return successfully, invoke the join function on the results in the caller's thread.

      For example:

      
       StructuredConcurrency fanout = using(executor);
       Result result = fanout.concurrently(
         () -> fetchArm(),
         () -> fetchLeg(),
         (arm, leg) -> new Result(arm, leg));
       

      Exceptions thrown by these concurrent suppliers are expected to be propagated through exception tunneling (wrapped in a special unchecked exception) and handled by the caller of this method.

      Throws:
      InterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • concurrently

      public <A, B, C, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Supplier<C> c, StructuredConcurrency.Join3<? super A,? super B,? super C,R,X> join) throws InterruptedException, X
      Runs a, b and c concurrently in their own virtual threads. After all of the concurrent operations return successfully, invoke the join function on the results in the caller's thread.

      For example:

      
       StructuredConcurrency fanout = using(executor);
       Result result = fanout.concurrently(
         () -> fetchHead(),
         () -> fetchArm(),
         () -> fetchLeg(),
         (head, arm, leg) -> new Result(head, arm, leg));
       

      Exceptions thrown by these concurrent suppliers are expected to be propagated through exception tunneling (wrapped in a special unchecked exception) and handled by the caller of this method.

      Throws:
      InterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • concurrently

      public <A, B, C, D, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, StructuredConcurrency.Join4<? super A,? super B,? super C,? super D,R,X> join) throws InterruptedException, X
      Runs a, b, c and d concurrently in their own virtual threads. After all of the concurrent operations return successfully, invoke the join function on the results in the caller's thread.

      For example:

      
       StructuredConcurrency fanout = using(executor);
       Result result = fanout.concurrently(
         () -> fetchHead(),
         () -> fetchShoulder(),
         () -> fetchArm(),
         () -> fetchLeg(),
         (head, shoulder, arm, leg) -> new Result(head, shoulder, arm, leg));
       

      Exceptions thrown by these concurrent suppliers are expected to be propagated through exception tunneling (wrapped in a special unchecked exception) and handled by the caller of this method.

      Throws:
      InterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • concurrently

      public <A, B, C, D, E, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Supplier<E> e, StructuredConcurrency.Join5<? super A,? super B,? super C,? super D,? super E,R,X> join) throws InterruptedException, X
      Runs a, b, c, d and e concurrently in their own virtual threads. After all of the concurrent operations return successfully, invoke the join function on the results in the caller's thread.

      For example:

      
       StructuredConcurrency fanout = using(executor);
       Result result = fanout.concurrently(
         () -> fetchHead(),
         () -> fetchShoulder(),
         () -> fetchArm(),
         () -> fetchLeg(),
         () -> fetchFeet(),
         (head, shoulder, arm, leg, feet) -> new Result(head, shoulder, arm, leg, feet));
       

      Exceptions thrown by these concurrent suppliers are expected to be propagated through exception tunneling (wrapped in a special unchecked exception) and handled by the caller of this method.

      Throws:
      InterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • uninterruptibly

      public <A, B, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, StructuredConcurrency.Join2<? super A,? super B,R,X> join) throws X
      Runs a and b concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invoke the join function on the results in the caller's thread.

      For example:

      
       StructuredConcurrency fanout = using(executor);
       Result result = fanout.uninterruptibly(
         () -> fetchArm(),
         () -> fetchLeg(),
         (arm, leg) -> new Result(arm, leg));
       

      Exceptions thrown by these concurrent suppliers are expected to be propagated through exception tunneling (wrapped in a special unchecked exception) and handled by the caller of this method.

      Throws:
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • uninterruptibly

      public <A, B, C, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Supplier<C> c, StructuredConcurrency.Join3<? super A,? super B,? super C,R,X> join) throws X
      Runs a, b and c concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invoke the join function on the results in the caller's thread.

      For example:

      
       StructuredConcurrency fanout = using(executor);
       Result result = fanout.uninterruptibly(
         () -> fetchHead(),
         () -> fetchArm(),
         () -> fetchLeg(),
         (head, arm, leg) -> new Result(head, arm, leg));
       

      Exceptions thrown by these concurrent suppliers are expected to be propagated through exception tunneling (wrapped in a special unchecked exception) and handled by the caller of this method.

      Throws:
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • uninterruptibly

      public <A, B, C, D, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, StructuredConcurrency.Join4<? super A,? super B,? super C,? super D,R,X> join) throws X
      Runs a, b, c and d concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invoke the join function on the results in the caller's thread.

      For example:

      
       StructuredConcurrency fanout = using(executor);
       Result result = fanout.uninterruptibly(
         () -> fetchHead(),
         () -> fetchShoulder(),
         () -> fetchArm(),
         () -> fetchLeg(),
         (head, shoulder, arm, leg) -> new Result(head, shoulder, arm, leg));
       

      Exceptions thrown by these concurrent suppliers are expected to be propagated through exception tunneling (wrapped in a special unchecked exception) and handled by the caller of this method.

      Throws:
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function
    • uninterruptibly

      public <A, B, C, D, E, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Supplier<E> e, StructuredConcurrency.Join5<? super A,? super B,? super C,? super D,? super E,R,X> join) throws X
      Runs a, b, c, d and e concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invoke the join function on the results in the caller's thread.

      For example:

      
       StructuredConcurrency fanout = using(executor);
       Result result = fanout.uninterruptibly(
         () -> fetchHead(),
         () -> fetchShoulder(),
         () -> fetchArm(),
         () -> fetchLeg(),
         () -> fetchFeet(),
         (head, shoulder, arm, leg, feet) -> new Result(head, shoulder, arm, leg, feet));
       

      Exceptions thrown by these concurrent suppliers are expected to be propagated through exception tunneling (wrapped in a special unchecked exception) and handled by the caller of this method.

      Throws:
      RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
      X - thrown by the join function