JDK-6431315 : ThreadPoolExecutor.shutdownNow may hang ExecutorService.invokeAll
  • Type: Bug
  • Component: core-libs
  • Sub-Component: java.util.concurrent
  • Affected Version: 6
  • Priority: P2
  • Status: Closed
  • Resolution: Fixed
  • OS: generic
  • CPU: generic
  • Submitted: 2006-05-28
  • Updated: 2010-05-11
  • Resolved: 2006-06-21
JDK 6: 6 b89 (Fixed)
Description
Doug Lea writes:

Neither David nor I much liked the yield I put in FutureTask. We discussed
some ways of changing. The simplest is a cross between our suggestions:
If a set sees that task is cancelled, it now nulls out runner, in case
canceller thread is stalled and so will otherwise interrupt too late. There
is still an unsolvable race here, so cancellers can still interrupt
a thread after set completes, but this closes the gap as narrowly as would
any other approach (modulo a few instruction cycles) and doesn't require
any further changes.

It does however look possible to do at least this well plus make
set and get a bit faster by using a highly customized/hacked version
of Bill Scherer and my dual stuff. This would be too much of a change
for a Mustang bug fix though.

Luckily Martin didn't pay attention the first time so hasn't
filed bug report. Martin can you do this now? The other parts of
changes are below.

David: nulling runner in set only on seeing CANCELLED rather
than unconditionally allows current sentinel use of runner,
so avoids need to add READY state while still nulling thread
about as soon as logically possible.

I checked in these changes to FutureTask.


Doug Lea wrote:
>> Sorry that I had somehow completely forgot about this one.
>> It IS a bug, or at least a failure to meet reasonable expectations
>> in the submitted not-very-reasonable usage.
>> 
>> Martin: Can you file bug:
>> 
>>   ThreadPoolExecutor.shutdownNow may hang ExecutorService.invokeAll.
>> 
>> A test program is attached (it's just a simple adaptation of posted
>> program.)
>> 
>> Details....
>> [... omitted ...]
>> 
>>>>
>>>> Now back to TPE and it dropping a task due to the check for shutdown.
>>>> Here are the possible states as I see them after the worker has grabbed
>>>> a task from the queue:
>>>>
>>>> 1. shutdown requested, thread interrupted
>>>> 2. shutdown not-requested, thread interrupted
>>>>
>>>> For (1) we should just go ahead and execute the task leaving the
>>>> interrupt set - that way if the task responds to interrupt it will
>>>> return quickly.
>>>>
>>>> For (2) it means we probably have a later interrupt from a previous
>>>> task and we should clear it (as we currently do).
>>>>
>>>>                 ...
>>>>
>>>> The problem is of course that in both TPE and FutureTask the
>>>> setting/clearing of interrupt state is not atomic with respect to the
>>>> TPE or FutureTask state. So in the above we may get interrupted due to
>>>> a shutdown just after we checked for not being shutdown, and so will
>>>> clear an interrupt we shouldn't have. We can fix that with a second check.
>> 
>> 
>> Yes; I agree. This leads to the simple change:
>> 
>> Index: ThreadPoolExecutor.java
>> ===================================================================
>> RCS file: /export/home/jsr166/jsr166/jsr166/src/main/java/util/concurrent/ThreadPoolExecutor.java,v
>> retrieving revision 1.80
>> diff -c -r1.80 ThreadPoolExecutor.java
>> *** ThreadPoolExecutor.java    20 Apr 2006 07:20:42 -0000    1.80
>> --- ThreadPoolExecutor.java    17 May 2006 12:04:52 -0000
>> ***************
>> *** 666,677 ****
>>               final ReentrantLock runLock = this.runLock;
>>               runLock.lock();
>>               try {
>> !                 Thread.interrupted(); // clear interrupt status on entry
>> !                 // Abort now if immediate cancel.  Otherwise, we have
>> !                 // committed to run this task.
>> !                 if (runState == STOP)
>> !                     return;
>> !
>>                   boolean ran = false;
>>                   beforeExecute(thread, task);
>>                   try {
>> --- 666,676 ----
>>               final ReentrantLock runLock = this.runLock;
>>               runLock.lock();
>>               try {
>> !                 // If not shutting down then clear an outstanding interrupt.
>> !                 if (runState != STOP &&
>> !                     Thread.interrupted() &&
>> !                     runState == STOP) // Re-interrupt if stopped after clearing
>> !                     thread.interrupt();
>>                   boolean ran = false;
>>                   beforeExecute(thread, task);
>>                   try {
>> 
>> 
>> ------------------------------------------------------------------------
>> 
>> 
>> import java.util.*;
>> import java.util.concurrent.*;
>> 
>> /**
>>  * Adapted from posting by Tom Sugden tom at epcc.ed.ac.uk 
>>  */
>> public class BlockingTaskExecutorTest {
>> 
>> 
>>     public static void main(String[] args) throws Exception {
>>         for (int i = 1; i <= 100; i++) {
>>             System.out.print(".");
>>             
>>             final ExecutorService executor = Executors.newCachedThreadPool();
>> 
>>             final NotificationReceiver notifier1 = new NotificationReceiver();
>>             final NotificationReceiver notifier2 = new NotificationReceiver();
>> 
>>             final Callable task1 = new BlockingTask(notifier1);
>>             final Callable task2 = new BlockingTask(notifier2);
>>             final Callable task3 = new NonBlockingTask();        // *** A ***
>> 
>>             final List tasks = new ArrayList();
>>             tasks.add(task1);
>>             tasks.add(task2);
>>             tasks.add(task3);                                    // *** B***
>> 
>>             // start a thread to invoke the tasks
>>             Thread thread = new Thread() {
>>                     public void run() {
>>                         try {
>>                             executor.invokeAll(tasks);
>>                         }
>>                         catch (Throwable e) {
>>                             //                        e.printStackTrace();
>>                             //                        System.exit(-1);
>>                         }
>>                     }
>>                 };
>>             thread.start();
>>             
>>             // wait until tasks begin execution
>>             notifier1.waitForNotification();
>>             notifier2.waitForNotification();
>> 
>>             // now try to shutdown the executor service while tasks
>>             // are blocked.  This should cause the tasks to be
>>             // interrupted.
>>             executor.shutdownNow();
>>             boolean stopped = executor.awaitTermination(5, TimeUnit.SECONDS);
>>             //            System.out.println("Terminated? " + stopped);
>>             if (!stopped) throw new Error("Executor stuck");
>> 
>>             // wait for the invocation thread to complete
>>             thread.join(1000);
>>             if (thread.isAlive()) {
>>                 thread.interrupt();
>>                 thread.join(1000);
>>                 throw new Error("invokeAll stuck");
>>             }
>>         }
>>         System.out.println("\n done.");
>>     }
>> }
>> 
>> /**
>>  * A helper class with a method to wait for a notification. The
>>  * notification is received via the <code>sendNotification</code> method.
>>  */
>> class NotificationReceiver
>> {
>>     /** Has the notifier been notified? */
>>     boolean mNotified = false;
>> 
>>     /**
>>      * Notify the notification receiver.
>>      */
>>     public synchronized void sendNotification()
>>     {
>>         mNotified = true;
>>         notifyAll();
>>     }
>> 
>>     /**
>>      * Waits until a notification has been received.
>>      * 
>>      * @throws InterruptedException
>>      *             if the wait is interrupted
>>      */
>>     public synchronized void waitForNotification() throws InterruptedException
>>     {
>>         while (!mNotified)
>>         {
>>             wait();
>>         }
>>     }
>> }
>> 
>> /**
>>  * A callable task that blocks until it is interrupted. This task sends a
>>  * notification to a notification receiver when it is first called.
>>  */
>> class BlockingTask implements Callable
>> {
>>     private final NotificationReceiver mReceiver;
>>     
>>     BlockingTask(NotificationReceiver notifier)
>>     {
>>         mReceiver = notifier;
>>     }
>> 
>>     public Object call() throws Exception
>>     {
>>         mReceiver.sendNotification();
>>         
>>         // wait indefinitely until task is interrupted
>>         while (true)
>>         {
>>             synchronized (this)
>>             {
>>                 wait();
>>             }
>>         }
>>     }
>> }
>> 
>> /**
>>  * A callable task that simply returns a string result.
>>  */
>> class NonBlockingTask implements Callable
>> {
>>     public Object call() throws Exception
>>     {
>>         return "NonBlockingTaskResult";
>>     }
>> }
>> 
>> 
>> ------------------------------------------------------------------------
>> 
>> _______________________________________________
>> Concurrency-jsr mailing list
>> ###@###.###
>> http://altair.cs.oswego.edu/mailman/listinfo/concurrency-jsr


Comments
EVALUATION

Just to clarify, there were two problems here:

1. When a TPE worker thread found the TPE was in state STOP (which indicates shutdownNow() has been invoked) it would simply return and ignore the task that it had just removed from the work queue. This was technically valid, as it behaved as if immediate cancellation of the task had occurred, as it should when shutdownNow is called, but surprising for code that expects cancellation to come about only through detection of the thread's interrupt state. Task cancellation in TPE is cooperative. Because the dropped task was never run, its Future never completed, so a caller blocked in invokeAll waiting on that Future could hang.

2. After correcting #1 there was a problem with the way that the interrupt state of the thread was always cleared. It was cleared to ensure no leftover interrupt from a previously cancelled FutureTask would leak into the next task to be processed. However, if the state of the TPE has transitioned to STOP then the interrupt needs to be re-asserted to ensure the new task can see the cancellation request.

The latter was fixed in TPE, and in addition FutureTask was modified to minimise the possibility of an unwanted interrupt being generated.
20-06-2006
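
The check, clear, re-check sequence behind the fix for the second problem can be sketched in isolation. This is a minimal illustration and not the ThreadPoolExecutor source: the class InterruptGate, its stopped flag (standing in for runState == STOP) and the method prepareToRun are hypothetical names introduced here.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of the "clear unless stopping, then re-check" idiom.
class InterruptGate {
    // Stand-in for ThreadPoolExecutor's runState == STOP (assumption for this sketch).
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    void stop() {
        stopped.set(true);
    }

    /** Called by a worker thread just before it runs a task. */
    void prepareToRun() {
        // 1. If already stopped, leave any pending interrupt in place.
        // 2. Otherwise clear a possibly stale interrupt from an earlier task.
        // 3. If a stop arrived between steps 1 and 2, the interrupt just
        //    cleared may have been the stop request, so re-assert it.
        if (!stopped.get() && Thread.interrupted() && stopped.get()) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        InterruptGate gate = new InterruptGate();

        // Not stopped: a leftover interrupt is cleared before the task runs.
        Thread.currentThread().interrupt();
        gate.prepareToRun();
        System.out.println("running: interrupt cleared = "
                + !Thread.currentThread().isInterrupted());

        // Stopped: the interrupt is preserved so the task can see it.
        gate.stop();
        Thread.currentThread().interrupt();
        gate.prepareToRun();
        System.out.println("stopped: interrupt kept = " + Thread.interrupted());
    }
}
```

The third clause of the condition only matters when stop() lands between the first check and the clearing call, which a single-threaded demo cannot force; the main method therefore exercises only the two non-racing cases.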

EVALUATION

Doug Lea is providing a fix.

--- /u/martin/ws/mustang/src/share/classes/java/util/concurrent/FutureTask.java 2006-01-30 15:31:01.297726000 -0800
+++ /u/martin/ws/jsr166/src/share/classes/java/util/concurrent/FutureTask.java 2006-05-28 14:03:28.853296000 -0700
@@ -236,28 +236,38 @@
         void innerSet(V v) {
             for (;;) {
                 int s = getState();
-                if (ranOrCancelled(s))
+                if (s == RAN)
+                    return;
+                if (s == CANCELLED) {
+                    releaseShared(0);
                     return;
-                if (compareAndSetState(s, RAN))
-                    break;
                 }
+                if (compareAndSetState(s, RAN)) {
                     result = v;
                     releaseShared(0);
                     done();
+                    return;
+                }
+            }
         }

         void innerSetException(Throwable t) {
             for (;;) {
                 int s = getState();
-                if (ranOrCancelled(s))
+                if (s == RAN)
+                    return;
+                if (s == CANCELLED) {
+                    releaseShared(0);
                     return;
-                if (compareAndSetState(s, RAN))
-                    break;
                 }
+                if (compareAndSetState(s, RAN)) {
                     exception = t;
                     result = null;
                     releaseShared(0);
                     done();
+                    return;
+                }
+            }
         }

         boolean innerCancel(boolean mayInterruptIfRunning) {
--- /u/martin/ws/mustang/src/share/classes/java/util/concurrent/ThreadPoolExecutor.java 2006-04-05 02:25:04.043965000 -0700
+++ /u/martin/ws/jsr166/src/share/classes/java/util/concurrent/ThreadPoolExecutor.java 2006-05-28 16:40:10.506548000 -0700
@@ -667,12 +667,11 @@
             final ReentrantLock runLock = this.runLock;
             runLock.lock();
             try {
-                Thread.interrupted(); // clear interrupt status on entry
-                // Abort now if immediate cancel.  Otherwise, we have
-                // committed to run this task.
-                if (runState == STOP)
-                    return;
-
+                // If not shutting down then clear an outstanding interrupt.
+                if (runState != STOP &&
+                    Thread.interrupted() &&
+                    runState == STOP) // Re-interrupt if stopped after clearing
+                    thread.interrupt();
                 boolean ran = false;
                 beforeExecute(thread, task);
                 try {
28-05-2006
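
The FutureTask half of the fix concerns the interrupt that cancel(true) delivers to the thread running the task. A minimal sketch of that mechanism, using only the public FutureTask API, might look like the following; the class name CancelInterruptDemo and the method runAndCancel are hypothetical, and the latches exist only to make the demo deterministic.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: cancel(true) interrupts the thread currently running the task.
class CancelInterruptDemo {
    /** Returns true if the running task observed the interrupt sent by cancel(true). */
    static boolean runAndCancel() {
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch sawInterrupt = new CountDownLatch(1);

        FutureTask<String> task = new FutureTask<String>(new Callable<String>() {
            public String call() throws Exception {
                started.countDown();
                try {
                    Thread.sleep(60000); // block until interrupted
                } catch (InterruptedException e) {
                    sawInterrupt.countDown(); // cooperative response to cancellation
                    throw e;
                }
                return "finished without interrupt";
            }
        });

        Thread runner = new Thread(task);
        runner.start();
        try {
            started.await();   // make sure the task body is running
            task.cancel(true); // request cancellation with interruption
            boolean seen = sawInterrupt.await(5, TimeUnit.SECONDS);
            runner.join();
            return seen && task.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("task saw cancel(true) interrupt: " + runAndCancel());
    }
}
```

Here the canceller acts while the task is known to be blocked. The race discussed in the description arises when the canceller stalls and its interrupt lands after the task has already completed, which is why a pool worker cannot assume a pending interrupt belongs to the task it is about to run.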