United StatesChange Country, Oracle Worldwide Web Sites Communities I am a... I want to...
JDK-6314788 : Tasks resubmitted to a ThreadPoolExecutor may fail to execute

Details
Type:
Bug
Submit Date:
2005-08-23
Status:
Resolved
Updated Date:
2010-04-02
Project Name:
JDK
Resolved Date:
2005-09-04
Component:
core-libs
OS:
generic
Sub-Component:
java.util.concurrent
CPU:
generic
Priority:
P3
Resolution:
Fixed
Affected Versions:
6
Fixed Versions:

Related Reports

Sub Tasks

Description
Doug Lea writes:

Re-submitting a task that is rejected but has also been
previously successfully accepted may cause an instance of that task
not to be run.

Test program:


import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class RecycledTPETask {
     public static void main(String[] args) throws Exception {
         final int nTasks = 1000;
         final AtomicInteger nRun = new AtomicInteger(0);
         final Runnable recycledTask = new Runnable() {
                 public void run() {
                     nRun.getAndIncrement();
                 } };
         final ThreadPoolExecutor p =
             new ThreadPoolExecutor(1, 30, 60, TimeUnit.SECONDS,
                                    new ArrayBlockingQueue(30));
         try {
             for (int i = 0; i < nTasks; ++i) {
                 for (;;) {
                     try {
                         p.execute(recycledTask);
                         break;
                     }
                     catch (RejectedExecutionException ignore) {
                     }
                 }
             }
             Thread.sleep(5000); // enough time to run all tasks
             if (nRun.get() < nTasks)
                 throw new Error("Started " + nTasks +
                                 " Ran " + nRun.get());
         } catch(Exception ex) {
             ex.printStackTrace();
         } finally {
             p.shutdown();
         }
     }
}

                                    

Comments
EVALUATION

Will be fixed as part of jsr166x project.
Fix provided by Doug Lea.

David Holmes writes:

addIfUnderMaximumPoolSize needs to be changed to return <0, 0 or >0 for the
three cases. Seems straight-forward enough. This is the only place where ==
is used other than in a null test in TPE.
                                     
2005-08-23
SUGGESTED FIX

--- /tmp/geta24965	2005-08-23 13:31:58.947889200 -0700
+++ ThreadPoolExecutor.java	2005-08-23 13:21:37.827852000 -0700
@@ -441,35 +441,42 @@
 
     /**
      * Creates and starts a new thread only if fewer than maximumPoolSize
      * threads are running.  The new thread runs as its first task the
      * next task in queue, or if there is none, the given task.
      * @param firstTask the task the new thread should run first (or
      * null if none)
-     * @return null on failure, else the first task to be run by new thread.
+     * @return 0 if a new thread cannot be created, a positive number
+     * if firstTask will be run in a new thread, or a negative number
+     * if a new thread was created but is running some other task, in
+     * which case the caller must try some other way to run firstTask
+     * (perhaps by calling this method again).
      */
-    private Runnable addIfUnderMaximumPoolSize(Runnable firstTask) {
+    private int addIfUnderMaximumPoolSize(Runnable firstTask) {
         Thread t = null;
-        Runnable next = null;
+        int status = 0;
         final ReentrantLock mainLock = this.mainLock;
         mainLock.lock();
         try {
             if (poolSize < maximumPoolSize) {
-                next = workQueue.poll();
-                if (next == null)
+                Runnable next = workQueue.poll();
+                if (next == null) {
                     next = firstTask;
+                    status = 1;
+                } else
+                    status = -1;
                 t = addThread(next);
             }
         } finally {
             mainLock.unlock();
         }
         if (t == null)
-            return null;
+            return 0;
         t.start();
-        return next;
+        return status;
     }
 
 
     /**
      * Gets the next task for a worker thread to run.
      * @return the task
      */
@@ -866,22 +873,22 @@
                 reject(command);
                 return;
             }
             if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
                 return;
             if (workQueue.offer(command))
                 return;
-            Runnable r = addIfUnderMaximumPoolSize(command);
-            if (r == command)
+            int status = addIfUnderMaximumPoolSize(command);
+            if (status > 0)      // created new thread
                 return;
-            if (r == null) {
+            if (status == 0) {   // failed to create thread
                 reject(command);
                 return;
             }
-            // else retry
+            // Retry if created a new thread but it is busy with another task
         }
     }
 
     /**
      * Initiates an orderly shutdown in which previously submitted
      * tasks are executed, but no new tasks will be
      * accepted. Invocation has no additional effect if already shut
                                     
2005-08-23



Hardware and Software, Engineered to Work Together