Class Fanout
You can use either concurrently(java.util.function.Supplier<A>, java.util.function.Supplier<B>, com.google.mu.util.concurrent.Fanout.Join2<? super A, ? super B, R, X>)
or uninterruptibly(java.util.function.Supplier<A>, java.util.function.Supplier<B>, com.google.mu.util.concurrent.Fanout.Join2<? super A, ? super B, R, X>)
to fan out a few
concurrent operations, with a lambda to combine the results after the concurrent operations have
completed.
Any exception thrown by any of the concurrent operation will cancel all the other pending operations and propagate back to the main thread.
If the main thread is interrupted (when you use concurrently()
to allow interruption),
pending and currently running operations are canceled and the main thread will throw
InterruptedException. For example:
import static com.google.mu.util.concurrent.Fanout.concurrently;
return concurrently(
() -> getProjectAncestry(...),
() -> readJobTimeline(),
(ancestry, timeline) -> ...);
Memory consistency effects: Actions before starting the concurrent operations (including
unsynchronized side effects) happen-before the concurrent operations running in the
virtual threads, which happen-before the join functions, which then happen-before the
concurrently()
or uninterruptibly()
method returns. As a result, thread-safe or
concurrent data structure isn't required to pass data between the caller and callee: they will
work the same way as if running in a single thread (except obviously there is no happens-before
relationship between the concurrent operations themselves).
By default, the JDK Executors.newVirtualThreadPerTaskExecutor()
is used to run all
structured concurrency tasks (thus requires Java 21 and virtual threads).
To use an alternative executor (say, you don't want to use virtual threads), implement a StructuredConcurrencyExecutorPlugin
and package it up for ServiceLoader
. You could also
use Google @AutoService to help
automate the generation of the META-INF/services files.
- Since:
- 8.1
-
Nested Class Summary
Modifier and TypeClassDescriptionstatic interface
Function to join two results from concurrent computation.static interface
Function to join three results from concurrent computation.static interface
Function to join four results from concurrent computation.static interface
Function to join five results from concurrent computation. -
Method Summary
Modifier and TypeMethodDescriptionstatic void
concurrently
(Runnable task1, Runnable task2, Runnable... moreTasks) Runstask1
,task2
andmoreTasks
concurrently in their own virtual threads.static <A,
B, R, X extends Throwable>
Rconcurrently
(Supplier<A> a, Supplier<B> b, Fanout.Join2<? super A, ? super B, R, X> join) Runsa
andb
concurrently in their own virtual threads.static <A,
B, C, R, X extends Throwable>
Rconcurrently
(Supplier<A> a, Supplier<B> b, Supplier<C> c, Fanout.Join3<? super A, ? super B, ? super C, R, X> join) Runsa
,b
andc
concurrently in their own virtual threads.static <A,
B, C, D, R, X extends Throwable>
Rconcurrently
(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Fanout.Join4<? super A, ? super B, ? super C, ? super D, R, X> join) Runsa
,b
,c
andd
concurrently in their own virtual threads.static <A,
B, C, D, E, R, X extends Throwable>
Rconcurrently
(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Supplier<E> e, Fanout.Join5<? super A, ? super B, ? super C, ? super D, ? super E, R, X> join) Runsa
,b
,c
,d
ande
concurrently in their own virtual threads.static void
uninterruptibly
(Runnable task1, Runnable task2, Runnable... moreTasks) Runstask1
,task2
andmoreTasks
concurrently and uninterruptibly in their own virtual threads.static <A,
B, R, X extends Throwable>
Runinterruptibly
(Supplier<A> a, Supplier<B> b, Fanout.Join2<? super A, ? super B, R, X> join) Runsa
andb
concurrently and uninterruptibly in their own virtual threads.static <A,
B, C, R, X extends Throwable>
Runinterruptibly
(Supplier<A> a, Supplier<B> b, Supplier<C> c, Fanout.Join3<? super A, ? super B, ? super C, R, X> join) Runsa
,b
andc
concurrently and uninterruptibly in their own virtual threads.static <A,
B, C, D, R, X extends Throwable>
Runinterruptibly
(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Fanout.Join4<? super A, ? super B, ? super C, ? super D, R, X> join) Runsa
,b
,c
andd
concurrently and uninterruptibly in their own virtual threads.static <A,
B, C, D, E, R, X extends Throwable>
Runinterruptibly
(Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Supplier<E> e, Fanout.Join5<? super A, ? super B, ? super C, ? super D, ? super E, R, X> join) Runsa
,b
,c
,d
ande
concurrently and uninterruptibly in their own virtual threads.static Parallelizer
withMaxConcurrency
(int maxConcurrency) Returns a concurrency-limitedParallelizer
that can be used to run a potentially large number of fanout concurrent tasks using the currently configured standard (virtual thread) executor.
-
Method Details
-
concurrently
public static <A,B, R concurrentlyR, X extends Throwable> (Supplier<A> a, Supplier<B> b, Fanout.Join2<? super A, ? super B, throws InterruptedException, XR, X> join) Runsa
andb
concurrently in their own virtual threads. After all of the concurrent operations return successfully, invoke thejoin
function on the results in the caller's thread.For example:
Result result = concurrently( () -> fetchArm(), () -> fetchLeg(), (arm, leg) -> new Result(arm, leg));
- Throws:
InterruptedException
- if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.RuntimeException
- wrapping the original exception from the virtual thread if any concurrent operation failedX
- thrown by thejoin
function
-
concurrently
public static <A,B, R concurrentlyC, R, X extends Throwable> (Supplier<A> a, Supplier<B> b, Supplier<C> c, Fanout.Join3<? super A, ? super B, throws InterruptedException, X? super C, R, X> join) Runsa
,b
andc
concurrently in their own virtual threads. After all of the concurrent operations return successfully, invoke thejoin
function on the results in the caller's thread.For example:
Result result = concurrently( () -> fetchHead(), () -> fetchArm(), () -> fetchLeg(), (head, arm, leg) -> new Result(head, arm, leg));
- Throws:
InterruptedException
- if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.RuntimeException
- wrapping the original exception from the virtual thread if any concurrent operation failedX
- thrown by thejoin
function
-
concurrently
public static <A,B, R concurrentlyC, D, R, X extends Throwable> (Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Fanout.Join4<? super A, ? super B, throws InterruptedException, X? super C, ? super D, R, X> join) Runsa
,b
,c
andd
concurrently in their own virtual threads. After all of the concurrent operations return successfully, invoke thejoin
function on the results in the caller's thread.For example:
Result result = concurrently( () -> fetchHead(), () -> fetchShoulder(), () -> fetchArm(), () -> fetchLeg(), (head, shoulder, arm, leg) -> new Result(head, shoulder, arm, leg));
- Throws:
InterruptedException
- if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.RuntimeException
- wrapping the original exception from the virtual thread if any concurrent operation failedX
- thrown by thejoin
function
-
concurrently
public static <A,B, R concurrentlyC, D, E, R, X extends Throwable> (Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Supplier<E> e, Fanout.Join5<? super A, ? super B, throws InterruptedException, X? super C, ? super D, ? super E, R, X> join) Runsa
,b
,c
,d
ande
concurrently in their own virtual threads. After all of the concurrent operations return successfully, invoke thejoin
function on the results in the caller's thread.For example:
Result result = concurrently( () -> fetchHead(), () -> fetchShoulder(), () -> fetchArm(), () -> fetchLeg(), () -> fetchFeet(), (head, shoulder, arm, leg, feet) -> new Result(head, shoulder, arm, leg, feet));
- Throws:
InterruptedException
- if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.RuntimeException
- wrapping the original exception from the virtual thread if any concurrent operation failedX
- thrown by thejoin
function
-
concurrently
public static void concurrently(Runnable task1, Runnable task2, Runnable... moreTasks) throws InterruptedException Runstask1
,task2
andmoreTasks
concurrently in their own virtual threads.For example:
concurrently(() -> uploadFile(), () -> sendMessageToQueue());
- Throws:
InterruptedException
- if the current thread is interrupted while waiting for the concurrent operations to complete. The unfinished concurrent operations will be canceled.RuntimeException
- wrapping the original exception from the virtual thread if any concurrent operation failed- Since:
- 8.3
-
uninterruptibly
public static <A,B, R uninterruptiblyR, X extends Throwable> (Supplier<A> a, Supplier<B> b, Fanout.Join2<? super A, ? super B, throws XR, X> join) Runsa
andb
concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invoke thejoin
function on the results in the caller's thread.For example:
Result result = uninterruptibly( () -> fetchArm(), () -> fetchLeg(), (arm, leg) -> new Result(arm, leg));
- Throws:
RuntimeException
- wrapping the original exception from the virtual thread if any concurrent operation failedX
- thrown by thejoin
function
-
uninterruptibly
public static <A,B, R uninterruptiblyC, R, X extends Throwable> (Supplier<A> a, Supplier<B> b, Supplier<C> c, Fanout.Join3<? super A, ? super B, throws X? super C, R, X> join) Runsa
,b
andc
concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invoke thejoin
function on the results in the caller's thread.For example:
Result result = uninterruptibly( () -> fetchHead(), () -> fetchArm(), () -> fetchLeg(), (head, arm, leg) -> new Result(head, arm, leg));
- Throws:
RuntimeException
- wrapping the original exception from the virtual thread if any concurrent operation failedX
- thrown by thejoin
function
-
uninterruptibly
public static <A,B, R uninterruptiblyC, D, R, X extends Throwable> (Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Fanout.Join4<? super A, ? super B, throws X? super C, ? super D, R, X> join) Runsa
,b
,c
andd
concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invoke thejoin
function on the results in the caller's thread.For example:
Result result = uninterruptibly( () -> fetchHead(), () -> fetchShoulder(), () -> fetchArm(), () -> fetchLeg(), (head, shoulder, arm, leg) -> new Result(head, shoulder, arm, leg));
- Throws:
RuntimeException
- wrapping the original exception from the virtual thread if any concurrent operation failedX
- thrown by thejoin
function
-
uninterruptibly
public static <A,B, R uninterruptiblyC, D, E, R, X extends Throwable> (Supplier<A> a, Supplier<B> b, Supplier<C> c, Supplier<D> d, Supplier<E> e, Fanout.Join5<? super A, ? super B, throws X? super C, ? super D, ? super E, R, X> join) Runsa
,b
,c
,d
ande
concurrently and uninterruptibly in their own virtual threads. After all of the concurrent operations return successfully, invoke thejoin
function on the results in the caller's thread.For example:
Result result = uninterruptibly( () -> fetchHead(), () -> fetchShoulder(), () -> fetchArm(), () -> fetchLeg(), () -> fetchFeet(), (head, shoulder, arm, leg, feet) -> new Result(head, shoulder, arm, leg, feet));
- Throws:
RuntimeException
- wrapping the original exception from the virtual thread if any concurrent operation failedX
- thrown by thejoin
function
-
uninterruptibly
Runstask1
,task2
andmoreTasks
concurrently and uninterruptibly in their own virtual threads.For example:
uninterruptibly(() -> uploadFile(), () -> sendMessageToQueue());
- Throws:
RuntimeException
- wrapping the original exception from the virtual thread if any concurrent operation failed- Since:
- 8.3
-
withMaxConcurrency
Returns a concurrency-limitedParallelizer
that can be used to run a potentially large number of fanout concurrent tasks using the currently configured standard (virtual thread) executor.For example, if you have a list of user ids to fetch from UserService, and you want to avoid DOSing it, the following code performs at most 10 queries at the same time:
Compared to parallel streams, you can control the concurrency. Exceptions also retain the full stack trace from both the main thread and the virtual thread.List<UserId> userIds = ; Map<UserId, User> users = Fanout.withMaxConcurrency(10) .inParallel(userIds, userService::fetchUser) .toMap();
-