Stream Gatherers (JEP 485)
What's Missing from Streams?
Similar to an iterator, a Java stream yields a sequence of elements. But the stream elements are processed lazily. When you program a stream pipeline, you specify functions that inspect or transform elements:
void main() {
    var words = """
        Four score and seven years ago our fathers brought forth on this continent,
        a new nation, conceived in Liberty, and dedicated to the proposition
        that all men are created equal.""".split(",?\\s+");
    var longWords = Stream.of(words)
        .filter(s -> s.length() > 6)
        .map(String::toLowerCase)
        .limit(5)
        .toList();
    println(longWords);
}
The lambda expression s -> s.length() > 6 and the toLowerCase method are only called when needed; that is, until five results have been collected.
The lambda expressions in the calls to filter and map see one element at a time. But sometimes you want to see more than one stream element in order to make a decision.

Consider the task of eliminating adjacent duplicates. When processing the input “The the the quick brown fox jumps over the lazy dog dog”, we want to drop the duplicate adjacent “the” and “dog”.
The stream API has three methods for selectively dropping elements: dropWhile, takeWhile, and filter. The first two don't fit our situation. But we can call filter with a handcrafted function. That function receives the current element and compares it with the predecessor that it has stashed away.
Here is a potential implementation:
void main() {
    var sentence = "The the the quick brown fox jumps over the lazy dog dog";
    Stream<String> words = new Scanner(sentence).tokens();
    // A lambda cannot mutate a local variable such as
    //   String previous;
    // so we wrap the mutable state in a class
    class State { String previous; }
    var state = new State();
    List<String> result = words.filter(element -> {
        boolean keep = !element.equalsIgnoreCase(state.previous);
        state.previous = element;
        return keep;
    }).toList();
    println(result);
}
That doesn't look pretty. And that's not good. Why did we use streams in the first place? Because the stream code is easier to read than the equivalent loop.
Of course, the stream API could be augmented by a dropAdjacent method, but where would that lead? There are any number of potentially useful stream operations. Instead, JEP 485 provides a general extension point for stateful stream transformations, and a small number of useful implementations.
Note: In general, streams have another potential benefit: it may be possible to parallelize stream operations. Clearly, that is not the case with this particular filter, which assumes that it is invoked for each element in turn.
A Reminder About Collectors
The stream API already has a general extension point: the terminal collect method and the Collector interface. For example, the call collect(Collectors.toList()) passes a collector that produces a list, much like the toList() call in the first example.
In the collection process, arriving stream elements are accumulated in internal data structures called “result containers”. If the stream is parallel, result containers are merged. After accumulation and merging, the collector can optionally transform the final result container into another object that is the collection result.
Specifically, a Collector<T, A, R> must implement four methods, each of which yields a function object:

Supplier<A> supplier() - Supplies a result container
BiConsumer<A, T> accumulator() - Accumulates an element into a result container
BinaryOperator<A> combiner() - Combines two result containers into one
Function<A, R> finisher() - Transforms the final result container into the result

Here T is the stream element type, A the result container type, and R the result type.
Let’s look at the function objects of the Collectors.toList() collector:

- The supplier yields a function yielding an empty ArrayList<T>, or ArrayList<T>::new.
- The accumulator yields (a, t) -> a.add(t) or List::add. (That method expression is why the result container is the first parameter.)
- The combiner yields a function that concatenates two lists, specifically (a, b) -> { a.addAll(b); return a; }
- The finisher does nothing (which is the most common case)
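To make that anatomy concrete, here is a minimal sketch, not the actual JDK implementation, that assembles an equivalent collector from these four function objects with the Collector.of factory method:

void main() {
    // A hand-rolled equivalent of Collectors.toList(). The Collector.of
    // overload without a finisher argument supplies the identity finisher.
    Collector<String, List<String>, List<String>> toArrayList = Collector.of(
        ArrayList::new,                          // supplier
        List::add,                               // accumulator
        (a, b) -> { a.addAll(b); return a; });   // combiner
    List<String> result = Stream.of("Four", "score", "and", "seven").collect(toArrayList);
    println(result);
}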
The static Collectors.groupingBy method yields a more interesting collector. It produces a map from keys to lists of elements with the same key. In its simplest form, you pass a “classifier” function that extracts the map keys from the elements.

I like to use the stream Locale.availableLocales() as an example of a useful data stream. A locale describes the language and other preferences of a place. The following call to collect yields a map from country code strings to lists of the locales in each country.
void main() {
    Map<String, List<Locale>> countryToLocales = Locale.availableLocales().collect(
        Collectors.groupingBy(Locale::getCountry));
    List<Locale> swissLocales = countryToLocales.get("CH");
    // Yields locales [de_CH, fr_CH, it_CH, rm_CH, ...]
    println(swissLocales);
}
As you can see, the JDK supports a multitude of locales used in Switzerland, including German, French, Italian, and Romansh, all collected in a list.
Note the classifier function Locale::getCountry that extracts the map key, i.e., the country code string such as "CH" or "US".
This use of collectors is straightforward. It gets more opaque when you use “downstream collectors” that apply another collection step to the collected map values. Consider:
void main() {
    Map<String, Map<String, List<Locale>>> countryAndLanguageToLocale =
        Locale.availableLocales().collect(
            Collectors.groupingBy(Locale::getCountry,
                Collectors.groupingBy(Locale::getLanguage)));
    println(countryAndLanguageToLocale.get("IN").get("hi"));
}
Now you have a map of maps. For example, countryAndLanguageToLocale.get("IN").get("hi") is a list of the Hindi locales in India. (There are several variants.)
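A downstream collector doesn't have to produce lists. Here is a hedged sketch that uses the Collectors.counting() downstream collector to reduce each group to its size:

void main() {
    // Instead of collecting each group into a list, count its elements
    Map<String, Long> localesPerCountry = Locale.availableLocales().collect(
        Collectors.groupingBy(Locale::getCountry, Collectors.counting()));
    println(localesPerCountry.get("CH")); // the number of Swiss locales
}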
The Gatherer Interface
JEP 485 introduces a gather method that is analogous to the collect method, but it is an intermediate operation, yielding a stream, not a final result. The method has an argument of type Gatherer, an interface that is similar to the Collector interface. A gatherer has a generic “intermediate state” object instead of the “result container” of a collector. It too has four methods yielding function objects.
Supplier<A> initializer() - Supplies an intermediate state instance
Gatherer.Integrator<A, T, R> integrator() - Integrates an element—see below for details
BinaryOperator<A> combiner() - Combines two intermediate states into one
BiConsumer<A, Gatherer.Downstream<? super R>> finisher() - Finalizes the processing after all stream elements have been integrated

Here T is the upstream element type, A the intermediate state type, and R the downstream element type.
This is more complex than collectors since the gatherer sends values “downstream”; that is, to the stream that is the result of the gather method.
Let's first look at the finisher method. It is called after all upstream elements have been passed to the integrator function, and all combiner functions have been invoked. We now have the final version of the “intermediate state”. From it, the finisher method can derive zero or more values and send them downstream, using the methods of the Gatherer.Downstream interface:
boolean push(T element) - Pushes the element downstream; returns true if more elements can be pushed
boolean isRejecting() - Returns true if pushing more elements has no effect downstream
The integrator method returns a function object conforming to the functional interface Gatherer.Integrator, with a method

boolean integrate(A state, T element, Gatherer.Downstream<? super R> downstream)

When integrating a new element, you can mutate the intermediate state and/or push values downstream. Return false if further elements will be rejected.
That all sounds very abstract and technical, so let's apply it to the concrete problem of dropping adjacent duplicates. The Gatherer interface has a static helper method ofSequential that creates a gatherer with a given initializer and integrator. Our state is, as before, a holder for the previous string. The initializer produces a state instance. To create the integrator, we use the Gatherer.Integrator.of helper method. Here are the details:
Gatherer<String, ?, String> dropAdjacentDuplicates() {
    class State { String previous; }
    return Gatherer.<String, State, String>ofSequential(
        State::new,
        Gatherer.Integrator.of((state, element, downstream) -> {
            boolean keep = !element.equalsIgnoreCase(state.previous);
            state.previous = element;
            if (keep) return downstream.push(element);
            else return !downstream.isRejecting();
        }));
}

void main() {
    var sentence = "The the the quick brown fox jumps over the lazy dog dog";
    Stream<String> words = new Scanner(sentence).tokens();
    List<String> result = words.gather(dropAdjacentDuplicates()).toList();
    println(result);
}
As you can see, implementing the gatherer is not pleasant, even in this simple case. But once implemented, it is easy to use.
Built-in Gatherers
Most programmers use the collect method with collectors from the Collectors class, which has over forty methods. The Gatherers class offers a much smaller number of gatherers for use with the gather method.
The windowFixed and windowSliding methods yield a stream of lists of adjacent elements. It's quicker to show them in action than to explain them in words:
void main() {
    println(IntStream.range(0, 10).boxed()
        .gather(Gatherers.windowFixed(4)).toList());
    // [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
    println(IntStream.range(0, 10).boxed()
        .gather(Gatherers.windowSliding(4)).toList());
    // [[0, 1, 2, 3], [1, 2, 3, 4], [2, 3, 4, 5], [3, 4, 5, 6],
    //  [4, 5, 6, 7], [5, 6, 7, 8], [6, 7, 8, 9]]
}
A fixed window is useful when you have input that comes in fixed-sized groups. For example, consider a file with lines such as:
Zappa Microwave Oven
3
109.95
Blackwell Toaster
1
29.95
You can process it as follows:
// Assuming a record such as
// record LineItem(String description, int quantity, double unitPrice) {}
List<LineItem> lineItems = Files.lines(Path.of("lineitems.txt"))
    .gather(Gatherers.windowFixed(3))
    .map(w -> new LineItem(w.get(0),
        Integer.parseInt(w.get(1)),
        Double.parseDouble(w.get(2))))
    .toList();
A sliding window is useful when you need to compare adjacent elements. For example, we can almost solve our “drop adjacent duplicates” problem like this:

words.gather(Gatherers.windowSliding(2))
    .filter(w -> !w.get(0).equalsIgnoreCase(w.get(1)))
    .map(w -> w.get(0))
This doesn't quite work because we also want to keep the last element of words if it differs from its predecessor. But that is easy to fix:
void main() {
    var sentence = "The the the quick brown fox jumps over the lazy dog dog";
    Stream<String> words = new Scanner(sentence).tokens();
    var result = Stream.concat(words, Stream.of((String) null))
        .gather(Gatherers.windowSliding(2))
        .filter(w -> !w.get(0).equalsIgnoreCase(w.get(1)))
        .map(w -> w.get(0))
        .toList();
    println(result);
}
The fold gatherer computes a “fold” or, more precisely, a “left fold”. It starts with an initial value and applies an operation, where the left argument is the current value and the right argument is the next element:

          . . .
           /
         op
        /  \
      op    elem2
     /  \
   op    elem1
  /  \
init    elem0

Folds are useful when a value is computed incrementally from the stream elements. Functional programmers prefer folds over loops. Here is a simple example, computing a number from its digits:
void main() {
    Integer[] digits = { 1, 7, 2, 9 };
    int number = Stream.of(digits)
        .gather(Gatherers.fold(() -> 0, (x, y) -> x * 10 + y))
        .findFirst()
        .orElse(0);
    println(number);
}
I had to use a Stream<Integer> because the primitive streams IntStream, LongStream, and DoubleStream do not have gather methods.
The fold computes

0 * 10 + 1    // 1
1 * 10 + 7    // 17
17 * 10 + 2   // 172
172 * 10 + 9  // 1729
The result is a stream with a single value, which is obtained by calling findFirst() followed by orElse(0).
Probably fold should have been a terminal operation, but implementing it as a gatherer fills the need without adding yet another method to the stream API.
Note that fold is different from the reduce terminal operation. Reduction is optimized for parallel evaluation. It uses either an associative operator, or a pair of operations: one to produce intermediate results and another to combine them. In contrast, fold is strictly sequential.
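To see the difference, here is a sketch of what can go wrong with a non-associative operator. The digits-to-number operator from the preceding example is not associative, so a parallel reduce may combine partial results incorrectly, whereas the fold gatherer stays sequential:

void main() {
    Integer[] digits = { 1, 7, 2, 9 };
    // reduce requires an associative operator; this one is not associative,
    // so the parallel result is unpredictable
    int risky = Stream.of(digits).parallel().reduce(0, (x, y) -> x * 10 + y);
    // fold processes the elements strictly in order, even in a parallel pipeline
    int safe = Stream.of(digits).parallel()
        .gather(Gatherers.fold(() -> 0, (x, y) -> x * 10 + y))
        .findFirst()
        .orElse(0);
    println(risky); // not necessarily 1729
    println(safe);  // always 1729
}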
The scan operation is similar to fold, but you get a stream of all the intermediate results. For example, if you have a stream of deposit and withdrawal amounts, then a scan with the addition operator yields the cumulative balances:
void main() {
    double initial = 1000.0;
    Double[] transactions = { 100.0, -50.0, 200.0, -150.0 };
    var result = Stream.of(transactions)
        .gather(Gatherers.scan(() -> initial, Double::sum))
        .toList();
    // [1100.0, 1050.0, 1250.0, 1100.0]
    println(result);
}
This scan computes init + elem0, init + elem0 + elem1, init + elem0 + elem1 + elem2, and so on.
Note: The Arrays.parallelPrefix method does almost the same operation, but in parallel and only with an associative operator.
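For comparison, here is a sketch with Arrays.parallelPrefix. Note that it mutates the array in place, and the initial balance must be prepended as an array element:

void main() {
    double[] amounts = { 1000.0, 100.0, -50.0, 200.0, -150.0 };
    // Replaces each element with the cumulative sum so far
    Arrays.parallelPrefix(amounts, Double::sum);
    println(Arrays.toString(amounts)); // [1000.0, 1100.0, 1050.0, 1250.0, 1100.0]
}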
I am not sure how popular fold and scan will turn out to be. The fifth predefined gatherer, Gatherers.mapConcurrent, is very useful—see the next section.
The Gatherer.andThen method combines two gatherers into a single one. The result is the same as calling gather twice:
void main() {
    var result1 = IntStream.range(0, 10)
        .boxed()
        .gather(Gatherers.windowFixed(2).andThen(Gatherers.windowFixed(2)))
        .toList();
    // [[[0, 1], [2, 3]], [[4, 5], [6, 7]], [[8, 9]]]
    var result2 = IntStream.range(0, 10)
        .boxed()
        .gather(Gatherers.windowFixed(2))
        .gather(Gatherers.windowFixed(2))
        .toList();
    // [[[0, 1], [2, 3]], [[4, 5], [6, 7]], [[8, 9]]]
    println(result1);
    println(result2);
}
Concurrent Execution
For processor-intensive workloads, you can use parallel streams:
var results = collection
    .parallelStream()
    .map(e -> hardWork(e))
    .toList();
The stream splits into subranges (normally, one per core), which are processed concurrently, using the global fork-join pool.
However, if the workload is mostly blocking, then parallel streams do not give the best throughput. Throughput improves if many more threads than cores can block concurrently. In this situation, Gatherers.mapConcurrent shines. It executes each task in a virtual thread.
int MAX_CONCURRENT = 1000;
var results = collection
    .stream() // Not parallelStream!
    .gather(Gatherers.mapConcurrent(MAX_CONCURRENT, e -> blockingWork(e)))
    .toList();
You want to use this gatherer with blocking calls. The first argument gives you a convenient way of throttling the number of concurrent tasks. After all, the tasks are likely to connect to some external service that may fail with huge numbers of concurrent requests. (Note that the stream is not parallel.)
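Here is a minimal self-contained sketch, with Thread.sleep standing in for the blocking call:

void main() {
    int MAX_CONCURRENT = 4;
    var results = IntStream.range(0, 8)
        .boxed()
        .gather(Gatherers.mapConcurrent(MAX_CONCURRENT, n -> {
            try {
                Thread.sleep(1000); // simulated blocking work in a virtual thread
            } catch (InterruptedException ex) {
                throw new RuntimeException(ex);
            }
            return n * n;
        }))
        .toList();
    // Results arrive in encounter order; at most four tasks run concurrently
    println(results); // [0, 1, 4, 9, 16, 25, 36, 49]
}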
As an aside, the stream API purposefully does not allow you to set the executor for parallel stream tasks. There is a well-known workaround to use a different pool than the global fork-join pool. But that does not work with virtual threads:
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
var results = executor.submit( // Won't use executor for subtasks
        () -> collection
            .parallelStream() // Splits into one task per processor
            .map(e -> blockingWork(e))
            .toList())
    .get();
You get one task per processor, not a task per call to map. And, to add insult to injury, the executor of the submission is not used for the split tasks since it is not an instance of ForkJoinPool. Use Gatherers.mapConcurrent if you want virtual threads.
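For completeness, here is a sketch of that well-known workaround for CPU-bound work. It relies on unspecified behavior: the subtasks run in the pool in which the terminal operation was submitted, provided that pool is a ForkJoinPool:

ForkJoinPool pool = new ForkJoinPool(4); // a custom pool with four threads
var results = pool.submit(
        () -> collection
            .parallelStream()
            .map(e -> hardWork(e)) // subtasks run in pool, not the global fork-join pool
            .toList())
    .get();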
Fine-Tuning Gatherers
When designing your own gatherer, you can signal two opportunities for optimization.
A small optimization is the “greediness” of a gatherer. A greedy gatherer processes all incoming elements. A non-greedy or short-circuiting gatherer may start rejecting elements. Elements can be pushed in bulk to a greedy gatherer, which may be more performant. The gain is not likely to be substantial, but it's easy enough to opt in. If you use a Gatherer.Integrator factory method to define your integrator, use ofGreedy if the gatherer is greedy, or of if it is not. Alternatively, implement the Gatherer.Integrator.Greedy subinterface for greedy gatherers.
Next, you need to decide whether to make your gatherer sequential or parallelizable.
A sequential gatherer calls the initializer function once, then the integrator for each stream element in order, and then the finisher.
With a parallelizable gatherer on a parallel stream, the initializer function is called for each subrange. The integrator function is called as elements are encountered in the subranges. Elements that are pushed downstream are assembled in the correct order.
Whenever neighboring subranges are complete, the combiner function is called. It can combine state, but it cannot push elements downstream.
Finally, the finisher function is called once, with the combined state. It can push elements downstream. They are appended after any elements that were pushed in an integrator.
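If you want to observe that protocol, here is a hedged sketch: a pass-through gatherer that logs each invocation. How many subranges you get, and therefore how often the initializer and combiner are called, depends on the runtime:

<T> Gatherer<T, ?, T> traced() {
    return Gatherer.<T, List<T>, T>of(
        () -> { println("initializer"); return new ArrayList<>(); }, // once per subrange
        Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
            println("integrator: " + element);
            return downstream.push(element); // pushes are assembled in encounter order
        }),
        (a, b) -> { println("combiner"); return a; }, // neighboring subranges complete
        (state, downstream) -> println("finisher")); // once, with the combined state
}

void main() {
    println(IntStream.range(0, 8).parallel().boxed().gather(traced()).toList());
}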
It seems unlikely that a parallel gatherer will push elements both in the integrator and the finisher. If the decision whether and what to push is independent of the neighbors, it can happen in the integrator. Otherwise, all pushing has to wait until the finisher.
The three Gatherer.of methods yield parallelizable gatherers, and the four Gatherer.ofSequential methods yield gatherers that are not parallelizable.
If you implement the Gatherer interface yourself, then your combiner method determines whether the gatherer is sequential or parallelizable. A sequential gatherer must return defaultCombiner() (which yields a fixed function object that will never be invoked). If combiner() returns any other function object, the gatherer is parallelizable.
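Here is a hedged sketch of such a direct implementation: a hypothetical everyNth gatherer that keeps every n-th element. Since combiner() is not overridden, it returns defaultCombiner(), and the gatherer is sequential:

<T> Gatherer<T, ?, T> everyNth(int n) {
    class Counter { int count; }
    return new Gatherer<T, Counter, T>() {
        public Supplier<Counter> initializer() { return Counter::new; }
        public Integrator<Counter, T, T> integrator() {
            return Integrator.ofGreedy((state, element, downstream) -> {
                boolean keep = state.count++ % n == 0;
                return keep ? downstream.push(element) : !downstream.isRejecting();
            });
        }
        // combiner() and finisher() keep their sequential defaults
    };
}

void main() {
    println(IntStream.range(0, 10).boxed().gather(everyNth(3)).toList()); // [0, 3, 6, 9]
}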
Can the gatherer that drops adjacent duplicates be parallelizable? The integrator could buffer non-duplicate elements. The combiner could eliminate elements that appear at the end of the first buffer and the beginning of the second. The finisher would push the buffered elements. But it may not be worth the trouble. The savings from parallelizing the comparisons may be eaten up by the cost of buffering.
Here is an example of a parallelizable gatherer. We want to replicate each stream element n times.
<T> Gatherer<T, ?, T> nCopies(int n) {
    return Gatherer.<T, T>of(
        Gatherer.Integrator.ofGreedy((_, element, downstream) -> {
            for (int i = 0; i < n; i++)
                if (!downstream.push(element)) return false;
            return !downstream.isRejecting();
        }));
}

void main() {
    var sentence = "The quick brown fox jumps over the lazy dog";
    Stream<String> words = new Scanner(sentence).tokens();
    List<String> result = words.gather(nCopies(3)).toList();
    println(result);

    int sum = IntStream.range(1, 101)
        .parallel()
        .boxed()
        .gather(nCopies(1000))
        .mapToInt(n -> n)
        .sum();
    println(sum);
}
Note: In this example, you don't actually need a gatherer, since there is no state. You can use the mapMulti method, which has a slightly simpler parameter:
<T> BiConsumer<T, Consumer<T>> nCopies(int n) {
    return (element, sink) -> {
        for (int i = 0; i < n; i++) sink.accept(element);
    };
}

void main() {
    var sentence = "The quick brown fox jumps over the lazy dog";
    Stream<String> words = new Scanner(sentence).tokens();
    List<String> result = words.mapMulti(nCopies(3)).toList();
    println(result);
}
But the gatherer isn't that much more complicated. Since it is a more useful abstraction, I expect that programmers will soon find it familiar, whereas mapMulti is pretty obscure.
Note: The processing of a stream pipeline is highly optimized—see this article by Brian Goetz for a very clear description. Each stage of the pipeline reports whether it supports characteristics such as ORDERED, DISTINCT, SORTED, SIZED, and SUBSIZED. This information is used to optimize execution and elide unnecessary operations. Similarly, collectors have a (more limited) set of characteristics. However, when authoring a gatherer, there is no mechanism for reporting characteristics.
When to Gather
When stream operations depend on neighboring elements, or on all preceding elements, then you need a gatherer. You may be able to use windowSliding or fold, or you can roll your own. These gatherers are likely sequential.
When stream operations depend on global characteristics, such as frequency of occurrence, then a gatherer can also work. You would write your own, track candidate elements in a state object, and push them in the finisher.
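Here is a hedged sketch of that pattern: a hypothetical atLeast gatherer that keeps only the elements occurring at least a given number of times. The integrator merely counts; all pushing happens in the finisher:

<T> Gatherer<T, ?, T> atLeast(int times) {
    return Gatherer.<T, Map<T, Integer>, T>ofSequential(
        HashMap::new,
        Gatherer.Integrator.ofGreedy((counts, element, downstream) -> {
            counts.merge(element, 1, Integer::sum); // only track frequencies
            return !downstream.isRejecting();
        }),
        (counts, downstream) -> counts.forEach((element, count) -> {
            // Push qualifying elements once the whole stream has been seen.
            // Note that a HashMap does not preserve the encounter order.
            if (count >= times) downstream.push(element);
        }));
}

void main() {
    var sentence = "the quick brown fox jumps over the lazy dog";
    println(new Scanner(sentence).tokens().gather(atLeast(2)).toList()); // [the]
}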
A gatherer can even be useful without looking at other elements. It can push any number of results, or it can use some kind of policy for processing each element. For example, the mapConcurrent gatherer computes each result in a virtual thread.
Here is another such gatherer for your amusement. It works like map, but the mapping function can throw checked exceptions.
interface CheckedFunction<T, R> {
    R apply(T arg) throws Exception;
}

<T, R> Gatherer<T, ?, R> mapChecked(CheckedFunction<? super T, ? extends R> mapper) {
    return Gatherer.<T, R>of(
        Gatherer.Integrator.ofGreedy((_, element, downstream) -> {
            try {
                return downstream.push(mapper.apply(element));
            } catch (Exception ex) {
                throw new RuntimeException("mapChecked", ex);
            }
        }));
}

void main() {
    String[] paths = { "/etc/passwd", "/etc/environment" };
    List<String> contents = Stream.of(paths)
        .map(Path::of)
        .gather(mapChecked(Files::readString))
        .toList();
    println(contents);
}
Gunnar Morling shows how to zip two streams with a gatherer.
It is also easy to implement “zip with index”—see this StackOverflow answer.
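Here is a minimal sketch of that idea, assuming a small record Indexed for the pairs:

record Indexed<T>(long index, T value) {}

<T> Gatherer<T, ?, Indexed<T>> zipWithIndex() {
    class State { long index; }
    return Gatherer.<T, State, Indexed<T>>ofSequential(
        State::new,
        Gatherer.Integrator.ofGreedy((state, element, downstream) ->
            downstream.push(new Indexed<>(state.index++, element))));
}

void main() {
    var result = Stream.of("Four", "score", "and", "seven").gather(zipWithIndex()).toList();
    println(result); // [Indexed[index=0, value=Four], Indexed[index=1, value=score], ...]
}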
You can find additional potentially useful gatherers in this project.
I'd like to conclude with a tip. When you use a gatherer, don't inline it. Write a method with a friendly name that yields it. That way, the intent is clear to those who read your code. After all, streams are all about “what, not how”.