JDK-8075939 : Stream.flatMap() causes breaking of short-circuiting of terminal operations

Details
Type:
Bug
Submit Date:
2015-03-24
Status:
Open
Updated Date:
2016-12-05
Project Name:
JDK
Resolved Date:
Component:
core-libs
OS:
Sub-Component:
java.util.stream
CPU:
Priority:
P4
Resolution:
Unresolved
Affected Versions:
8
Targeted Versions:
tbd_major

Related Reports
Duplicate:
Duplicate:
Duplicate:

Sub Tasks

Description
FULL PRODUCT VERSION :
java version "1.8.0_40"
Java(TM) SE Runtime Environment (build 1.8.0_40-b25)
Java HotSpot(TM) 64-Bit Server VM (build 25.40-b25, mixed mode)

ADDITIONAL OS VERSION INFORMATION :
Linux smobile 3.16.0-33-generic #44-Ubuntu SMP Thu Mar 12 12:19:35 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux

A DESCRIPTION OF THE PROBLEM :
Short-circuiting terminal operations behave differently when called on a stream produced by flatMap than when called on a stream that does not involve flatMap.

Stream.of(1, 2, 3).filter(i -> { System.out.println(i); return true; }).findFirst() prints, as expected, a single element of the stream, namely '1', because findFirst terminates as soon as the first element is produced (sequential stream case).
In contrast,
Stream.of(1, 2, 3).flatMap(i -> Stream.of(i - 1, i, i + 1)).flatMap(i -> Stream.of(i - 1, i, i + 1)).filter(i -> { System.out.println(i); return true; }).findFirst()
prints 9 elements, namely the whole flattened stream produced for the first element of the source stream, even though findFirst() needs only the first of them.
For a finite stream this is merely inefficient. However, if the flatMap function produces an infinite stream, findFirst() never returns, which appears to break the short-circuiting behaviour of terminal operations on the stream.

From docs: "An intermediate operation is short-circuiting if, when presented with infinite input, it may produce a finite stream as a result."
"findFirst()
...
This is a short-circuiting terminal operation."

STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
System.out.println("Infinite result: " +
                        Stream.of(0).flatMap(x->Stream.iterate(0, i->i+1)).findFirst());

EXPECTED VERSUS ACTUAL BEHAVIOR :
EXPECTED -
Infinite result: 0
ACTUAL -
Nothing is ever printed, because the call never finishes traversing the infinite stream.

REPRODUCIBILITY :
This bug can be reproduced always.

---------- BEGIN SOURCE ----------
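import java.util.stream.Stream;

public class FlatMapShortCircuit {
    public static void main(String[] args) {
        // Baseline without flatMap: the filter prints only the first element.
        System.out.println(
                "Correct Result: " +
                Stream.of(1, 2, 3)
                        .filter(i -> {
                            System.out.println(i);
                            return true;
                        })
                        .findFirst()
                        .get()
        );
        System.out.println("-----------");
        // With flatMap: on the affected version the filter prints 9 elements.
        System.out.println(
                "Correct Result with unnecessary filtering: " +
                Stream.of(1, 2, 3)
                        .flatMap(i -> Stream.of(i - 1, i, i + 1))
                        .flatMap(i -> Stream.of(i - 1, i, i + 1))
                        .filter(i -> {
                            System.out.println(i);
                            return true;
                        })
                        .findFirst()
                        .get()
        );

        // Infinite inner stream: on the affected version this call never returns.
        System.out.println("Result is never printed: " +
                Stream.of(0).flatMap(x -> Stream.iterate(0, i -> i + 1)).findFirst());
    }
}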
---------- END SOURCE ----------


                                    

Comments
Yes, good point, a Predicate is sufficient.

Spliterator.forEachRemainingWithCancel does not add much value IMO since that functionality can be easily supported as a layer above looping over tryAdvance (as is already the case for short-circuiting stream pipelines). Furthermore, operating on Stream.spliterator will be less efficient than Stream.forEachOrderedWithCancel. Of course, any default implementation of Stream.forEachOrderedWithCancel will need to utilize Stream.spliterator.

There are a number of things pulling on streams, such as known/effectively infinite sources, tricky parallelizable ops, I/O based cases etc., which sometimes makes for awkward trade-offs. In this case, is the addition to the API worth it? Maybe... :-) but I am hesitant if it is only flatMap that is motivating this new API point.
                                     
2016-03-01
Note that having both a Consumer and a Predicate is unnecessary: a Predicate alone is enough. It would act as the consumer, but return false if processing should be stopped.

Another alternative is to add forEachRemainingWithCancel(Predicate<? super T> action) to the Spliterator interface (providing a default implementation is not very hard). This way flatMap could use either stream.spliterator().forEachRemainingWithCancel() or stream.spliterator().forEachRemaining(). Such a spliterator method could be optimized for some sources (compared to a series of tryAdvance() calls).
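To make the proposal concrete, here is a minimal sketch of what such a default implementation might look like when layered over tryAdvance(). It is written as a static helper because forEachRemainingWithCancel is only a proposed name and does not exist in the JDK; the class name and the firstN demo method are likewise made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class SpliteratorCancelSketch {

    // Hypothetical helper (not a JDK API): passes elements to 'action' until
    // it returns false or the spliterator has no more elements.
    static <T> void forEachRemainingWithCancel(Spliterator<T> spliterator,
                                               Predicate<? super T> action) {
        boolean[] keepGoing = {true};
        while (keepGoing[0]
                && spliterator.tryAdvance(t -> keepGoing[0] = action.test(t))) {
            // tryAdvance delivers at most one element per call
        }
    }

    // Demo: collects the first 'limit' elements of an infinite source.
    static List<Integer> firstN(int limit) {
        List<Integer> seen = new ArrayList<>();
        if (limit <= 0) {
            return seen;
        }
        Spliterator<Integer> infinite = Stream.iterate(0, i -> i + 1).spliterator();
        forEachRemainingWithCancel(infinite, i -> {
            seen.add(i);
            return seen.size() < limit; // false stops the traversal
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(firstN(3)); // prints [0, 1, 2]
    }
}
```

A source-specific override could avoid the per-element tryAdvance() call, which is the optimization opportunity mentioned above; this sketch does not attempt that.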
                                     
2016-03-01
An alternative approach is to define a terminal operation:

  forEachOrderedWithCancel(Consumer<T> c, Predicate<T> p)

When p tests true then the pipeline is short-circuited (note that for parallel execution it might be necessary to evaluate the predicate in sequential order when reporting elements to avoid non-determinism).

With such a method flatMap can choose to use forEachOrdered or forEachOrderedWithCancel. If the downstream is short-circuiting then the flatMap operation can pass the downstream short-circuiting "sink" into the forEachOrderedWithCancel so cancelation is propagated upstream.
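As a rough illustration of the proposed signature, the following sketch emulates forEachOrderedWithCancel for a sequential stream only. It is a static helper, not a real Stream method, and it assumes that each element is first passed to the consumer and then tested by the predicate; the comment above does not pin down that ordering. The class name and the demo pipeline are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class ForEachOrderedWithCancelSketch {

    // Hypothetical helper (not a JDK API), sequential streams only.
    // Assumption: consume the element first, then ask whether to cancel.
    static <T> void forEachOrderedWithCancel(Stream<T> stream,
                                             Consumer<? super T> consumer,
                                             Predicate<? super T> cancel) {
        Spliterator<T> spliterator = stream.spliterator();
        boolean[] cancelled = {false};
        while (!cancelled[0] && spliterator.tryAdvance(t -> {
            consumer.accept(t);
            cancelled[0] = cancel.test(t);
        })) {
            // one element per iteration
        }
    }

    // Demo: powers of two from an infinite stream, stopping once one exceeds 10.
    static List<Integer> demo() {
        List<Integer> seen = new ArrayList<>();
        forEachOrderedWithCancel(Stream.iterate(1, i -> i * 2), seen::add, i -> i > 10);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2, 4, 8, 16]
    }
}
```

A parallel version would have to address the ordering concern raised above, which this sketch sidesteps by never splitting the spliterator.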
                                     
2016-02-29
While the outer pipeline is short-circuiting, the stream passed to the flatMap operation is not. The flatMap operation is implemented by forEach'ing the elements down to the next pipeline stage, which is why execution does not terminate for an infinite stream.

Note that one should be careful when using the side-effects of printing out elements to infer behaviour. Although the findFirst operation is short-circuiting, an implementation is free to consume two or more elements from upstream (which is what parallel execution may do).

The flatMap operation needs to detect that the pipeline is short-circuiting and then use a (to be implemented/exposed) internal "forEachWithCancel" operation. Nested operations, where a flatMap operation maps an element to a stream that in turn contains a flatMap operation, would also need to be considered.
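The idea of letting cancellation reach the inner stream can be illustrated outside the library with a pull-based flatMap. This is not how the JDK implements flatMap, nor the proposed fix; lazyFlatMap is a hypothetical helper that pulls one element at a time from each inner stream's iterator. It is sequential only, does not close inner streams, and an inner stream that itself uses flatMap over an infinite stream may still hang on an affected version (the nested case noted above).

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LazyFlatMapSketch {

    // Hypothetical helper (not a JDK API): flattens by pulling elements on demand.
    static <T, R> Stream<R> lazyFlatMap(Stream<T> source,
            Function<? super T, ? extends Stream<? extends R>> mapper) {
        Iterator<T> outer = source.iterator();
        Iterator<R> flattened = new Iterator<R>() {
            private Iterator<? extends R> inner = Collections.emptyIterator();

            @Override
            public boolean hasNext() {
                // Move to the next inner stream only when the current one is exhausted
                while (!inner.hasNext() && outer.hasNext()) {
                    Stream<? extends R> next = mapper.apply(outer.next());
                    if (next != null) { // a null result is treated as an empty stream
                        inner = next.iterator();
                    }
                }
                return inner.hasNext();
            }

            @Override
            public R next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return inner.next();
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(flattened, Spliterator.ORDERED), false);
    }

    // The reproduction from the report, using the pull-based helper instead.
    static Optional<Integer> firstOfInfinite() {
        return lazyFlatMap(Stream.of(0), x -> Stream.iterate(0, i -> i + 1)).findFirst();
    }

    // Finite case: same flattening order as Stream.flatMap.
    static List<Integer> flattenFinite() {
        return lazyFlatMap(Stream.of(1, 2, 3), i -> Stream.of(i - 1, i, i + 1))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("Infinite result: " + firstOfInfinite()); // Infinite result: Optional[0]
        System.out.println(flattenFinite());
    }
}
```

Because the terminal operation pulls elements through tryAdvance on the wrapping spliterator, findFirst stops after the first element and the infinite inner stream is never fully traversed.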
                                     
2015-03-25
Not a regression. Streams have behaved this way going back to JDK 8 GA.

                                     
2015-03-25


