Class MoreStreams
- java.lang.Object
-
- com.google.mu.util.stream.MoreStreams
-
public final class MoreStreams extends java.lang.Object
Static utilities pertaining toStream
in addition to relevant utilities in JDK and Guava.- Since:
- 1.1
-
-
Method Summary
Modifier and Type Method Description static <T> java.util.Spliterator<java.util.List<T>>
dice(java.util.Spliterator<? extends T> spliterator, int maxSize)
Dicesspliterator
into smaller chunks each with up tomaxSize
elements.static <T> java.util.stream.Stream<java.util.List<T>>
dice(java.util.stream.Stream<? extends T> stream, int maxSize)
Dicesstream
into smaller chunks each with up tomaxSize
elements.static <T> java.util.stream.Stream<T>
flatten(java.util.stream.Stream<? extends java.util.stream.Stream<? extends T>> streamOfStream)
FlattensstreamOfStream
and returns an unordered sequential stream of the nested elements.static <T> java.util.stream.Stream<T>
generate(T seed, java.util.function.Function<? super T,? extends java.util.stream.Stream<? extends T>> step)
Returns a Stream produced by iterative application ofstep
to the initialseed
, producing a Stream consisting of seed, elements of step(seed), elements of step(x) for each x in step(seed), etc.static <T> java.util.stream.Stream<T>
groupConsecutive(java.util.stream.Stream<T> stream, java.util.function.BiPredicate<? super T,? super T> sameGroup, java.util.function.BinaryOperator<T> groupReducer)
Groups consecutive elements fromstream
lazily.static <T,R>
java.util.stream.Stream<R>groupConsecutive(java.util.stream.Stream<T> stream, java.util.function.BiPredicate<? super T,? super T> sameGroup, java.util.stream.Collector<? super T,?,R> groupCollector)
Groups consecutive elements fromstream
lazily.static <T> java.util.stream.Stream<T>
groupConsecutive(java.util.stream.Stream<T> stream, java.util.function.Function<? super T,?> groupKeyFunction, java.util.function.BinaryOperator<T> groupReducer)
Groups consecutive elements fromstream
lazily.static <T,R>
java.util.stream.Stream<R>groupConsecutive(java.util.stream.Stream<T> stream, java.util.function.Function<? super T,?> groupKeyFunction, java.util.stream.Collector<? super T,?,R> groupCollector)
Groups consecutive elements fromstream
lazily.static java.util.stream.Stream<java.lang.Integer>
indexesFrom(int firstIndex)
Returns an infiniteStream
starting fromfirstIndex
.static java.util.stream.Stream<java.lang.Long>
indexesFrom(long firstIndex)
Returns an infinite index stream starting fromfirstIndex
.static <T> java.lang.Iterable<T>
iterateOnce(java.util.stream.Stream<T> stream)
Iterates throughstream
only once.static <T,E extends java.lang.Throwable>
voiditerateThrough(java.util.stream.Stream<? extends T> stream, CheckedConsumer<? super T,E> consumer)
Iterates throughstream
sequentially and passes each element toconsumer
with exceptions propagated.static <T> java.util.stream.Stream<T>
whileNotNull(java.util.function.Supplier<? extends T> supplier)
Similar toStream.generate(java.util.function.Supplier<? extends T>)
, returns an infinite, sequential, ordered, and non-null stream where each element is generated by the provided Supplier.static <T> java.util.stream.Stream<T>
withSideEffect(java.util.stream.Stream<T> stream, java.util.function.Consumer<? super T> sideEffect)
Returns a sequential stream withsideEfect
attached on every element.
-
-
-
Method Detail
-
generate
public static <T> java.util.stream.Stream<T> generate(T seed, java.util.function.Function<? super T,? extends java.util.stream.Stream<? extends T>> step)
Returns a Stream produced by iterative application ofstep
to the initialseed
, producing a Stream consisting of seed, elements of step(seed), elements of step(x) for each x in step(seed), etc. (If the result stream returned by thestep
function is null an empty stream is used, instead.)While
Stream.generate(supplier)
can be used to generate infinite streams, it's not as easy to generate a finite stream unless the size can be pre-determined. This method can be used to generate finite streams: just return an empty stream when thestep
determines that there's no more elements to be generated.A typical group of use cases are BFS traversal algorithms. For example, to stream the tree nodes in BFS order:
It's functionally equivalent to the following common imperative code:Stream<Node> bfs(Node root) { return generate(root, node -> node.children().stream()); }
A BFS 2-D grid traversal algorithm:List<Node> bfs(Node root) { List<Node> result = new ArrayList<>(); Queue<Node> queue = new ArrayDeque<>(); queue.add(root); while (!queue.isEmpty()) { Node node = queue.remove(); result.add(node); queue.addAll(node.children()); } return result; }
Stream<Cell> bfs(Cell startingCell) { Set<Cell> visited = new HashSet<>(); visited.add(startingCell); return generate(startingCell, c -> c.neighbors().filter(visited::add)); }
At every step, 0, 1 or more elements can be generated into the resulting stream. As discussed above, returning an empty stream leads to eventual termination of the stream; returning 1-element stream is equivalent to
Stream.generate(supplier)
; while returning more than one elements allows a single element to fan out to multiple elements.- Since:
- 1.9
-
flatten
public static <T> java.util.stream.Stream<T> flatten(java.util.stream.Stream<? extends java.util.stream.Stream<? extends T>> streamOfStream)
FlattensstreamOfStream
and returns an unordered sequential stream of the nested elements.Logically,
stream.flatMap(fanOut)
is equivalent toMoreStreams.flatten(stream.map(fanOut))
. Due to this JDK bug,flatMap()
usesforEach()
internally and doesn't support short-circuiting for the passed-in stream.flatten()
supports short-circuiting and can be used to flatten infinite streams.- Since:
- 1.9
-
groupConsecutive
public static <T,R> java.util.stream.Stream<R> groupConsecutive(java.util.stream.Stream<T> stream, java.util.function.BiPredicate<? super T,? super T> sameGroup, java.util.stream.Collector<? super T,?,R> groupCollector)
Groups consecutive elements fromstream
lazily. Two consecutive elements belong to the same group ifsameGroup
evaluates to true. Consecutive elements belonging to the same group will be collected together usinggroupCollector
.For example, you can find every list of increasing stock prices, given daily stock prices:
ImmutableList<ImmutableList<Double>> increasingStockPriceSeries = groupConsecutive(stockPrices, (p1, p2) -> p1 <= p2, toImmutableList()) .collect(toImmutableList());
- Since:
- 5.7
-
groupConsecutive
public static <T> java.util.stream.Stream<T> groupConsecutive(java.util.stream.Stream<T> stream, java.util.function.BiPredicate<? super T,? super T> sameGroup, java.util.function.BinaryOperator<T> groupReducer)
Groups consecutive elements fromstream
lazily. Two consecutive elements belong to the same group ifsameGroup
evaluates to true. Consecutive elements belonging to the same group will be reduced usinggroupReducer
.For example, you can find the total number of trades for the stock during each period when there was no large trade anomaly (difference):
ImmutableList<Long> stockTradesPerPeriod = groupConsecutive(stockTrades, (t1, t2) -> Math.abs(t1 - t2) < threshold, Long::sum) .collect(toImmutableList());
- Since:
- 5.7
-
groupConsecutive
public static <T,R> java.util.stream.Stream<R> groupConsecutive(java.util.stream.Stream<T> stream, java.util.function.Function<? super T,?> groupKeyFunction, java.util.stream.Collector<? super T,?,R> groupCollector)
Groups consecutive elements fromstream
lazily. Two consecutive elements belong to the same group ifgroupKeyFunction
evaluates to equal keys. Consecutive elements belonging to the same group will be collected together usinggroupCollector
.For example, you can group consecutive events by their severity:
ImmutableList<ImmutableList<Event>> sameSeverityEventGroups = groupConsecutive(events, Event::severity, toImmutableList()) .collect(toImmutableList());
- Since:
- 5.7
-
groupConsecutive
public static <T> java.util.stream.Stream<T> groupConsecutive(java.util.stream.Stream<T> stream, java.util.function.Function<? super T,?> groupKeyFunction, java.util.function.BinaryOperator<T> groupReducer)
Groups consecutive elements fromstream
lazily. Two consecutive elements belong to the same group ifgroupKeyFunction
evaluates to equal keys. Consecutive elements belonging to the same group will be reduced usinggroupReducer
.For example, you can find the first event of each severity in a consecutive series of events:
ImmutableList<Event> firstEventsWithAlternatingSeverity = groupConsecutive(events, Event::severity, (e1, e2) -> e1) .collect(toImmutableList());
- Since:
- 5.7
-
iterateOnce
public static <T> java.lang.Iterable<T> iterateOnce(java.util.stream.Stream<T> stream)
Iterates throughstream
only once. It's strongly recommended to avoid assigning the return value to a variable or passing it to any other method because the returnedIterable
'siterator()
method can only be called once. Instead, always use it together with a for-each loop, as in:
The above is equivalent to manually doing:for (Foo foo : iterateOnce(stream)) { ... if (...) continue; if (...) break; ... }
except using this API eliminates the need for a named variable that escapes the scope of the for-each loop. And code is more readable too.Iterable<Foo> foos = stream::iterator; for (Foo foo : foos) { ... }
Note that
iterateThrough()
should be preferred whenever possible due to the caveats mentioned above. This method is still useful when the loop body needs to use control flows such asbreak
orreturn
.
-
iterateThrough
public static <T,E extends java.lang.Throwable> void iterateThrough(java.util.stream.Stream<? extends T> stream, CheckedConsumer<? super T,E> consumer) throws E extends java.lang.Throwable
Iterates throughstream
sequentially and passes each element toconsumer
with exceptions propagated. For example:void writeAll(Stream<?> stream, ObjectOutput out) throws IOException { iterateThrough(stream, out::writeObject); }
- Throws:
E extends java.lang.Throwable
-
dice
public static <T> java.util.stream.Stream<java.util.List<T>> dice(java.util.stream.Stream<? extends T> stream, int maxSize)
Dicesstream
into smaller chunks each with up tomaxSize
elements.For a sequential stream, the first N-1 chunk's will contain exactly
maxSize
elements and the last chunk may contain less (but never 0). However for parallel streams, it's possible that the stream is split in roughly equal-sized sub streams before being diced into smaller chunks, which then will result in more than one chunks with less thanmaxSize
elements.This is an intermediary operation.
- Parameters:
stream
- the source stream to be dicedmaxSize
- the maximum size for each chunk- Returns:
- Stream of diced chunks each being a list of size up to
maxSize
- Throws:
java.lang.IllegalStateException
- ifmaxSize <= 0
-
dice
public static <T> java.util.Spliterator<java.util.List<T>> dice(java.util.Spliterator<? extends T> spliterator, int maxSize)
Dicesspliterator
into smaller chunks each with up tomaxSize
elements.- Parameters:
spliterator
- the source spliterator to be dicedmaxSize
- the maximum size for each chunk- Returns:
- Spliterator of diced chunks each being a list of size up to
maxSize
- Throws:
java.lang.IllegalStateException
- ifmaxSize <= 0
-
indexesFrom
public static java.util.stream.Stream<java.lang.Integer> indexesFrom(int firstIndex)
Returns an infiniteStream
starting fromfirstIndex
. Can be used together withBiStream.zip(java.util.Collection<L>, java.util.Collection<R>)
to iterate over a stream with index. For example:zip(indexesFrom(0), values)
.To get a finite stream, use
indexesFrom(...).limit(size)
.Note that while
indexesFrom(0)
will eventually incur boxing cost for every integer, the JVM typically pre-caches smallInteger
instances (by default up to 127).- Since:
- 3.7
-
indexesFrom
public static java.util.stream.Stream<java.lang.Long> indexesFrom(long firstIndex)
Returns an infinite index stream starting fromfirstIndex
. This can then be used tozip
with another stream to provide indexing, such as:BiStream.zip(indexesFrom(0), values).toMap();
To get a finite stream, use
indexesFrom(0).limit(size)
.For small indexes (up to 127),
Long
instances are pre-cached by JVM so no boxing happens; for larger indexes, every index incurs some boxing cost. If the cost is of concern, prefer to useLongStream.iterate(long, java.util.function.LongUnaryOperator)
directly.- Since:
- 5.7
-
whileNotNull
public static <T> java.util.stream.Stream<T> whileNotNull(java.util.function.Supplier<? extends T> supplier)
Similar toStream.generate(java.util.function.Supplier<? extends T>)
, returns an infinite, sequential, ordered, and non-null stream where each element is generated by the provided Supplier. The stream however will terminate as soon as the Supplier returns null, in which case the null is treated as the terminal condition and doesn't constitute a stream element.For sequential iterations,
whileNotNll()
is usually more concise than implementingSpliterators.AbstractSpliterator
directly. The latter requires boilerplate that looks like this:
Which is equivalent to the following one-liner usingreturn StreamSupport.stream( new AbstractSpliterator<T>(MAX_VALUE, NONNULL | ORDERED) { public boolean tryAdvance(Consumer<? super T> action) { if (hasData) { action.accept(data); return true; } return false; } }, false);
whileNotNull()
:return whileNotNull(() -> hasData ? data : null);
Why null? Why not
Optional
? Wrapping every generated element of a stream in anOptional
carries considerable allocation cost. Also, while nulls are in general discouraged, they are mainly a problem for users who have to remember to deal with them. The stream returned bywhileNotNull()
on the other hand is guaranteed to never include nulls that users have to worry about.If you already have an
Optional
from a method return value, you can usewhileNotNull(() -> optionalReturningMethod().orElse(null))
.One may still need to implement
AbstractSpliterator
orIterator
directly if null is a valid element (usually discouraged though).If you have an imperative loop over a mutable queue or stack:
it can be turned into a stream usingwhile (!queue.isEmpty()) { int num = queue.poll(); if (someCondition) { ... } }
whileNotNull()
:whileNotNull(queue::poll).filter(someCondition)...
- Since:
- 4.1
-
withSideEffect
public static <T> java.util.stream.Stream<T> withSideEffect(java.util.stream.Stream<T> stream, java.util.function.Consumer<? super T> sideEffect)
Returns a sequential stream withsideEfect
attached on every element.Unlike
Stream.peek(java.util.function.Consumer<? super T>)
, which should only be used for debugging purpose, the side effect is allowed to interfere with the source of the stream, and is guaranteed to be applied in encounter order.If you have to resort to side effects, use this dedicated method instead of
peek()
or any other stream method. From the API specification, all methods defined byStream
are expected to be stateless, and should not cause or depend on side effects, because even for ordered, sequential streams, only the order of output is defined, not the order of evaluation.- Since:
- 4.9
-
-