GitHub
The Java Version Almanac
javaalmanac.io
Feedback on this page?

Stream Gatherers (JEP 485)

What's Missing from Streams?

Similar to an iterator, a Java stream yields a sequence of elements. But the stream elements are processed lazily. When you program a stream pipeline, you specify functions that inspect or transform elements:

void main() {
    var words = """
        Four score and seven years ago our fathers brought forth on this continent,
        a new nation, conceived in Liberty, and dedicated to the proposition
        that all men are created equal.""".split(",?\\s+");
    var longWords = Stream.of(words)
        .filter(s -> s.length() > 6)
        .map(String::toLowerCase)
        .limit(5)
        .toList();
    println(longWords);
}

The lambda expression s -> s.length() > 6 and the toLowerCase method are only called when needed; that is, until five results were collected.

The lambda expressions in the calls to filter and map see one element at a time. But sometimes you want to see more than one stream element in order to make a decision

Consider the task of eliminating adjacent duplicates. When processing the input The the the quick brown fox jumps over the lazy dog dog, we want to drop the duplicate adjacent the and dog.

The stream API has three methods for selectively dropping elements: dropWhile, takeWhile, and filter. The first two don't fit our situation. But we can call filter with a handcrafted function. That function receives the current element and compares it with the predecessor that it has stashed away.

Here is a potential implementation:

void main() {
    var sentence = "The the the quick brown fox jumps over the lazy dog dog";
    Stream<String> words = new Scanner(sentence).tokens();
    // Note that the lambda could not mutate a variable
    // String previous;
    class State { String previous; };
    var state = new State();
    List<String> result = words.filter(element -> {
            boolean keep = !element.equalsIgnoreCase(state.previous);
            state.previous = element;
            return keep;
        }).toList();
    println(result);
}

That doesn't look pretty. And that's not good. Why did we use streams in the first place? Because the stream code is easier to read than the equivalent loop.

Of course, the stream API could be augmented by a dropAdjacent method, but where would that lead? There are any number of potentially useful stream operations. Instead, JEP 485 provides a general extension point for stateful stream transformations, and a small number of useful implementations.

Note: In general, streams have another potential benefit. It may be possible to parallelize stream operations. Clearly, that is not the case in this particular implementation. Here we assume that the filter is invoked for each element in turn.

A Reminder About Collectors

The stream API already has a general extension point: the terminal collect method and the Collector interface. In fact, collect() with no arguments calls collect(Collectors.toList()), passing a collector that produces a list.

In the collection process, arriving stream elements are accumulated in internal data structures called “result containers”. If the stream is parallel, result containers are merged. After accumulation and merging, the collector can optionally transform the final result container into another object that is the collection result.

Specifically, a Collector must implement four methods, each of which yields a function object:

Supplier<A> supplier()
Supplies a result container
BiConsumer<A,T> accumulator()
Accumulates an element into a result container
BinaryOperator<A> combiner()
Combines two result containers into one
Function<A, R> finisher()
Transforms the final result container into the result

Let’s look at the function objects of the Collectors.toList() collector:

The static Collectors.groupingBy method yields a more interesting collector. It produces a map from keys to lists of elements with the same key. In its simplest form, you pass a “classifier” function that extracts the map keys from the elements.

I like to use the stream Locale.availableLocales() as an example of a useful data stream. A locale describes the language and other preferences of a place. The following call to collect yields a map from country code strings to sets of locales in that country.

void main() {
    Map<String, List<Locale>> countryToLocales = Locale.availableLocales().collect(
        Collectors.groupingBy(Locale::getCountry));
    List<Locale> swissLocales = countryToLocales.get("CH");
        // Yields locales [de_CH, fr_CH, it_CH, rm_CH, ...]
    println(swissLocales);
}

As you can see, the JDK supports a multitude of locales used in Switzerland, including German, French, Italian, and Romansh, all collected in a list.

