JDK-8285450 : jsr166 refresh for jdk19
  • Type: CSR
  • Component: core-libs
  • Sub-Component: java.util.concurrent
  • Priority: P4
  • Status: Closed
  • Resolution: Approved
  • Fix Versions: 19
  • Submitted: 2022-04-22
  • Updated: 2022-05-13
  • Resolved: 2022-05-02
Related Reports
CSR :  
Description
Summary
-------

This CR adds ForkJoin methods needed by loom, as well as by other planned jdk enhancements that require dynamically changing the target parallelism of a ForkJoinPool. It also includes new methods planned to be introduced by loom, as discussed in https://openjdk.java.net/jeps/425, that do not directly involve VirtualThreads. These changes to the ExecutorService and Future interfaces were imported directly from loom but still involve implementation updates in jsr166 code.

Problem
-------

Here are brief descriptions of the problems addressed by the additions:

* ForkJoinPool.lazySubmit : Reduce unnecessary overhead when submitting a task that is known not to require activation of a new thread because another active worker (usually the caller) is sure to (eventually) process it.

* ForkJoinPool.setParallelism : Dynamically adjust to contexts in which the number of available processors changes, due to hypervisors, checkpoint/restore or other reasons. (Such adjustments are not automated, but the existence of this method enables further development.)

* ForkJoinTask.quietlyJoin(long timeout, TimeUnit unit) and ForkJoinTask.quietlyJoinUninterruptibly(long timeout, TimeUnit unit) fill in gaps for "await" style resultless join methods that had previously omitted timed versions.

* ForkJoinTask.adaptInterruptible is an adaptor for tasks that must interrupt runner threads upon cancellation. It has been used internally for several releases, but making the method public enables potential external uses as well.

* New constructor: protected ForkJoinWorkerThread(ThreadGroup group, ForkJoinPool pool, boolean preserveThreadLocals) allows users to create thread factories that require the group and ThreadLocal control available in Thread class constructors.
 
* ExecutorService.close (implementing AutoCloseable) enables ExecutorServices to be used in try-with-resources, and can be default-implemented using nearly the same code as was already illustrated in existing javadoc examples.

* Future.State and methods relying on it (state(), resultNow(), exceptionNow()) simplify some uses of Futures expected to be common in loom. The method specs are listed below; class ForkJoinTask was updated to override default implementations.

Solution
--------

(See above).

Specification
-------------
New methods in ForkJoinPool:

    /**
     * Submits the given task without guaranteeing that it will
     * eventually execute in the absence of available active threads.
     * In some contexts, this method may reduce contention and
     * overhead by relying on context-specific knowledge that existing
     * threads (possibly including the calling thread if operating in
     * this pool) will eventually be available to execute the task.
     *
     * @param task the task
     * @param <T> the type of the task's result
     * @return the task
     * @since 19
     */
    public <T> ForkJoinTask<T> lazySubmit(ForkJoinTask<T> task) {
        //;
    }
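
A minimal sketch of the intended use of lazySubmit, assuming the caller is itself a worker of the target pool and will join the submitted task. The class name `LazySubmitSketch`, the pool size, and the constants are hypothetical and only serve the illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical illustration class; not part of the proposed API.
final class LazySubmitSketch {
    static int sumWithLazySubmit() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            return pool.invoke(new RecursiveTask<Integer>() {
                @Override
                protected Integer compute() {
                    // We are running in a worker of 'pool' and will join below,
                    // so there is no need to activate another thread.
                    Callable<Integer> sideWork = () -> 20;
                    ForkJoinTask<Integer> side =
                        pool.lazySubmit(ForkJoinTask.adapt(sideWork));
                    int mine = 22;
                    return mine + side.join();
                }
            });
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling lazySubmit from a thread outside the pool is also permitted, but then the caller is relying on some other active worker to pick the task up eventually.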

    /**
     * Changes the target parallelism of this pool, controlling the
     * future creation, use, and termination of worker threads.
     * Applications include contexts in which the number of available
     * processors changes over time.
     *
     * @implNote This implementation restricts the maximum number of
     * running threads to 32767
     *
     * @param size the target parallelism level
     * @return the previous parallelism level.
     * @throws IllegalArgumentException if size is less than 1 or
     *         greater than the maximum supported by this pool.
     * @throws UnsupportedOperationException if this is the {@link #commonPool()} and
     *         parallelism level was set by System property
     *         {@systemProperty java.util.concurrent.ForkJoinPool.common.parallelism}.
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     * @since 19
     */
    public int setParallelism(int size) {}
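
As a rough illustration of the return value contract above, the following sketch resizes a privately created pool. The class name `SetParallelismSketch` and the sizes 4 and 2 are assumptions made for the example; it assumes no security manager is installed.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical illustration class; not part of the proposed API.
final class SetParallelismSketch {
    /** Returns {previous target, new target} after resizing a private pool. */
    static int[] resize() {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            // Returns the previous parallelism level (4 here).
            int previous = pool.setParallelism(2);
            return new int[] { previous, pool.getParallelism() };
        } finally {
            pool.shutdown();
        }
    }

    /** Shows that sizes below 1 are rejected. */
    static boolean rejectsZero() {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            pool.setParallelism(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        } finally {
            pool.shutdown();
        }
    }
}
```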

New constructor in ForkJoinWorkerThread:

    /**
     * Creates a ForkJoinWorkerThread operating in the given thread group and
     * pool, and with the given policy for preserving ThreadLocals.
     *
     * @param group if non-null, the thread group for this
     * thread. Otherwise, the thread group is chosen by the security
     * manager if present, else set to the current thread's thread
     * group.
     * @param pool the pool this thread works in
     * @param preserveThreadLocals if true, always preserve the values of
     * ThreadLocal variables across tasks; otherwise they may be cleared.
     * @throws NullPointerException if pool is null
     */
    protected ForkJoinWorkerThread(ThreadGroup group, ForkJoinPool pool,
                                   boolean preserveThreadLocals) {}
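
One way the new constructor might be used is from a custom worker thread factory. This is only a sketch: the subclass `GroupedWorker`, the group name "sketch-workers", and the enclosing class are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

// Hypothetical illustration class; not part of the proposed API.
final class GroupedWorkerFactorySketch {
    // The constructor is protected, so a subclass is needed to reach it.
    static final class GroupedWorker extends ForkJoinWorkerThread {
        GroupedWorker(ThreadGroup group, ForkJoinPool pool) {
            super(group, pool, true); // preserveThreadLocals = true
        }
    }

    /** Runs a task in a pool whose workers live in a dedicated group. */
    static String workerGroupName() {
        ThreadGroup group = new ThreadGroup("sketch-workers");
        ForkJoinPool.ForkJoinWorkerThreadFactory factory =
            p -> new GroupedWorker(group, p);
        ForkJoinPool pool = new ForkJoinPool(2, factory, null, false);
        try {
            Callable<String> whoAmI =
                () -> Thread.currentThread().getThreadGroup().getName();
            return pool.submit(whoAmI).join();
        } finally {
            pool.shutdown();
        }
    }
}
```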

New methods in ForkJoinTask:

    /**
     * Tries to join this task, returning true if it completed
     * (possibly exceptionally) before the given timeout and
     * the current thread has not been interrupted.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return true if this task completed
     * @throws InterruptedException if the current thread was
     * interrupted while waiting
     * @since 19
     */
    public final boolean quietlyJoin(long timeout, TimeUnit unit)
        throws InterruptedException {}

    /**
     * Tries to join this task, returning true if it completed
     * (possibly exceptionally) before the given timeout.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return true if this task completed
     * @since 19
     */
    public final boolean quietlyJoinUninterruptibly(long timeout,
                                                    TimeUnit unit) {}
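
The timed joins can be exercised roughly as follows. The sketch assumes a task that blocks on a latch so that the first timed join expires; the class name `TimedQuietJoinSketch` and the timeout values are arbitrary choices for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of the proposed API.
final class TimedQuietJoinSketch {
    /** Returns {result of join while blocked, result of join after release}. */
    static boolean[] demo() {
        ForkJoinPool pool = new ForkJoinPool(1);
        CountDownLatch gate = new CountDownLatch(1);
        try {
            Callable<String> body = () -> { gate.await(); return "done"; };
            ForkJoinTask<String> task = pool.submit(body);
            // The gate is still closed, so this is expected to time out.
            boolean early = task.quietlyJoin(50, TimeUnit.MILLISECONDS);
            gate.countDown();
            // No InterruptedException to handle in the uninterruptible form.
            boolean late = task.quietlyJoinUninterruptibly(5, TimeUnit.SECONDS);
            return new boolean[] { early, late };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            gate.countDown();
            pool.shutdown();
        }
    }
}
```

Neither method reports the task's result or exception; callers that need them can follow up with get() or join() once true is returned.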

    /**
     * Returns a new {@code ForkJoinTask} that performs the {@code call}
     * method of the given {@code Callable} as its action, and returns
     * its result upon {@link #join}, translating any checked exceptions
     * encountered into {@code RuntimeException}.  Additionally,
     * invocations of {@code cancel} with {@code mayInterruptIfRunning
     * true} will attempt to interrupt the thread performing the task.
     *
     * @param callable the callable action
     * @param <T> the type of the callable's result
     * @return the task
     *
     * @since 19
     */
    public static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> callable) {}
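
The difference from the existing adapt methods shows up on cancellation. The following is a sketch under the assumption that the task body blocks in an interruptible call; the class name, the latches, and the sleep duration are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of the proposed API.
final class AdaptInterruptibleSketch {
    /** Returns true if cancel(true) interrupted the thread running the task. */
    static boolean cancelInterruptsRunner() {
        ForkJoinPool pool = new ForkJoinPool(1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        try {
            Callable<Void> blocking = () -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // stands in for blocking work
                } catch (InterruptedException e) {
                    interrupted.countDown(); // record that the interrupt arrived
                    throw e;
                }
                return null;
            };
            ForkJoinTask<Void> task = ForkJoinTask.adaptInterruptible(blocking);
            pool.execute(task);
            started.await();   // make sure the task is running
            task.cancel(true); // mayInterruptIfRunning = true
            return interrupted.await(5, TimeUnit.SECONDS) && task.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```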

Interface ExecutorService:

(Added documentation in interface spec:)

 * An {@code ExecutorService} may also be established and closed
 * (shutdown, blocking until terminated) as follows; illustrating with
 * a different {@code Executors} factory method:
 *
 * <pre> {@code
 * try (ExecutorService e =  Executors.newWorkStealingPool()) {
 *   // submit or execute many tasks with e ...
 * }}</pre>
 *
 * Further customization is also possible. For example, the following
 * ... [same as before] ...

Method close:

    /**
     * Initiates an orderly shutdown in which previously submitted tasks are
     * executed, but no new tasks will be accepted. This method waits until all
     * tasks have completed execution and the executor has terminated.
     *
     * <p> If interrupted while waiting, this method stops all executing tasks as
     * if by invoking {@link #shutdownNow()}. It then continues to wait until all
     * actively executing tasks have completed. Tasks that were awaiting
     * execution are not executed. The interrupt status will be re-asserted
     * before this method returns.
     *
     * <p> If already terminated, invoking this method has no effect.
     *
     * @implSpec
     * The default implementation invokes {@code shutdown()} and waits for tasks
     * to complete execution with {@code awaitTermination}.
     *
     * @throws SecurityException if a security manager exists and
     *         shutting down this ExecutorService may manipulate
     *         threads that the caller is not permitted to modify
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")},
     *         or the security manager's {@code checkAccess} method
     *         denies access.
     * @since 19
     */
    @Override
    default void close() {}
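
A small sketch of what the close() contract allows in practice: once the try-with-resources block exits, all submitted tasks have completed, so their results can be read without waiting. The class name `CloseSketch` and the choice of a fixed thread pool are assumptions for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration class; not part of the proposed API.
final class CloseSketch {
    /** Sums the squares of 1..3, each computed as a separate task. */
    static int sumOfSquares() {
        List<Future<Integer>> futures = new ArrayList<>();
        try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
            for (int i = 1; i <= 3; i++) {
                final int n = i;
                Callable<Integer> square = () -> n * n;
                futures.add(executor.submit(square));
            }
        } // implicit close(): orderly shutdown, then wait for termination

        int sum = 0;
        for (Future<Integer> f : futures) {
            sum += f.resultNow(); // safe here: every task has completed
        }
        return sum;
    }
}
```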
  
Interface Future:

    /**
     * Returns the computed result, without waiting.
     *
     * <p> This method is for cases where the caller knows that the task has
     * already completed successfully, for example when filtering a stream
     * of Future objects for the successful tasks and using a mapping
     * operation to obtain a stream of results.
     * {@snippet lang=java :
     *     results = futures.stream()
     *                .filter(f -> f.state() == Future.State.SUCCESS)
     *                .map(Future::resultNow)
     *                .toList();
     * }
     *
     * @implSpec
     * The default implementation invokes {@code isDone()} to test if the task
     * has completed. If done, it invokes {@code get()} to obtain the result.
     *
     * @return the computed result
     * @throws IllegalStateException if the task has not completed or the task
     * did not complete with a result
     * @since 19
     */
    default V resultNow() {}

    /**
     * Returns the exception thrown by the task, without waiting.
     *
     * <p> This method is for cases where the caller knows that the task
     * has already completed with an exception.
     *
     * @implSpec
     * The default implementation invokes {@code isDone()} to test if the task
     * has completed. If done and not cancelled, it invokes {@code get()} and
     * catches the {@code ExecutionException} to obtain the exception.
     *
     * @return the exception thrown by the task
     * @throws IllegalStateException if the task has not completed, the task
     * completed normally, or the task was cancelled
     * @since 19
     */
    default Throwable exceptionNow() {}
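
To make the two IllegalStateException clauses concrete, the sketch below runs a failing task directly through a FutureTask (chosen only to keep the example single-threaded) and then calls both methods. The class name and the exception message are hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.FutureTask;

// Hypothetical illustration class; not part of the proposed API.
final class ExceptionNowSketch {
    /** Returns the message of the exception thrown by a failed task. */
    static String failureMessage() {
        FutureTask<String> failed =
            new FutureTask<String>(() -> { throw new IOException("disk"); });
        failed.run(); // completes the task exceptionally on this thread

        try {
            failed.resultNow(); // not allowed: the task has no result
            throw new AssertionError("resultNow should have thrown");
        } catch (IllegalStateException expected) {
            // The task's own exception is returned, with no
            // ExecutionException wrapper to unwrap.
            Throwable cause = failed.exceptionNow();
            return cause.getClass().getSimpleName() + ": " + cause.getMessage();
        }
    }
}
```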
  

    /**
     * Represents the computation state.
     * @since 19
     */
    enum State {
        /**
         * The task has not completed.
         */
        RUNNING,
        /**
         * The task completed with a result.
         * @see Future#resultNow()
         */
        SUCCESS,
        /**
         * The task completed with an exception.
         * @see Future#exceptionNow()
         */
        FAILED,
        /**
         * The task was cancelled.
         * @see #cancel(boolean)
         */
        CANCELLED
    }

    /**
     * {@return the computation state}
     *
     * @implSpec
     * The default implementation uses {@code isDone()}, {@code isCancelled()},
     * and {@code get()} to determine the state.
     *
     * @since 19
     */
    default State state() {}
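
Taken together, state(), resultNow() and exceptionNow() allow a Future to be inspected without blocking or catching checked exceptions. The following sketch switches over all four states; the helper `describe`, the class name, and the sample tasks are hypothetical, and FutureTask is used only so the example runs on a single thread.

```java
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

// Hypothetical illustration class; not part of the proposed API.
final class FutureStateSketch {
    /** Describes a future without ever waiting on it. */
    static String describe(Future<?> f) {
        return switch (f.state()) {
            case RUNNING   -> "running";
            case SUCCESS   -> "success: " + f.resultNow();
            case FAILED    -> "failed: " + f.exceptionNow().getMessage();
            case CANCELLED -> "cancelled";
        };
    }

    static List<String> demo() {
        FutureTask<Integer> pending = new FutureTask<Integer>(() -> 1);
        FutureTask<Integer> ok = new FutureTask<Integer>(() -> 42);
        FutureTask<Integer> bad = new FutureTask<Integer>(
            () -> { throw new IllegalArgumentException("bad input"); });
        FutureTask<Integer> cancelled = new FutureTask<Integer>(() -> 0);

        ok.run();                // completes with a result
        bad.run();               // completes with an exception
        cancelled.cancel(false); // cancelled before it ever ran
        // 'pending' is never run, so it has not completed.

        return List.of(describe(pending), describe(ok),
                       describe(bad), describe(cancelled));
    }
}
```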
  

Comments
OK. I agree that the simplest choice is just to override spec and code to say that close is a no-op on common pool.
06-05-2022

I've created JDK-8286294 to follow-up on the commonPool().close() issue.
06-05-2022

Agree, overriding the close() with NOOP would avoid the blocking. Thanks.
05-05-2022

Given Doug's comment then it may be that close will need to be overridden, otherwise close will appear to wait forever as it is waiting for the pool to terminate.
05-05-2022

Yes, this is expected, or rather it is just the consequence of shutdown having no effect when invoked on the common pool. If needed, this javadoc could include a note to say that the method waits forever when invoked on the common pool.
05-05-2022

Given the documented properties of the commonPool, it would not normally be a good idea to do this. But it is not guaranteed to hang, because awaitTermination is equivalent to awaitQuiescence for the commonPool.
05-05-2022

ForkJoinPool is now AutoCloseable, so one can safely wrap it in try-with-resources. One observation: a program that wraps the pool returned by commonPool() this way would hang, e.g. try (ForkJoinPool forkJoinPool = ForkJoinPool.commonPool()) {}. I think that is the expected behavior?
05-05-2022

Moving to Approved.
02-05-2022

Thanks also to Alan for suggesting use of UnsupportedOperationException instead of IllegalStateException in setParallelism (also updated above).
02-05-2022

Thank you.
02-05-2022

Thanks! Changed to just: "Tries to join this task, returning true if it completed (possibly exceptionally) before the given timeout and the current thread has not been interrupted."
02-05-2022

One observation regarding quietlyJoin(long timeout, TimeUnit unit): the spec mentions "returning true if it completed (possibly exceptionally) before the given timeout and/or the current thread has been interrupted, else false." Actually, when the current thread is interrupted, the method just throws InterruptedException and never returns a boolean. This part of the spec needs to be revisited.
02-05-2022