JDK-8196106 : Support nested infinite or recursive flat mapped streams
  • Type: Bug
  • Component: core-libs
  • Sub-Component: java.util.stream
  • Priority: P4
  • Status: Resolved
  • Resolution: Fixed
  • Submitted: 2018-01-24
  • Updated: 2024-04-18
  • Resolved: 2024-04-16
Fix Version: JDK 23 (23 b19, Fixed)
Related Reports
Duplicate :  
Duplicate :  
Relates :  
Description
JDK-8075939 fixed the case where a flat map operation contains an infinite stream. However, it did not fix the case of nested flat map calls that contain infinite streams, whether the nesting arises through recursion or is written out explicitly.
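For contrast, the single-level case covered by JDK-8075939 can be checked with a small program. This is a sketch assuming JDK 16 or later (for Stream::toList); the class and method names are illustrative only. One flatMap over an infinite inner stream, followed by limit, should terminate on a JDK that includes that fix:

```java
import java.util.List;
import java.util.stream.Stream;

public class SingleLevelFlatMap {
    // One level of flatMap over an infinite inner stream; limit(3) has to
    // short-circuit the inner Stream.generate for this to terminate.
    static List<Integer> firstThree() {
        return Stream.of(7)
                .flatMap(x -> Stream.generate(() -> x))
                .limit(3)
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(firstThree()); // [7, 7, 7]
    }
}
```

The nested examples that follow differ only in that the infinite stream sits one or more flatMap levels deeper.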

Here is a recursive example from JDK-8189234:

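    import java.util.stream.IntStream;

    public class Test {
        public static void main(String[] args) {
            IntStream.rangeClosed(0, 1)
                    .flatMap(Test::map)
                    .limit(5)
                    .forEach(System.out::println);
        }

        // Lazily builds the same flat-mapped stream again, so each
        // element's children contain a further level of nesting.
        static IntStream getChildren() {
            return IntStream.rangeClosed(0, 1)
                    .flatMap(Test::map);
        }

        // Emits c followed by all of its (infinitely nested) children.
        static IntStream map(int c) {
            return IntStream.concat(
                    IntStream.of(c),
                    getChildren());
        }
    }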

which will result in a StackOverflowError.

Here is another example using infinite streams:

    int i = 1; // illustrative value; any int gives a non-empty outer range
    IntStream.rangeClosed(i - 1, i + 1)
            .flatMap(c -> IntStream.generate(() -> 1)
                    .flatMap(x -> IntStream.generate(() -> x)))
            .limit(5)
            .forEach(System.out::println);

which will result in an OutOfMemoryError.

Solving these issues requires a mechanism that propagates the enclosing stream's cancellation function (Sink::cancellationRequested) to nested streams, for example a terminal operation forEachOrderedWithCancel that accepts a Predicate function for querying the enclosing cancellation state.
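The idea can be approximated outside the stream internals. The following is a minimal sketch, not the JDK fix: forEachOrderedWithCancel here is a hypothetical static helper, and it takes a BooleanSupplier rather than a Predicate because the cancellation query needs no argument. Each nesting level consumes its stream through a Spliterator and checks the shared cancellation state before every tryAdvance. The `done` supplier stands in for limit(5), and the nested streams have no intermediate operations, so their spliterators come straight from the source and do no buffering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class CancelPropagation {

    // Hypothetical helper, not a JDK API: consumes `stream` in encounter order,
    // polling the enclosing cancellation state before each element.
    static <T> void forEachOrderedWithCancel(Stream<T> stream,
                                             BooleanSupplier cancelled,
                                             Consumer<? super T> action) {
        Spliterator<T> sp = stream.sequential().spliterator();
        while (!cancelled.getAsBoolean() && sp.tryAdvance(action)) {
            // each tryAdvance delivers at most one element
        }
    }

    // Emulates of(0, 1, 2).flatMap(c -> generate(1).flatMap(x -> generate(x))).limit(5)
    // by threading the same cancellation check through every nesting level.
    static List<Integer> firstFive() {
        List<Integer> out = new ArrayList<>();
        BooleanSupplier done = () -> out.size() >= 5; // plays the role of limit(5)
        forEachOrderedWithCancel(Stream.of(0, 1, 2), done, c ->
            forEachOrderedWithCancel(Stream.generate(() -> 1), done, x ->
                forEachOrderedWithCancel(Stream.generate(() -> x), done, out::add)));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(firstFive()); // [1, 1, 1, 1, 1]
    }
}
```

Because every level polls the same `done` supplier, both infinite generate streams are abandoned after the fifth element instead of being drained.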

Such a mechanism would also avoid an issue with the spliterator obtained from a stream (AbstractWrappingSpliterator and its subclasses), which has to buffer elements so that it never reports more than one element per call to Spliterator::tryAdvance. (Independently of this, that implementation could be improved by reporting the first element directly and buffering only the subsequent ones.)
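The improvement suggested in parentheses can be sketched with a small stand-alone spliterator. FlatMapSpliterator is a hypothetical class, not AbstractWrappingSpliterator, and its mapper borrows the BiConsumer shape of Stream::mapMulti. Within each tryAdvance the first element produced is passed straight to the caller and only later ones are buffered. It still buffers everything after the first element, so a mapper that pushes an unbounded number of elements would still exhaust memory; that case needs cancellation propagation rather than a smarter buffer.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Spliterator;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

// Sketch: adapts a push-style one-to-many mapper to pull-style tryAdvance.
// ArrayDeque rejects nulls, so this sketch assumes non-null elements.
public final class FlatMapSpliterator<T, R> implements Spliterator<R> {
    private final Spliterator<T> source;
    private final BiConsumer<? super T, ? super Consumer<R>> mapper;
    private final ArrayDeque<R> buffer = new ArrayDeque<>();

    public FlatMapSpliterator(Spliterator<T> source,
                              BiConsumer<? super T, ? super Consumer<R>> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    public boolean tryAdvance(Consumer<? super R> action) {
        if (!buffer.isEmpty()) {           // drain earlier overflow first
            action.accept(buffer.poll());
            return true;
        }
        boolean[] reported = {false};
        Consumer<R> sink = r -> {
            if (!reported[0]) {            // first element: report immediately
                reported[0] = true;
                action.accept(r);
            } else {                       // subsequent elements: buffer
                buffer.add(r);
            }
        };
        // Pull source elements until one of them maps to at least one output.
        while (!reported[0] && source.tryAdvance(t -> mapper.accept(t, sink))) {
            // a source element that mapped to nothing; keep pulling
        }
        return reported[0];
    }

    @Override
    public Spliterator<R> trySplit() {
        return null;                       // sequential-only sketch
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;             // size unknown
    }

    @Override
    public int characteristics() {
        return source.characteristics() & ORDERED;
    }

    public static void main(String[] args) {
        // Each n expands to n copies of itself, mapMulti-style.
        Spliterator<Integer> sp = new FlatMapSpliterator<Integer, Integer>(
                List.of(1, 0, 2, 3).spliterator(),
                (n, sink) -> { for (int i = 0; i < n; i++) sink.accept(n); });
        System.out.println(StreamSupport.stream(sp, false).toList()); // [1, 2, 2, 3, 3, 3]
    }
}
```

In the usage example the source element 0 maps to nothing, which is why tryAdvance keeps pulling until some element produces output or the source is exhausted.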
Comments
Changeset: 8a5b86c5
Author: Viktor Klang <vklang@openjdk.org>
Date: 2024-04-16 11:09:59 +0000
URL: https://git.openjdk.org/jdk/commit/8a5b86c52954f6917acfda11df183691beb07f56
16-04-2024

A pull request was submitted for review.
URL: https://git.openjdk.org/jdk/pull/18625
Date: 2024-04-04 12:18:07 +0000
09-04-2024

[~psandoz] I think the main challenge will be performance, since my Gatherer-based solution is unlikely to perform as well as the current implementation out of the box. I'll think about alternatives and other possibilities. :)
11-12-2023

[~vklang] very good!
11-12-2023

[~psandoz] With the following implementation of flatMap using Gatherers, I can get this to cleanly terminate:

Impl:

```
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);

    @SuppressWarnings("serial")
    class FlatMapGatherer implements Gatherer<P_OUT, Void, R>,
                                     Gatherer.Integrator<Void, P_OUT, R>,
                                     BinaryOperator<Void> {
        @Override
        public Integrator<Void, P_OUT, R> integrator() { return this; }

        @Override
        public BinaryOperator<Void> combiner() { return this; }

        @Override
        public Void apply(Void l, Void r) { return l; }

        @Override
        public boolean integrate(Void state, P_OUT element, Downstream<? super R> downstream) {
            try (var result = mapper.apply(element)) {
                if (result != null) {
                    return (result.isParallel() ? result.sequential() : result).allMatch(downstream::push);
                }
            }
            return true;
        }
    }

    return gather(new FlatMapGatherer());
}
```

Example:

```
jshell> Stream.of(1).
   ...> flatMap(c -> Stream.generate(() -> 1).
   ...> flatMap(x -> Stream.generate(() -> x))).
   ...> limit(5).forEach(System.out::println);
1
1
1
1
1
```
11-12-2023
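The Gatherer-based approach in the comment above can also be tried at user level. The sketch below assumes JDK 24 or later, where Stream::gather is a standard API (it was a preview API in JDK 22 and 23); flatMapping and GatherFlatMap are illustrative names, and the gatherer is built with Gatherer.of(Integrator) instead of a hand-written class. As in the comment, allMatch(downstream::push) stops consuming a mapped stream as soon as push returns false, which is how the enclosing limit(5) reaches the nested infinite streams.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class GatherFlatMap {

    // Illustrative user-level flatMap expressed as a Gatherer (JDK 24+).
    static <T, R> Gatherer<T, Void, R> flatMapping(
            Function<? super T, ? extends Stream<? extends R>> mapper) {
        return Gatherer.of((state, element, downstream) -> {
            try (Stream<? extends R> result = mapper.apply(element)) {
                // A null mapped stream contributes no elements, as with flatMap.
                // allMatch stops pulling from the mapped stream as soon as
                // push returns false, i.e. the downstream wants no more elements.
                return result == null
                        || result.sequential().allMatch(downstream::push);
            }
        });
    }

    static List<Integer> firstFive() {
        Gatherer<Integer, Void, Integer> inner =
                flatMapping(x -> Stream.generate(() -> x));
        Gatherer<Integer, Void, Integer> middle =
                flatMapping(c -> Stream.generate(() -> 1).gather(inner));
        // Same shape as the jshell example: two nested infinite streams.
        return Stream.of(1).gather(middle).limit(5).toList();
    }

    public static void main(String[] args) {
        System.out.println(firstFive()); // [1, 1, 1, 1, 1]
    }
}
```

The gatherers are stateless (Void state), so the same `inner` instance can be reused by every middle stream.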