Note the classifier function Locale::getCountry that extracts the map key, i.e. the country code string such as "CH" or "US".

This use of collectors is straightforward. It gets more opaque when you use “downstream collectors” that apply another collection step to the collected map values. Consider:

void main() {
    Map<String, Map<String, List<Locale>>> countryAndLanguageToLocale =
        Locale.availableLocales().collect(
            Collectors.groupingBy(Locale::getCountry,
                Collectors.groupingBy(Locale::getLanguage)));
    println(countryAndLanguageToLocale.get("IN").get("hi"));
}

Now you have a map of maps. For example, countryAndLanguageToLocale.get("IN").get("hi") is a list of the Hindi locales in India. (There are several variants.)

The Gatherer Interface

JEP 485 introduces a gather method that is analogous to the collect method, but it is an intermediate operation, yielding a stream, not a final result. The method has an argument of type Gatherer, an interface that is similar to the Collector interface. A gatherer has a generic “intermediate state” object instead of the “result collection” of a collector. It too has four methods yielding function objects.

Supplier<A> initializer()
Supplies an intermediate state instance
Gatherer.Integrator<A, T, R> integrator()
Integrates an element—see below for details
BinaryOperator<A> combiner()
Combines two intermediate states into one
BiConsumer<A, Gatherer.Downstream<? super R>> finisher()
Finalizes the processing after all stream elements have been integrated

This is more complex than collectors since the gatherer sends values “downstream”; that is, to the stream that is the result of the gather method.

Let's first look at the finish method. It is called after all upstream elements have been passed to the integrator function, and all combiner functions have been invoked. We now have the final version of the “intermediate state”. From it, the finisher method can derive zero or more values and send them downstream, using the methods of the Gatherer.Downstream interface:

boolean push(T element)
Pushes the element downstream, returns true if more elements can be pushed
boolean isRejecting()
Returns true if pushing more elements has no effect downstream

The integrator method returns a function object conforming to the functional interface Gatherer.Integrator, with a method

boolean integrate(A state, T element, Gatherer.Downstream<? super R> downstream)

When integrating a new element, you can mutate the intermediate state and/or push values downstream. Return false if further elements will be rejected.

That all sounds very abstract and technical, so let's apply it to the concrete problem of dropping adjacent duplicates. The Gatherer interface has a static helper method ofSequential that creates a gatherer with a given initializer and integrator.

Our state is, as before, a holder for the previous string. The initializer produces a state instance.

To create the integrator, we use the Gatherer.Integrator.of helper method. Here are the details:

Gatherer<String, ?, String> dropAdjacentDuplicates() {
    class State { String previous; };
    return Gatherer.<String, State, String>ofSequential(
        State::new,
        Gatherer.Integrator.of((state, element, downstream) -> {
                boolean keep = !element.equalsIgnoreCase(state.previous);
                state.previous = element;
                if (keep)
                    return downstream.push(element);
                else
                    return !downstream.isRejecting();
        }));
}

void main() {
    var sentence = "The the the quick brown fox jumps over the lazy dog dog";
    Stream<String> words = new Scanner(sentence).tokens();
    List<String> result = words.gather(dropAdjacentDuplicates()).toList();
    println(result);
}

As you can see, the gather implementation is not pleasant, even in this simple case. But once implemented, it is easy to use.

Built-in Gatherers

Most programmers use the collect method with collectors from the Collectors class, which has over forty methods. The Gatherers class offers a much smaller number of gatherers for use with the gather method.

The windowFixed and windowSliding methods yield a stream of lists of adjacent elements. It's quicker to show them in action than to explain them in words:

void main() {
    println(IntStream.range(0, 10).boxed().gather(Gatherers.windowFixed(4)).toList());
        // [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
    println(IntStream.range(0, 10).boxed().gather(Gatherers.windowSliding(4)).toList());
        // [[0, 1, 2, 3], [1, 2, 3, 4], [2, 3, 4, 5], [3, 4, 5, 6], [4, 5, 6, 7], [5, 6, 7, 8], [6, 7, 8, 9]]
}

