Class Fanout
You can use one of the concurrently() methods to fan out a few concurrent operations, with a lambda to combine the results after the concurrent operations have completed.
Any exception thrown by any of the concurrent operations will cancel all the other pending operations and propagate back to the main thread.
If the main thread is interrupted, pending and currently running operations are canceled and the main thread will throw an unchecked exception (with the thread's interrupted bit set).
For example:
import static com.google.mu.util.concurrent.Fanout.concurrently;
return concurrently(() -> fetchArm(...), () -> fetchLeg(...), (arm, leg) -> ...);
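The fail-fast behavior described above can be illustrated with plain JDK classes. This is a minimal sketch, not the Fanout implementation: the class FailFastFanoutSketch, its fanOut() helper and the demo methods are hypothetical names, and a cached thread pool stands in for the virtual-thread executor so the sketch also runs on JDKs older than 21.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** JDK-only illustration of fail-fast fan-out; NOT the Fanout implementation. */
final class FailFastFanoutSketch {

  /** Runs a and b concurrently, then applies join on the caller's thread. */
  static <A, B, R> R fanOut(
      Supplier<A> a, Supplier<B> b, BiFunction<? super A, ? super B, R> join) {
    ExecutorService executor = Executors.newCachedThreadPool();
    CompletableFuture<A> fa = CompletableFuture.supplyAsync(a, executor);
    CompletableFuture<B> fb = CompletableFuture.supplyAsync(b, executor);
    // A failure on either side immediately fails the other future too,
    // so the caller stops waiting as soon as the first failure happens.
    fa.exceptionally(e -> { fb.completeExceptionally(e); return null; });
    fb.exceptionally(e -> { fa.completeExceptionally(e); return null; });
    A resultA;
    B resultB;
    try {
      resultA = fa.join();
      resultB = fb.join();
    } catch (CompletionException e) {
      // Surface the original failure wrapped in an unchecked exception.
      throw new RuntimeException(e.getCause());
    } finally {
      // Interrupts whichever operation is still running.
      executor.shutdownNow();
    }
    return join.apply(resultA, resultB);
  }

  /** Happy path: both operations succeed and the results are joined. */
  static String demoSuccess() {
    return fanOut(() -> "arm", () -> "leg", (arm, leg) -> arm + "+" + leg);
  }

  /** Failure path: reports the propagated cause and whether the sibling was interrupted. */
  static String demoFailure() {
    CountDownLatch siblingStarted = new CountDownLatch(1);
    CountDownLatch siblingInterrupted = new CountDownLatch(1);
    try {
      fanOut(
          () -> {
            siblingStarted.countDown();
            try {
              Thread.sleep(10_000); // a slow operation
            } catch (InterruptedException e) {
              siblingInterrupted.countDown();
            }
            return "arm";
          },
          () -> {
            awaitQuietly(siblingStarted); // fail only once the sibling is running
            throw new IllegalStateException("leg service down");
          },
          (arm, leg) -> arm + "+" + leg);
      return "no failure";
    } catch (RuntimeException e) {
      boolean interrupted = awaitQuietly(siblingInterrupted);
      return e.getCause().getMessage() + ", sibling interrupted: " + interrupted;
    }
  }

  private static boolean awaitQuietly(CountDownLatch latch) {
    try {
      return latch.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(demoSuccess());
    System.out.println(demoFailure());
  }
}
```

In the failure demo the slow operation is interrupted as soon as its sibling throws, and the caller sees the sibling's exception as the cause of a RuntimeException.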
Memory consistency effects: Actions before starting the concurrent operations (including unsynchronized side effects) happen-before the concurrent operations running in the virtual threads, which happen-before the join functions, which then happen-before the concurrently() method returns. As a result, a thread-safe or concurrent data structure isn't required to pass data between the caller and the callees: they will work the same way as if running in a single thread (except that there is no happens-before relationship between the concurrent operations themselves).
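To make the happens-before chain concrete, here is a small JDK-only sketch. It relies on the equivalent guarantees of ExecutorService.submit() and Future.get() rather than on Fanout itself, and the names HappensBeforeSketch, Box and demo() are hypothetical. The caller fills a plain ArrayList before fanning out, each task writes to its own unsynchronized Box, and the caller reads both boxes after the tasks have completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** JDK-only illustration of passing data through plain objects; NOT the Fanout implementation. */
final class HappensBeforeSketch {

  /** Plain, non-thread-safe holder written by exactly one task. */
  static final class Box {
    int value;
  }

  static int demo() {
    // Written by the caller before fan-out: a plain ArrayList, no synchronization.
    List<Integer> input = new ArrayList<>();
    for (int i = 1; i <= 4; i++) {
      input.add(i);
    }
    Box squares = new Box();
    Box cubes = new Box();

    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      // Each task reads the shared input and writes only to its own Box.
      // The two tasks share no mutable state, since nothing orders them against each other.
      Future<?> first = executor.submit(() -> {
        for (int n : input) {
          squares.value += n * n;
        }
      });
      Future<?> second = executor.submit(() -> {
        for (int n : input) {
          cubes.value += n * n * n;
        }
      });
      // Writes made by a task are visible to the caller once get() returns.
      first.get();
      second.get();
      return squares.value + cubes.value;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```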
By default, the JDK Executors.newVirtualThreadPerTaskExecutor() is used to run all structured concurrency tasks (which requires Java 21 and virtual threads). To use an alternative executor (say, you need a custom ThreadFactory, or don't want to use virtual threads at all), implement a StructuredConcurrencyExecutorPlugin and package it up for ServiceLoader. You could also use Google @AutoService to help automate the generation of the META-INF/services files.
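The ServiceLoader mechanism itself can be sketched without the library. The ExecutorPlugin interface below is a hypothetical stand-in: this page does not show the methods of StructuredConcurrencyExecutorPlugin, so nothing here should be read as its API. The sketch only shows the general pattern of looking up a registered provider and falling back to a default when none is found.

```java
import java.util.Iterator;
import java.util.ServiceLoader;
import java.util.concurrent.Executor;

/** Hypothetical plugin type; a stand-in, NOT the StructuredConcurrencyExecutorPlugin API. */
interface ExecutorPlugin {
  Executor createExecutor();
}

final class PluginLookupSketch {
  // Registration is a classpath resource named after the plugin type, here
  //   META-INF/services/ExecutorPlugin
  // whose content is one fully qualified implementation class name per line.

  /** Returns "plugin: <class name>" if a provider is registered, otherwise "default". */
  static String describeExecutor() {
    Iterator<ExecutorPlugin> providers = ServiceLoader.load(ExecutorPlugin.class).iterator();
    return providers.hasNext()
        ? "plugin: " + providers.next().getClass().getName()
        : "default";
  }

  /** Picks the first registered plugin's executor, else a thread-per-task fallback. */
  static Executor resolveExecutor() {
    Iterator<ExecutorPlugin> providers = ServiceLoader.load(ExecutorPlugin.class).iterator();
    if (providers.hasNext()) {
      return providers.next().createExecutor();
    }
    // Stand-in for the virtual-thread default described above.
    return task -> new Thread(task).start();
  }

  public static void main(String[] args) {
    System.out.println(describeExecutor());
    resolveExecutor().execute(() -> System.out.println("task ran"));
  }
}
```

With no provider file on the classpath the lookup finds nothing and the fallback executor is used.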
- Since:
- 8.1
-
Nested Class Summary
Nested Classes (Modifier and Type, Class: Description)
- static interface Fanout.Join2: Function to join two results from concurrent computation.
- static interface Fanout.Join3: Function to join three results from concurrent computation.
- static interface Fanout.Join4: Function to join four results from concurrent computation.
- static interface Fanout.Join5: Function to join five results from concurrent computation.
-
Method Summary
Methods (Modifier and Type, Method: Description)
- static void concurrently(Runnable task1, Runnable task2, Runnable... moreTasks): Runs task1, task2 and moreTasks concurrently in their own virtual threads.
- static <A, B, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Fanout.Join2<? super A, ? super B, R, X> join): Runs a and b concurrently in their own virtual threads.
- static <A, B, C, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Supplier<C> c, Fanout.Join3<? super A, ? super B, ? super C, R, X> join): Runs a, b and c concurrently in their own virtual threads.
- static <A, B, C, D, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Fanout.Join4<? super A, ? super B, ? super C, ? super D, R, X> join): Runs a, b, c and d concurrently in their own virtual threads.
- static <A, B, C, D, E, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Supplier<E> e, Fanout.Join5<? super A, ? super B, ? super C, ? super D, ? super E, R, X> join): Runs a, b, c, d and e concurrently in their own virtual threads.
- static void uninterruptibly(Runnable task1, Runnable task2, Runnable... moreTasks): Deprecated. Prefer using the concurrently() overloads that allow cancellation.
- static <A, B, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Fanout.Join2<? super A, ? super B, R, X> join): Deprecated. Prefer using the concurrently() overloads that allow cancellation.
- static <A, B, C, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Supplier<C> c, Fanout.Join3<? super A, ? super B, ? super C, R, X> join): Deprecated. Prefer using the concurrently() overloads that allow cancellation.
- static <A, B, C, D, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Fanout.Join4<? super A, ? super B, ? super C, ? super D, R, X> join): Deprecated. Prefer using the concurrently() overloads that allow cancellation.
- static <A, B, C, D, E, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Supplier<E> e, Fanout.Join5<? super A, ? super B, ? super C, ? super D, ? super E, R, X> join): Deprecated. Prefer using the concurrently() overloads that allow cancellation.
- static Parallelizer withMaxConcurrency(int maxConcurrency): Returns a concurrency-limited Parallelizer that can be used to run a potentially large number of fanout concurrent tasks using the currently configured standard (virtual thread) executor.
- static Parallelizer withUnlimitedConcurrency(): Returns a Parallelizer that can be used to run any number of fanout concurrent tasks using the currently configured standard (virtual thread) executor.
-
Method Details
-
concurrently
public static <A, B, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Fanout.Join2<? super A, ? super B, R, X> join) throws X
Runs a and b concurrently in their own virtual threads. After all of the concurrent operations return successfully, invokes the join function on the results in the caller's thread.
For example:
Result result = concurrently( () -> fetchArm(), () -> fetchLeg(), (arm, leg) -> new Result(arm, leg));
- Throws:
- StructuredConcurrencyInterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
- RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
- X - thrown by the join function
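The X type parameter is what lets a join function throw a checked exception that reaches the caller with its static type intact. Below is a minimal sketch of that mechanism. The Join2 shape shown here is an assumption (this page does not give the interface's method name), and applyJoin() and demo() are hypothetical helpers that run nothing concurrently.

```java
import java.io.IOException;

/** Illustrates exception-type inference for a join lambda; NOT the Fanout implementation. */
final class JoinExceptionSketch {

  /** Hypothetical two-argument join function; the method name "join" is an assumption. */
  @FunctionalInterface
  interface Join2<A, B, R, X extends Throwable> {
    R join(A a, B b) throws X;
  }

  /** Applies the join function; X is inferred from what the lambda body throws. */
  static <A, B, R, X extends Throwable> R applyJoin(
      A a, B b, Join2<? super A, ? super B, R, X> join) throws X {
    return join.join(a, b);
  }

  static String demo(String legValue) {
    try {
      // The lambda throws IOException, so the compiler infers X = IOException
      // and requires the caller to handle exactly that checked exception.
      return applyJoin("arm", legValue, (arm, leg) -> {
        if (leg.isEmpty()) {
          throw new IOException("missing leg");
        }
        return arm + "+" + leg;
      });
    } catch (IOException e) {
      return "failed: " + e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo("leg"));
    System.out.println(demo(""));
  }
}
```

A join lambda that throws no checked exception needs no try/catch at the call site.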
-
concurrently
public static <A, B, C, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Supplier<C> c, Fanout.Join3<? super A, ? super B, ? super C, R, X> join) throws X
Runs a, b and c concurrently in their own virtual threads. After all of the concurrent operations return successfully, invokes the join function on the results in the caller's thread.
For example:
Result result = concurrently( () -> fetchHead(), () -> fetchArm(), () -> fetchLeg(), (head, arm, leg) -> new Result(head, arm, leg));
- Throws:
- StructuredConcurrencyInterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
- RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
- X - thrown by the join function
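The interruption contract (cancel what is unfinished, keep the interrupted bit set, throw an unchecked exception) can be sketched with a plain Future. This is an illustration under assumptions, not the library's code: UncheckedInterruptedException is a hypothetical stand-in for StructuredConcurrencyInterruptedException, and awaitOrCancel() and demo() are illustrative names.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** JDK-only illustration of the interruption contract; NOT the Fanout implementation. */
final class InterruptionSketch {

  /** Hypothetical stand-in for StructuredConcurrencyInterruptedException. */
  static final class UncheckedInterruptedException extends RuntimeException {
    UncheckedInterruptedException(InterruptedException cause) {
      super(cause);
    }
  }

  /** Waits for the result; on interruption cancels the task and rethrows unchecked. */
  static <T> T awaitOrCancel(Future<T> future) {
    try {
      return future.get();
    } catch (InterruptedException e) {
      future.cancel(true);                // cancel the unfinished operation
      Thread.currentThread().interrupt(); // keep the interrupted bit set for callers
      throw new UncheckedInterruptedException(e);
    } catch (ExecutionException e) {
      throw new RuntimeException(e.getCause());
    }
  }

  static String demo() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<String> slow = executor.submit(() -> {
      Thread.sleep(10_000);
      return "done";
    });
    // Simulate an interrupt arriving at the caller while it waits.
    Thread.currentThread().interrupt();
    try {
      awaitOrCancel(slow);
      return "completed";
    } catch (UncheckedInterruptedException e) {
      // Thread.interrupted() reads AND clears the flag, so the demo leaves no stale interrupt.
      boolean flagSet = Thread.interrupted();
      return "interrupted, flag set: " + flagSet + ", cancelled: " + slow.isCancelled();
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```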
-
concurrently
public static <A, B, C, D, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Fanout.Join4<? super A, ? super B, ? super C, ? super D, R, X> join) throws X
Runs a, b, c and d concurrently in their own virtual threads. After all of the concurrent operations return successfully, invokes the join function on the results in the caller's thread.
For example:
Result result = concurrently( () -> fetchHead(), () -> fetchShoulder(), () -> fetchArm(), () -> fetchLeg(), (head, shoulder, arm, leg) -> new Result(head, shoulder, arm, leg));
- Throws:
- StructuredConcurrencyInterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
- RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
- X - thrown by the join function
-
concurrently
public static <A, B, C, D, E, R, X extends Throwable> R concurrently(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Supplier<E> e, Fanout.Join5<? super A, ? super B, ? super C, ? super D, ? super E, R, X> join) throws X
Runs a, b, c, d and e concurrently in their own virtual threads. After all of the concurrent operations return successfully, invokes the join function on the results in the caller's thread.
For example:
Result result = concurrently( () -> fetchHead(), () -> fetchShoulder(), () -> fetchArm(), () -> fetchLeg(), () -> fetchFeet(), (head, shoulder, arm, leg, feet) -> new Result(head, shoulder, arm, leg, feet));
- Throws:
- StructuredConcurrencyInterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
- RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
- X - thrown by the join function
-
concurrently
public static void concurrently(Runnable task1, Runnable task2, Runnable... moreTasks)
Runs task1, task2 and moreTasks concurrently in their own virtual threads.
For example:
concurrently(() -> uploadFile(), () -> sendMessageToQueue());
- Throws:
- StructuredConcurrencyInterruptedException - if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.
- RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
- Since:
- 8.3
-
uninterruptibly
@Deprecated public static <A, B, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Fanout.Join2<? super A, ? super B, R, X> join) throws X
Deprecated. Prefer using the concurrently() overloads that allow cancellation.
Runs a and b concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invokes the join function on the results in the caller's thread.
For example:
Result result = uninterruptibly( () -> fetchArm(), () -> fetchLeg(), (arm, leg) -> new Result(arm, leg));
- Throws:
- RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
- X - thrown by the join function
-
uninterruptibly
@Deprecated public static <A, B, C, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Supplier<C> c, Fanout.Join3<? super A, ? super B, ? super C, R, X> join) throws X
Deprecated. Prefer using the concurrently() overloads that allow cancellation.
Runs a, b and c concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invokes the join function on the results in the caller's thread.
For example:
Result result = uninterruptibly( () -> fetchHead(), () -> fetchArm(), () -> fetchLeg(), (head, arm, leg) -> new Result(head, arm, leg));
- Throws:
- RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
- X - thrown by the join function
-
uninterruptibly
@Deprecated public static <A, B, C, D, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Fanout.Join4<? super A, ? super B, ? super C, ? super D, R, X> join) throws X
Deprecated. Prefer using the concurrently() overloads that allow cancellation.
Runs a, b, c and d concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invokes the join function on the results in the caller's thread.
For example:
Result result = uninterruptibly( () -> fetchHead(), () -> fetchShoulder(), () -> fetchArm(), () -> fetchLeg(), (head, shoulder, arm, leg) -> new Result(head, shoulder, arm, leg));
- Throws:
- RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
- X - thrown by the join function
-
uninterruptibly
@Deprecated public static <A, B, C, D, E, R, X extends Throwable> R uninterruptibly(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Supplier<E> e, Fanout.Join5<? super A, ? super B, ? super C, ? super D, ? super E, R, X> join) throws X
Deprecated. Prefer using the concurrently() overloads that allow cancellation.
Runs a, b, c, d and e concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invokes the join function on the results in the caller's thread.
For example:
Result result = uninterruptibly( () -> fetchHead(), () -> fetchShoulder(), () -> fetchArm(), () -> fetchLeg(), () -> fetchFeet(), (head, shoulder, arm, leg, feet) -> new Result(head, shoulder, arm, leg, feet));
- Throws:
- RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
- X - thrown by the join function
-
uninterruptibly
@Deprecated public static void uninterruptibly(Runnable task1, Runnable task2, Runnable... moreTasks)
Deprecated. Prefer using the concurrently() overloads that allow cancellation.
Runs task1, task2 and moreTasks concurrently and uninterruptibly in their own virtual threads.
For example:
uninterruptibly(() -> uploadFile(), () -> sendMessageToQueue());
- Throws:
- RuntimeException - wrapping the original exception from the virtual thread if any concurrent operation failed
- Since:
- 8.3
-
withMaxConcurrency
public static Parallelizer withMaxConcurrency(int maxConcurrency)
Returns a concurrency-limited Parallelizer that can be used to run a potentially large number of fanout concurrent tasks using the currently configured standard (virtual thread) executor.
For example, if you have a list of user ids to fetch from UserService, and you want to avoid DOSing it, the following code performs at most 10 queries at the same time:
List<UserId> userIds = ...;
Map<UserId, User> users = userIds.stream()
    .collect(withMaxConcurrency(10).inParallel(userService::fetchUser))
    .toMap();
Functionally, the Parallelizer.inParallel(java.util.function.Function<? super I, ? extends O>) collector is very similar to the Java 25 mapConcurrent() gatherer:
List<UserId> userIds = ...;
List<User> users = userIds.stream()
    .gather(mapConcurrent(10, userService::fetchUser))
    .toList();
The difference is extra flexibility: you can customize the executor or the ThreadFactory used for the concurrent operations by plugging in a StructuredConcurrencyExecutorPlugin. This in turn makes it possible to hook in org-specific monitoring, debugging and/or context propagation.
-
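The idea of capping the number of in-flight operations can be sketched with a Semaphore and plain JDK executors. This is not how Parallelizer is implemented. BoundedFanoutSketch and its inParallel() and demo() methods are hypothetical names, a cached thread pool stands in for virtual threads, and the sketch assumes the inputs are distinct because they are used as map keys.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** JDK-only illustration of concurrency-limited fan-out; NOT the Parallelizer implementation. */
final class BoundedFanoutSketch {

  /** Applies task to every input with at most maxConcurrency calls in flight. */
  static <I, O> Map<I, O> inParallel(
      List<I> inputs, int maxConcurrency, Function<? super I, ? extends O> task) {
    Semaphore permits = new Semaphore(maxConcurrency);
    ExecutorService executor = Executors.newCachedThreadPool();
    try {
      Map<I, Future<O>> pending = new LinkedHashMap<>();
      for (I input : inputs) {
        permits.acquire(); // blocks the caller while maxConcurrency tasks are in flight
        Callable<O> call = () -> {
          try {
            return task.apply(input);
          } finally {
            permits.release(); // free the slot whether the task succeeded or failed
          }
        };
        pending.put(input, executor.submit(call));
      }
      Map<I, O> results = new LinkedHashMap<>();
      for (Map.Entry<I, Future<O>> entry : pending.entrySet()) {
        results.put(entry.getKey(), entry.getValue().get());
      }
      return results;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new RuntimeException(e.getCause());
    } finally {
      executor.shutdownNow(); // cancels anything still running on failure
    }
  }

  /** Fetches 20 fake users with a limit of 3 and reports the observed peak concurrency. */
  static String demo() {
    List<Integer> userIds = new ArrayList<>();
    for (int id = 1; id <= 20; id++) {
      userIds.add(id);
    }
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger maxInFlight = new AtomicInteger();
    Map<Integer, String> users = inParallel(userIds, 3, id -> {
      maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
      try {
        Thread.sleep(20); // pretend to call a remote service
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      inFlight.decrementAndGet();
      return "user-" + id;
    });
    return users.size() + " users, max in flight <= 3: " + (maxInFlight.get() <= 3);
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```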
withUnlimitedConcurrency
public static Parallelizer withUnlimitedConcurrency()
Returns a Parallelizer that can be used to run any number of fanout concurrent tasks using the currently configured standard (virtual thread) executor.
- Since:
- 8.3
-