- 1.1. From Iterating to Stream Operations
- 1.2. Stream Creation
- 1.3. The filter, map, and flatMap Methods
- 1.4. Extracting Substreams and Combining Streams
- 1.5. Other Stream Transformations
- 1.6. Simple Reductions
- 1.7. The Optional Type
- 1.8. Collecting Results
- 1.9. Collectors
- 1.10. Reduction Operations
- 1.11. Gatherers
- 1.12. Primitive Type Streams
- 1.13. Parallel Streams
1.11. Gatherers
The stream API has an extension point for implementing arbitrary terminal operations: the collect method, whose argument is a Collector. Many collector implementations are provided in the Collectors class. You can also implement your own, as you have seen in Section 1.9.5.
Java 24 adds an extension point for intermediate operations. The gather method, whose argument is a Gatherer, turns a stream into another stream.
1.11.1. Predefined Gatherers
The Gatherers class provides a few Gatherer implementations. The windowFixed and windowSliding gatherers group adjacent elements together, yielding a stream of lists. It's easier to show them in action than to explain them in words:
IntStream.range(0, 10).boxed().gather(Gatherers.windowFixed(4)).toList()
// [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
IntStream.range(0, 10).boxed().gather(Gatherers.windowSliding(4)).toList()
// [[0, 1, 2, 3], [1, 2, 3, 4], [2, 3, 4, 5], [3, 4, 5, 6],
// [4, 5, 6, 7], [5, 6, 7, 8], [6, 7, 8, 9]]
The windowSliding gatherer is useful for comparing adjacent elements of a stream. For example, here is how to drop adjacent duplicates:
Stream.concat(words, Stream.of((String) null))
   .gather(Gatherers.windowSliding(2))
   .filter(w -> !w.get(0).equalsIgnoreCase(w.get(1)))
   .map(w -> w.get(0))
By appending a null to the stream, the last element can be compared with something.
The mapConcurrent gatherer is similar to map, but the computations run in virtual threads. This is very useful for blocking operations. For example, if you have a list of URLs, you can read them concurrently.
String read(String url) {
   try {
      return new String(URI.create(url).toURL().openStream().readAllBytes());
   }
   catch (IOException e) {
      throw new UncheckedIOException(e);
   }
}
int max = 100; // Maximum number of concurrent virtual threads
List<String> contents = urls.stream()
   .gather(Gatherers.mapConcurrent(max, this::read))
   .toList();
If one of the computations throws an exception, all remaining tasks are cancelled, and the exception is rethrown.
Finally, there are fold and scan gatherers. A fold is a general mechanism for repeatedly applying an operation op. It starts with an initial value iv and computes (((iv op v0) op v1) op v2) ..., where the vi are the stream elements.
This looks similar to a reduction, which you saw in Section 1.10. However, reductions are intended for parallelizable operations. A fold is inherently sequential, and the operation need not be associative.
Some functional programmers love folds because any loop can be translated into a fold. Consider this computation of a number from its decimal digits:
int n = 0;
for (int d : digits)
   n = 10 * n + d;
Here is the same computation as a fold:
int n = digits.stream()
   .gather(Gatherers.fold(() -> 0, (x, y) -> x * 10 + y))
   .findFirst()
   .orElse(0);
The fold gatherer produces a stream with exactly one element: the folded result, or just the initial value if digits was empty.
As you can see from the example, you have to provide a supplier for the initial value, and the operation that is repeatedly applied.
There is also a scan gatherer that produces a stream of all intermediate results. For example,
Stream.of(1, 7, 2, 9)
   .gather(Gatherers.scan(() -> 0, (x, y) -> x * 10 + y))
   .toList() // [1, 17, 172, 1729]
1.11.2. Implementing Gatherers
Implementing a gatherer is similar to implementing a collector (Section 1.9.5). A gatherer has a generic “intermediate state” object instead of the “result collection” of a collector. It too has four methods yielding function objects. Here T is the type of the stream elements, A the type of the intermediate state, and R the element type of the stream that the gatherer produces.
- Supplier<A> initializer() supplies an intermediate state instance.
- Gatherer.Integrator<A, T, R> integrator() processes a stream element.
- BinaryOperator<A> combiner() combines two intermediate states into one.
- BiConsumer<A, Gatherer.Downstream<? super R>> finisher() carries out a final operation after processing all stream elements.
Gatherer.Integrator is a functional interface with this method:
boolean integrate(A state, T element, Gatherer.Downstream<? super R> downstream)
In the integrator and finisher, you call the push method of the Gatherer.Downstream interface in order to send values to the stream that the gatherer produces. You can call push as often as you like, or not at all.
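For example, here is a stateless gatherer whose integrator calls push twice for each element. (The name repeated is hypothetical; it is not part of the stream API.)

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class RepeatDemo {
   // A stateless gatherer that pushes each element twice
   static <T> Gatherer<T, Void, T> repeated() {
      return Gatherer.of(Gatherer.Integrator.ofGreedy((state, element, downstream) ->
         // Skip the second push if the downstream rejects the first
         downstream.push(element) && downstream.push(element)));
   }

   public static void main(String[] args) {
      List<String> result = Stream.of("a", "b").gather(repeated()).toList();
      System.out.println(result); // [a, a, b, b]
   }
}
```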
Very simple gatherers can be stateless. Then the initializer is not needed.
A combiner is only needed for parallel streams.
The finisher is only needed if you need to push values after the last element has been integrated. For example, the windowFixed gatherer pushes the short list of remaining elements in the finisher.
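To illustrate, here is a simplified sketch of a fixed-window gatherer whose finisher pushes the incomplete last window. (The actual Gatherers.windowFixed implementation is more elaborate; this sketch only shows the role of the finisher.)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class WindowDemo {
   // A simplified fixed-window gatherer, not the real Gatherers.windowFixed
   static <T> Gatherer<T, List<T>, List<T>> windows(int size) {
      return Gatherer.ofSequential(
         ArrayList::new,
         Gatherer.Integrator.ofGreedy((window, element, downstream) -> {
            window.add(element);
            if (window.size() < size) return true;
            var result = List.copyOf(window);
            window.clear();
            return downstream.push(result);
         }),
         (window, downstream) -> {
            // The finisher pushes the short list of remaining elements, if any
            if (!window.isEmpty()) downstream.push(List.copyOf(window));
         });
   }

   public static void main(String[] args) {
      System.out.println(Stream.of(1, 2, 3, 4, 5).gather(windows(2)).toList());
      // [[1, 2], [3, 4], [5]]
   }
}
```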
As an example, let us implement a gatherer that samples every nth element of a stream.
Here is how we want to use it:
List<String> samples = wordList.stream().gather(sampling(50)).toList();
The result is a list holding every 50th word from the original list.
The sampling method yields the gatherer:
static class IntermediateState {
   long index = -1;
}

<T> Gatherer<T, IntermediateState, T> sampling(int n) {
   return Gatherer.ofSequential(
      IntermediateState::new,
      Gatherer.Integrator.of((state, e, downstream) -> {
         state.index++;
         if (state.index % n == 0)
            return downstream.push(e);
         else
            return true;
      }));
}
The Gatherer.ofSequential method yields a gatherer that calls the integrator in encounter order. There is also a Gatherer.of method for constructing parallelizable gatherers.
This gatherer doesn't have a combiner or finisher. In general, you don’t provide arguments for unneeded initializers, combiners and finishers. Use one of the overloaded versions with just the gatherer operations you need.
Now let us look more closely at the sampling gatherer. The state holds the index of the element. The initializer constructs a new state instance.
The integrator is declared using the Gatherer.Integrator.of factory method. Whenever an element is processed, the index is incremented. The element is pushed when the index is divisible by the sampling count.
Note that the integrator returns a boolean. You should return false when the pipeline “short-circuits” and no longer wants to accept elements. This can happen for two reasons. Your gatherer might want to short-circuit on its own. Simply return false in that case. (In the sampling example, that never happens.) Alternatively, a downstream operation can short-circuit. Then the call to push returns false. The integrator needs to return that downstream status.
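As a sketch of the first case, here is a hypothetical takeUntil gatherer that pushes elements up to and including the first match, and then short-circuits on its own by returning false:

```java
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class TakeUntilDemo {
   // A hypothetical gatherer that stops after the first matching element
   static <T> Gatherer<T, Void, T> takeUntil(Predicate<? super T> condition) {
      return Gatherer.of(Gatherer.Integrator.of((state, element, downstream) -> {
         boolean wanted = downstream.push(element);
         // Return false to short-circuit on a match,
         // or to relay a downstream rejection
         return wanted && !condition.test(element);
      }));
   }

   public static void main(String[] args) {
      System.out.println(Stream.of(4, 8, 15, 16, 23, 42)
         .gather(takeUntil(n -> n > 10)).toList()); // [4, 8, 15]
   }
}
```

Note that this integrator is constructed with Gatherer.Integrator.of, not ofGreedy, precisely because it can return false on its own initiative.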
For optimizing the performance of a stream pipeline, it is important to know whether it contains any short-circuiting operations. An integrator that doesn't short-circuit should signal that fact by implementing the Gatherer.Integrator.Greedy interface. There is a factory method for that:
<T> Gatherer<T, IntermediateState, T> sampling(int n) {
   return Gatherer.ofSequential(
      IntermediateState::new,
      Gatherer.Integrator.ofGreedy((state, e, downstream) -> {
         state.index++;
         if (state.index % n == 0)
            return downstream.push(e);
         else
            return true;
      }));
}
As you have seen, writing your own gatherer may seem a bit daunting at first, but it is not all that complex. Follow these steps:
- Decide whether your gatherer can execute in parallel (Gatherer.of), or whether it must see the elements sequentially (Gatherer.ofSequential).
- Decide whether the integrator may short-circuit (Gatherer.Integrator.of) or if it always accepts elements (Gatherer.Integrator.ofGreedy).
- Do you need state? If so, provide a class for the state, and if your gatherer is not sequential, implement a combiner.
- In the integrator, return false when short-circuiting, or when the downstream push is rejected.
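The combiner from the third step has not appeared in an example so far. Here is a sketch of a parallelizable gatherer with all four function objects. (Computing a maximum is of course a job for a reduction; this hypothetical gatherer merely illustrates how a combiner merges two intermediate states. For simplicity, an empty stream yields Integer.MIN_VALUE.)

```java
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class MaxDemo {
   // A hypothetical parallelizable gatherer that emits the largest element
   static Gatherer<Integer, int[], Integer> max() {
      return Gatherer.of(
         () -> new int[] { Integer.MIN_VALUE },  // Initializer
         Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
            state[0] = Math.max(state[0], element);
            return true;  // Never short-circuits
         }),
         (a, b) -> { a[0] = Math.max(a[0], b[0]); return a; },  // Combiner
         (state, downstream) -> downstream.push(state[0]));     // Finisher
   }

   public static void main(String[] args) {
      System.out.println(
         Stream.of(3, 1, 4, 1, 5).parallel().gather(max()).toList()); // [5]
   }
}
```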