A fixed window is useful when you have input that comes in fixed-sized groups. For example, consider a file with lines such as:

Zappa Microwave Oven
3
109.95
Blackwell Toaster
1
29.95

You can process it as follows:

List<LineItem> lineItems = Files.lines(Path.of("lineitems.txt"))
    .windowFixed(3)
    .map(w -> new LineItem(w.get(0), Integer.parseInt(w.get(1)), Double.parseDouble(w.get(2))))
    .toList();

A sliding window is useful when you need to compare adjacent elements. For example, we can almost solve our “drop adjacent duplicates” problem like this:

words.gather(Gatherers.windowSliding(2))
    .filter(w -> !w.get(0).equals(w.get(1)))
    .map(w -> w.get(0))

This doesn't quite work because we also want to keep the last element of wordStream if it differs from its predecessor. But it is easy to fix:

void main() {
    var sentence = "The the the quick brown fox jumps over the lazy dog dog";
    Stream<String> words = new Scanner(sentence).tokens();
    var result = Stream.concat(words, Stream.of((String) null))
        .gather(Gatherers.windowSliding(2))
        .filter(w -> !w.get(0).equalsIgnoreCase(w.get(1)))
        .map(w -> w.get(0))
        .toList();
    println(result);
}

The fold gatherer computes a “fold” or, more precisely, “left fold”. It starts with an initial value and applies an operation, where the left argument is the current value, and the right argument the next element:

           .
          .
         .
        op
       /  \
      op   elem2
     /  \
    op   elem1
   /  \
init   elem0

Folds are useful when a value is computed incrementally from the stream elements. Functional programmers prefer folds over loops. Here is a simple example, computing a number from its digits:

void main() {
    Integer[] digits = { 1, 7, 2, 9 };
    int number = Stream.of(digits)
        .gather(Gatherers.fold(() -> 0, (x, y) -> x * 10 + y))
        .findFirst()
        .orElse(0);
    println(number);
}

I had to use a Stream<Integer> because the primitive streams IntStream, LongStream, DoubleStream do not have gather methods.

The fold computes

  0 * 10 + 1 // 1
  1 * 10 + 7 // 17
 17 * 10 + 2 // 172
172 * 10 + 9 // 1729

The result is a stream with a single value, which is obtained by calling findFirst() followed by .orElse(0).

Probably fold should have been a terminal operation, but gatherers allow fulfillment of this desperate need.

Note that fold is different from the reduce terminal operation. Reduction is optimized for parallel evaluation. It uses either an associative operator, or a pair of operations: one to produce intermediate results and another to combine them. In contrast, fold is strictly sequential.

The scan operation is similar to fold, but you get a stream of all the intermediate results. For example, if you have a stream of deposit or withdrawal amounts, then a scan with the addition operator yields the cumulative balances:

void main() {
    double initial = 1000.0;
    Double[] transactions = { 100.0, -50.0, 200.0, -150.0 };
    var result = Stream.of(transactions)
        .gather(Gatherers.scan(() -> initial, Double::sum))
        .toList(); // [1100.0, 1050.0, 1250.0, 1100.0]
    println(result);
}

This scan computes init + elem0, init + elem0 + elem1, init + elem0 + elem1 + elem2, and so on.

Note: The Arrays.parallelPrefix method does almost the same operation, but in parallel and only with an associative operator.

I am not sure how popular fold and scan will turn out to be. The fifth predefined gatherer, Gatherers.mapConcurrent, is very useful—see the next section.

The Gatherer.andThen method combines two gatherers into a single one. The result is the same as calling gather twice.

