JDK-8299768 : No. of running threads may exceed parallelism when using timed ForkJoinTask.get
  • Type: Bug
  • Component: core-libs
  • Sub-Component: java.util.concurrent
  • Affected Version: 8,17,18
  • Priority: P4
  • Status: Open
  • Resolution: Unresolved
  • OS: generic
  • CPU: generic
  • Submitted: 2023-01-04
  • Updated: 2023-01-12
Description
ADDITIONAL SYSTEM INFORMATION :
Reproduced on Linux and Windows.
Is present in Java 17.0.5 and 18.0.2, but not in 16.0.1 and 19.0.1.

A DESCRIPTION OF THE PROBLEM :
After switching to Java 17, we noticed that our applications, which use a fork join pool, had more threads running in parallel than the configured parallelism value. The behavior can be observed when an internal task submits a subtask and waits for its result with ForkJoinTask.get(long, TimeUnit). It seems that if enough tasks are submitted to the pool, or the tasks take long enough to execute, the pool starts additional threads and exceeds the parallelism value. The precise conditions for this to happen are unclear. The same behavior does not occur when waiting on tasks with the untimed, blocking ForkJoinTask.get().

REGRESSION : Last worked in version 16

STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
See the code example

EXPECTED VERSUS ACTUAL BEHAVIOR :
EXPECTED -
The fork join pool should not have more threads running concurrently than the parallelism value. This is the behavior in Java 16 and 19, and also when using the blocking ForkJoinTask.get().

---------- BEGIN SOURCE ----------

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;

public class ForkJoinParallelism {
    static final ForkJoinPool pool = new ForkJoinPool(2);

    public static void main(final String[] args) {
        final Timer timer = new Timer();
        final Statistics statistics = new Statistics();
        timer.schedule(statistics, 100, 100);

        try {
            final List<ForkJoinTask<?>> tasks = new ArrayList<>();

            for (int c = 0; c < 3; c++) {
                System.out.println("start cycle #" + c);

                for (int i = 0; i < 50; i++) {
                    tasks.add(pool.submit(() -> {
                        final List<ForkJoinTask<?>> subtasks = new ArrayList<>();

                        for (int j = 0; j < 50; j++) {
                            subtasks.add(pool.submit(() -> {
                                // spend some cpu time
                                final Random rnd = new Random();
                                final int[] numbers = new int[100_000];
                                for (int k = 0; k < 100_000; k++) {
                                    numbers[k] = rnd.nextInt(100_000);
                                }
                                Arrays.sort(numbers);
                            }));
                        }

                        getAll(subtasks);
                    }));
                }

                getAll(tasks);

                System.out.println("end cycle #" + c);
            }

        } catch (final Exception e) {
            e.printStackTrace();
        } finally {
            timer.cancel();
            pool.shutdownNow();

            System.out.println("parallelism: " + pool.getParallelism());
            System.out.println("max running threads: " + statistics.max);
        }
    }

    static void getAll(final List<ForkJoinTask<?>> tasks) {
        for (final ForkJoinTask<?> task : tasks) {
            try {
                // using the blocking get removes the behavior
                // task.get();
                task.get(20, TimeUnit.SECONDS);
            } catch (final Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class Statistics extends TimerTask {
        volatile int max = 0; // written by the timer thread, read by main

        @Override
        public void run() {
            final int running = ForkJoinParallelism.pool.getRunningThreadCount();

            System.out.println("running thread count: " + running);

            if (running > max) {
                max = running;
            }
        }
    }
}
---------- END SOURCE ----------

FREQUENCY : always



Comments
[~tongwan] My interpretation of `parallelism` in `ForkJoinPool` (given the text from the Javadoc quoted below) is that it is the parallelism available to perform useful work. Blocking (arguably) does not constitute "useful work" and could lead, if not properly guarded, to complete stalls or even deadlocks. I would therefore expect the ForkJoinPool to be free to perform "evasive maneuvers" up to the limit governed by the system property mentioned below.

The Javadoc for JDK 19 states: "For applications that require separate or custom pools, a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors. The pool attempts to maintain enough active (or available) threads by dynamically adding, suspending, or resuming internal worker threads, even if some tasks are stalled waiting to join others. However, no such adjustments are guaranteed in the face of blocked I/O or other unmanaged synchronization."

The maximum number of "extra threads" afforded to the ForkJoinPool is governed by the following system property: "java.util.concurrent.ForkJoinPool.common.maximumSpares - the maximum number of allowed extra threads to maintain target parallelism (default 256)."
12-01-2023
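
For readers who want to check the value of the property quoted in the comment above, here is a minimal sketch. The class name MaxSparesSketch and the helper configuredMaxSpares are hypothetical names introduced for illustration; only the property name and the default of 256 come from the quoted Javadoc. The helper reports the configured value only and does not inspect any pool. The quoted text documents this property for the common pool, so it may not say anything about a pool created with new ForkJoinPool(2) as in the reproducer.

```java
public class MaxSparesSketch {
    // Property name as quoted from the ForkJoinPool Javadoc.
    static final String MAX_SPARES_PROPERTY =
            "java.util.concurrent.ForkJoinPool.common.maximumSpares";

    // Default stated in the quoted Javadoc text.
    static final int DOCUMENTED_DEFAULT = 256;

    // Hypothetical helper: returns the value configured via the system
    // property, or the documented default if the property is unset or
    // not a valid integer. It does not query the pool itself.
    static int configuredMaxSpares() {
        return Integer.getInteger(MAX_SPARES_PROPERTY, DOCUMENTED_DEFAULT);
    }

    public static void main(String[] args) {
        System.out.println("maximumSpares (configured or default): "
                + configuredMaxSpares());
    }
}
```

The property would typically be passed on the command line with -D so that it is in place before the pool is initialized.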

Yes, ForkJoinPool intentionally starts additional threads if it detects that some threads are blocked.
https://download.java.net/java/early_access/jdk21/docs/api/java.base/java/util/concurrent/ForkJoinPool.html#%3Cinit%3E(int,java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory,java.lang.Thread.UncaughtExceptionHandler,boolean,int,int,int,java.util.function.Predicate,long,java.util.concurrent.TimeUnit)
> To ensure progress, when too few unblocked threads exist and unexecuted tasks may exist, new threads are constructed, up to the given maximumPoolSize.
Probably this should be explained better in the class documentation.
07-01-2023
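
The constructor linked in the comment above (available since JDK 9) lets an application set maximumPoolSize explicitly. The following is a minimal sketch, assuming the application prefers a hard cap over compensation threads. The class name CappedPoolSketch and the helper newCappedPool are hypothetical. Whether this configuration avoids the behavior reported here on JDK 17 and 18 has not been verified, and per the Javadoc the maximum may be transiently exceeded.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;

public class CappedPoolSketch {
    // Hypothetical helper: builds a pool whose maximumPoolSize equals its
    // parallelism, so blocked workers are not replaced by extra threads.
    static ForkJoinPool newCappedPool(int parallelism) {
        return new ForkJoinPool(
                parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,           // no custom UncaughtExceptionHandler
                false,          // asyncMode off (default LIFO processing)
                parallelism,    // corePoolSize
                parallelism,    // maximumPoolSize: cap on total threads
                1,              // minimumRunnable (documented default)
                p -> true,      // saturate: accept running below target
                                // instead of RejectedExecutionException;
                                // progress is then not guaranteed
                60, TimeUnit.SECONDS); // keepAliveTime (documented default)
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = newCappedPool(2);
        try {
            Callable<Integer> work = () -> 21 * 2;
            ForkJoinTask<Integer> task = pool.submit(work);
            System.out.println("result: " + task.get(5, TimeUnit.SECONDS));
            System.out.println("parallelism: " + pool.getParallelism());
        } finally {
            pool.shutdown();
        }
    }
}
```

The trade-off is the one the Javadoc describes: with the saturate predicate returning true, the pool keeps operating with fewer runnable threads than the target, which can stall workloads where every worker blocks on a subtask.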

Observations on Windows 10:
JDK 8: Failed, max number of running threads exceeds the parallelism value.
JDK 11: Passed.
JDK 17ea+4: Passed.
JDK 17ea+5: Failed.
JDK 18: Failed.
JDK 19: Passed.
JDK 20ea+23: Passed.
07-01-2023

Additional information from the submitter:
The expected result is that the value shown for max running threads does not exceed the parallelism value, e.g.:
parallelism: 2
max running threads: 2
In other words, the number of concurrently running threads of the fork join pool does not exceed the parallelism value. This is not the case with JDK 17, as your test run has shown, nor with JDK 18, as we have observed. However, when using JDK 16/19, or when using the blocking ForkJoinTask.get/ForkJoinTask.join instead of the timed ForkJoinTask.get(long, TimeUnit) (change line 65/66 of the test source, i.e. the two task.get calls in getAll), we observed the expected behavior of max running threads <= parallelism.
06-01-2023
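
The untimed alternative mentioned by the submitter can be sketched as follows. This is an illustrative variant of the reproducer's getAll that uses ForkJoinTask.join. The names JoinAllSketch, joinAll and runOnce are hypothetical, and the sketch gives up the timeout that the timed get provides. It only shows the call pattern and does not measure the running thread count.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class JoinAllSketch {
    // Waits for every task with the untimed join(). join() throws only
    // unchecked exceptions (including CancellationException), so there
    // is no InterruptedException or TimeoutException to handle.
    static void joinAll(List<? extends ForkJoinTask<?>> tasks) {
        for (ForkJoinTask<?> task : tasks) {
            try {
                task.join();
            } catch (RuntimeException e) {
                e.printStackTrace();
            }
        }
    }

    // Submits taskCount trivial tasks, joins them all, and returns how
    // many completed normally.
    static int runOnce(int parallelism, int taskCount) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            List<ForkJoinTask<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                final int n = i;
                Callable<Integer> work = () -> n * n;
                tasks.add(pool.submit(work));
            }
            joinAll(tasks);
            int completed = 0;
            for (ForkJoinTask<Integer> t : tasks) {
                if (t.isCompletedNormally()) {
                    completed++;
                }
            }
            return completed;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runOnce(2, 10));
    }
}
```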