void main() {
    var result1 = IntStream.range(0, 10)
        .boxed()
        .gather(Gatherers.windowFixed(2).andThen(Gatherers.windowFixed(2)))
        .toList(); // [[[0, 1], [2, 3]], [[4, 5], [6, 7]], [[8, 9]]]

    var result2 = IntStream.range(0, 10)
        .boxed()
        .gather(Gatherers.windowFixed(2))
        .gather(Gatherers.windowFixed(2))
        .toList(); // [[[0, 1], [2, 3]], [[4, 5], [6, 7]], [[8, 9]]]

    println(result1);
    println(result2);
}

Concurrent Execution

For processor-intensive workloads, you can use parallel streams:

var results = collection
    .parallelStream()
    .map(e -> hardWork(e))
    .toList();

The stream splits into subranges (normally, one per core), which are processed concurrently, using the global fork-join pool.

However, if the workload is mostly blocking, then parallel streams do not give the best throughput. Many more threads could block concurrently. In this situation, Gatherers.mapConcurrent shines. It executes each task in a virtual thread.

int MAX_CONCURRENT = 1000;
var results = collection
    .stream() // Not parallelStream!
    .gather(Gatherers.mapConcurrent(MAX_CONCURRENT, e -> blockingWork(e)))
    .toList();

You want to use this gatherer with blocking calls. The first argument gives you a convenient way of throttling the number of concurrent tasks. After all, the tasks are likely to connect to some external service that may fail with huge numbers of concurrent requests. (Note that the stream is not parallel.)

As an aside, the stream API purposefully does not allow you to set the executor for parallel stream tasks. There is a well-known workaround to use a different pool than the global fork-join pool. But that does not work with virtual threads:

ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
var results = executor.submit( // Won't use executor for subtasks
        () -> collection
            .parallelStream() 
            .map(e -> blockingWork(e)).toList()) // Splits into one task per processor
    .get();

You get one task per processor, not a task per call to map. And, to add insult to injury, the executor of the submission is not used for the split tasks since it is not an instance of ForkJoinPool. Use Gatherers.mapConcurrent if you want virtual threads.

Fine-Tuning Gatherers

When designing your own gatherer, you can signal two opportunities for optimization.

A small optimization is the “greediness” of a gatherer. A greedy gatherer processes all incoming elements. A non-greedy or short-circuiting gatherer may start rejecting elements. Elements can be pushed in bulk to a greedy gatherer, which may be more performant. The gain is not likely to be substantial, but it's easy enough to opt in. If you use a Gatherer.Integrator factory method to define your integrator, then use ofGreedy if the gatherer is greedy, or of if it is not. Alternatively, implement the Gatherers.Integrator.Greedy subinterface for greedy gatherers.

Next, you need to decide whether to make your gatherer sequential or parallelizable.

A sequential gatherer calls the initializer method once, then the integrator for each stream element in order, and then the finisher.

With a parallelizable gatherer on a parallel stream, the initializer method is called for each subrange. The integrator method is called as elements are encountered in the subranges. Elements that are pushed downstream will be assembled in the correct order.

Whenever neighboring subranges are complete, the combiner method is called. It can combine state, but it cannot push elements downstream.

Finally, the finisher method is called once, with the combined state. It can push elements downstream. They are appended after any elements that are pushed in an integrator.

It seems unlikely that a parallel gatherer will push elements both in the integrator and the finalizer. If the decision whether and what to push is independent of the neighbors, that can happen in the integrator. Otherwise, all pushing will have to wait to the finalizer.

The three Gatherer.of methods yield parallelizable gatherers, and the four Gatherer.ofSequential methods yield gatherers that are not parallelizable.

If you implement the Gatherer interface yourself, then your combiner method determines whether or not the gatherer is sequential or parallelizable. A sequential gatherer must return defaultCombiner() (which yields a fixed function object that will never be invoked). If combiner() returns any other function object, the gatherer is parallelizable.

Can the gatherer that drops adjacent duplicates be parallelizable? The integrator could buffer non-duplicate elements. The combiner could eliminate elements that appear at the end of the first buffer and and the beginning of the second. The finalizer would push buffered elements. But it may not be worth the trouble. The savings from parallelizing the comparisons may be eaten up by the cost of buffering.

Here is an example of a parallelizable gatherer. We want to replicate each stream element n times.

<T> Gatherer<T, ?, T> nCopies(int n) {
    return Gatherer.<T, T>of(
        Gatherer.Integrator.ofGreedy((_, element, downstream) -> {
            for (int i = 0; i < n; i++)
                if (!downstream.push(element)) return false;
            return !downstream.isRejecting();
        }));
}

void main() {
    var sentence = "The quick brown fox jumps over the lazy dog";
    Stream<String> words = new Scanner(sentence).tokens();
    List<String> result = words.gather(nCopies(3)).toList();
    println(result);
    int sum = IntStream.range(1, 101)
        .parallel()
        .boxed()
        .gather(nCopies(1000))
        .mapToInt(n -> n)
        .sum();
    println(sum);
}

Note: In this example, you don't actually need a gatherer, since there is no state. You can use the mapMulti method that has a slightly simpler parameter:

<T> BiConsumer<T, Consumer<T>> nCopies(int n) {
    return (element, sink) -> {
        for (int i = 0; i < n; i++) sink.accept(element);
    };
}

void main() {
    var sentence = "The quick brown fox jumps over the lazy dog";
    Stream<String> words = new Scanner(sentence).tokens();
    List<String> result = words.mapMulti(nCopies(3)).toList();
    println(result);
}

But the gatherer isn't that much more complicated. Since it is a more useful abstraction, I expect that programmers will soon find it familiar, whereas mapMulti is pretty obscure.

Note: The processing of a stream pipeline is highly optimized—see this article by Brian Goetz for a very clear description. Each stage of the pipeline reports whether it supports characteristics such as ORDERED, DISTINCT, SORTED, SIZED, and SUBSIZED. This information is used to optimize execution and elide unnecessary operations. Similarly, collectors have a (more limited) set of characteristics. However, when authoring a gatherer, there is no mechanism for reporting characteristics.

When to Gather

When stream operations depend on neighboring elements, or all preceding elements, then you need a gatherer. You may be able to use windowSliding or fold, or you can roll your own. These gatherers are likely sequential.

When stream operations depend on global characteristics, such as frequency of occurrence, then a gatherer can also work. You would write your own, track candidate elements in a state object, and push them in the finisher.

A gatherer can even be useful without looking at other elements. It can push any number of results, or it can use some kind of policy for processing each element. For example, the mapConcurrent gatherer computes each result in a virtual thread.

Here is another such gatherer for your amusement. It works like map, but the mapping method can throw checked exceptions.

interface CheckedFunction<T, R> {
    R apply(T arg) throws Exception;
}

<T, R> Gatherer<T, ?, R> mapChecked(CheckedFunction<? super T,? extends R> mapper) {
    return Gatherer.<T, R>of(
        Gatherer.Integrator.ofGreedy((_, element, downstream) -> {
            try {
                return downstream.push(mapper.apply(element));
            } catch (Exception ex) {
                throw new RuntimeException("mapChecked", ex);
            }
        }));
}

void main() {
    String[] paths = { "/etc/passwd", "/etc/environment" };
    List<String> contents = Stream.of(paths)
        .map(Path::of)
        .gather(mapChecked(Files::readString))
        .toList();
    println(contents);
}

Gunnar Morling shows how to zip two streams with a gatherer.

It is also easy to implement “zip with index”—see this StackOverflow answer.

You can find additional potentially useful gatherers in this project.

I'd like to conclude with a tip. When you use a gatherer, don't inline it. Write a method with a friendly name that yields it. That way, the intent is clear to those who read your code. After all, streams are all about “what, not how”.

References