JDK-6460501 : Synchronizer timed acquire still leaks memory
  • Type: Bug
  • Component: core-libs
  • Sub-Component: java.util.concurrent
  • Affected Version: 5.0,6
  • Priority: P3
  • Status: Closed
  • Resolution: Fixed
  • OS:
    generic,linux,windows_2003,windows_xp generic,linux,windows_2003,windows_xp
  • CPU: generic,x86
  • Submitted: 2006-08-15
  • Updated: 2011-03-07
  • Resolved: 2011-03-07
The Version table provides details related to the release that this issue/RFE will be addressed.

Unresolved : Release in which this issue/RFE will be addressed.
Resolved: Release in which this issue/RFE has been resolved.
Fixed : Release in which this issue/RFE has been fixed. The release containing this fix may be available for download as an Early Access Release or a General Availability Release.

To download the current JDK release, click here.
Other JDK 6 JDK 7
5.0u12Fixed 6u2Fixed 7 b06Fixed
Related Reports
Duplicate :  
Duplicate :  
Duplicate :  
Duplicate :  
Duplicate :  
Relates :  
Relates :  
Relates :  
Relates :  
Relates :  
Description
Continually looping on an unavailable semaphore,
for example, with a timed tryAcquire, continues to consume memory.

------------------------------------
public class Leak6 {
    public static void main(String [] args) throws Throwable {
    final int n = 1000*1000;

    long mem = Runtime.getRuntime().freeMemory();
    System.out.println("Free: " + mem);

    Semaphore s = new Semaphore(0);
    int i = 0;
    try {
        while (++i < n)
        s.tryAcquire(1,TimeUnit.MICROSECONDS);
    } catch (OutOfMemoryError oome) {
        System.out.printf("OOME on iteration %d%n", i);
    }

    System.gc();
    long mem2 = Runtime.getRuntime().freeMemory();
    System.out.println("Memory used: " + (mem2 - mem));
    }
}
------------------------------------
==> java -Xmx16m -esa -ea Leak6
Free: 7675240
OOME on iteration 488232
Memory used: 5888064

Comments
EVALUATION Here's a test case that shows that the leak has been plugged, at least for ArrayBlockingQueue.poll, but shows occasional transient leaks of Nodes for Semaphore.tryAcquire Try commenting/uncommenting these lines: //q.poll(1, TimeUnit.MICROSECONDS); sem.tryAcquire(1, TimeUnit.MICROSECONDS); /* * @test %W% %E% * @bug 6460501 * @summary Repeated timed tryAcquire shouldn't leak memory * @author Martin Buchholz */ import java.util.*; import java.util.regex.*; import java.util.concurrent.*; import java.io.*; public class TimedAcquireLeak { static String javahome() { String jh = System.getProperty("java.home"); return (jh.endsWith("jre")) ? jh.substring(0, jh.length() - 4) : jh; } static final File bin = new File(javahome(), "bin"); static String javaProgramPath(String programName) { return new File(bin, programName).getPath(); } static final String java = javaProgramPath("java"); static final String jmap = javaProgramPath("jmap"); static final String jps = javaProgramPath("jps"); static String outputOf(Reader r) throws IOException { final StringBuilder sb = new StringBuilder(); final char[] buf = new char[1024]; int n; while ((n = r.read(buf)) > 0) sb.append(buf, 0, n); return sb.toString(); } static String outputOf(Process p) { try { final Reader r = new InputStreamReader(p.getInputStream(),"UTF-8"); final String output = outputOf(r); // Check for successful process completion equal(p.waitFor(), 0); equal(p.exitValue(), 0); equal(p.getErrorStream().read(), -1); return output; } catch (Throwable t) { unexpected(t); throw new Error(t); } } static String commandOutputOf(String... cmd) { try { return outputOf(new ProcessBuilder(cmd).start()); } catch (Throwable t) { unexpected(t); throw new Error(t); } } // To be called exactly twice by the parent process static <T> T rendezvousParent(Process p, Callable<T> callable) throws Throwable { p.getInputStream().read(); T result = callable.call(); OutputStream os = p.getOutputStream(); os.write((byte)'\n'); os.flush(); return result; } // To be called exactly twice by the child process public static void rendezvousChild() { try { System.gc(); System.runFinalization(); Thread.sleep(10); System.gc(); System.runFinalization(); Thread.sleep(10); System.out.write((byte)'\n'); System.out.flush(); System.in.read(); } catch (Throwable t) { throw new Error(t); } } static String match(String s, String regex, int group) { Matcher matcher = Pattern.compile(regex).matcher(s); matcher.find(); return matcher.group(group); } static int objectsInUse(final Process child, final String childPid, final String className) { final String regex = "(?m)^ *[0-9]+: +([0-9]+) +[0-9]+ +\\Q"+className+"\\E$"; final Callable<Integer> objectsInUse = new Callable<Integer>() { public Integer call() { return Integer.parseInt( match(commandOutputOf(jmap, "-histo:live", childPid), regex, 1));}}; try { return rendezvousParent(child, objectsInUse); } catch (Throwable t) { unexpected(t); return -1; } } static void realMain(String[] args) throws Throwable { // jmap doesn't work on Windows if (System.getProperty("os.name").startsWith("Windows")) return; final String childClassName = Job.class.getName(); final String classToCheckForLeaks = Job.classToCheckForLeaks(); final String uniqueID = String.valueOf(new Random().nextInt(Integer.MAX_VALUE)); final String[] jobCmd = {java, childClassName, uniqueID}; final Process p = new ProcessBuilder(jobCmd).start(); final String childPid = match(commandOutputOf(jps, "-m"), "(?m)^ *([0-9]+) +\\Q"+childClassName+"\\E *"+uniqueID+"$", 1); final int n0 = objectsInUse(p, childPid, classToCheckForLeaks); final int n1 = objectsInUse(p, childPid, classToCheckForLeaks); equal(p.waitFor(), 0); equal(p.exitValue(), 0); // Check that no objects were leaked. System.out.printf("%d -> %d%n", n0, n1); equal(n0, n1); } //---------------------------------------------------------------- // The main class of the child process. // Job's job is to: // - provide the name of a class to check for leaks. // - call rendezvousChild exactly twice, while quiescent. // - in between calls to rendezvousChild, run code that may leak. //---------------------------------------------------------------- public static class Job { static String classToCheckForLeaks() { return "java.util.concurrent.locks.AbstractQueuedSynchronizer$Node"; } public static void main(String[] args) throws Throwable { final BlockingQueue<Object> q = new LinkedBlockingQueue<Object>(); final Random rnd = new Random(); final boolean fair = rnd.nextBoolean(); debugPrintf("fair=%s%n", fair); final Semaphore sem = new Semaphore(0, fair); final int threads = 3; final int iterations = 10000; final CyclicBarrier cb = new CyclicBarrier(threads+1); for (int i = 0; i < threads; i++) new Thread() { public void run() { try { for (int j = 0; j < iterations; j++) { if (j == iterations/10 || j == iterations - 1) { cb.await(); // Quiesce cb.await(); // Resume } //q.poll(1, TimeUnit.MICROSECONDS); sem.tryAcquire(1, TimeUnit.MICROSECONDS); } } catch (Throwable t) { unexpected(t); } }}.start(); cb.await(); // Quiesce rendezvousChild(); // Measure cb.await(); // Resume cb.await(); // Quiesce rendezvousChild(); // Measure cb.await(); // Resume System.exit(failed); } // If something goes wrong, we might never see it, since IO // streams are connected to the parent. So we need a special // purpose print method to debug Jobs. static void debugPrintf(String format, Object... args) { try { new PrintStream(new FileOutputStream("/dev/tty")) .printf(format, args); } catch (Throwable t) { throw new Error(t); } } } //--------------------- Infrastructure --------------------------- static volatile int passed = 0, failed = 0; static void pass() {passed++;} static void fail() {failed++; Thread.dumpStack();} static void fail(String msg) {System.out.println(msg); fail();} static void unexpected(Throwable t) {failed++; t.printStackTrace();} static void check(boolean cond) {if (cond) pass(); else fail();} static void check(boolean cond, String m) {if (cond) pass(); else fail(m);} static void equal(Object x, Object y) { if (x == null ? y == null : x.equals(y)) pass(); else fail(x + " not equal to " + y);} public static void main(String[] args) throws Throwable { try {realMain(args);} catch (Throwable t) {unexpected(t);} System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); if (failed > 0) throw new AssertionError("Some tests failed");} } This often results in: 7 -> 7 Passed = 12, failed = 0 but occasionally the disquieting: fair=true 7 -> 7171 which suggests that we've still missed something in our latest solution.
03-12-2006

EVALUATION Doug Lea is providing a fix for this. Hopefully we will get it right this time. Our first attempt to fix this only worked if one thread was polling. With multiple polling threads, a more sophisticated fix is necessary. Here's the multi-threaded test case that should run forever without exhausting CPU or memory: ----------------------------------------- import java.util.concurrent.*; public class Bug3 { private static int intArg(String[] args, int i, int defaultValue) { return args.length > i ? Integer.parseInt(args[i]) : defaultValue; } public static void main(String[] args) { final BlockingQueue<Object> q = new LinkedBlockingQueue<Object>(); final int threads = intArg(args, 0, 10); final int millis = intArg(args, 1, 100); for (int i = 0; i < threads; i++) new Thread() { public void run() { while (true) { try { q.poll(millis, TimeUnit.MILLISECONDS); } catch (Throwable t) { throw new Error(t); }}}}.start(); } } ----------------------------------------- There is a real memory leak in the libraries, but... hotspot's reaction to the memory leak in Bug3 also appears unsatisfactory. I filed two bugs against java/hotspot/garbage_collector 6493287: Unproductive CPU-spinning GCs on heap exhaustion; please throw OOME instead 6493335: Mismatch between -Xm[sx] and verbose:gc output Our attempts to diagnose this are also documented in the dup 6490770: Supposedly fixed memory leak leads to 100% VM CPU usage Here's Doug's fix. --- /u/martin/ws/dolphin/src/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java 2006-08-14 15:35:26.035005000 -0700 +++ /u/martin/ws/Leak7/src/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java 2006-11-12 00:07:22.789528000 -0800 @@ -660,6 +660,10 @@ // Can use unconditional write instead of CAS here node.waitStatus = Node.CANCELLED; unparkSuccessor(node); + // Bypass pointer to this node to avoid garbage retention + Node pred = node.prev; + if (pred != null) + compareAndSetNext(pred, node, node.next); } } @@ -801,8 +805,8 @@ cancelAcquire(node); return false; } - if (nanosTimeout > spinForTimeoutThreshold && - shouldParkAfterFailedAcquire(p, node)) + if (shouldParkAfterFailedAcquire(p, node) && + nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); nanosTimeout -= now - lastTime; @@ -907,8 +911,8 @@ cancelAcquire(node); return false; } - if (nanosTimeout > spinForTimeoutThreshold && - shouldParkAfterFailedAcquire(p, node)) + if (shouldParkAfterFailedAcquire(p, node) && + nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); nanosTimeout -= now - lastTime; @@ -1695,8 +1699,13 @@ * @return its new wait node */ private Node addConditionWaiter() { - Node node = new Node(Thread.currentThread(), Node.CONDITION); Node t = lastWaiter; + // If lastWaiter is cancelled, clean out. + if (t != null && t.waitStatus != Node.CONDITION) { + unlinkCancelledWaiters(); + t = lastWaiter; + } + Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else @@ -1735,40 +1744,36 @@ } /** - * Returns true if given node is on this condition queue. - * Call only when holding lock. - */ - private boolean isOnConditionQueue(Node node) { - return node.next != null || node == lastWaiter; - } - - /** - * Unlinks a cancelled waiter node from condition queue. This - * is called when cancellation occurred during condition wait, - * not lock wait, and is called only after lock has been - * re-acquired by a cancelled waiter and the node is not known - * to already have been dequeued. It is needed to avoid - * garbage retention in the absence of signals. So even though - * it may require a full traversal, it comes into play only - * when timeouts or cancellations occur in the absence of - * signals. + * Unlinks cancelled waiter nodes from condition queue. + * Called only while holding lock. This is called when + * cancellation occurred during condition wait, and upon + * insertion of a new waiter when lastWaiter is seen to have + * been cancelled. This method is needed to avoid garbage + * retention in the absence of signals. So even though it may + * require a full traversal, it comes into play only when + * timeouts or cancellations occur in the absence of + * signals. It traverses all nodes rather than stopping at a + * particular target to unlink all pointers to garbage nodes + * without requiring many re-traversals during cancellation + * storms. */ - private void unlinkCancelledWaiter(Node node) { + private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { - if (t == node) { Node next = t.nextWaiter; + if (t.waitStatus != Node.CONDITION) { + t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; - if (lastWaiter == node) + if (next == null) lastWaiter = trail; - break; } + else trail = t; - t = t.nextWaiter; + t = next; } } @@ -1892,8 +1897,8 @@ } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; - if (isOnConditionQueue(node)) - unlinkCancelledWaiter(node); + if (node.nextWaiter != null) // clean up if cancelled + unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } @@ -1934,8 +1939,8 @@ } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; - if (isOnConditionQueue(node)) - unlinkCancelledWaiter(node); + if (node.nextWaiter != null) + unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return nanosTimeout - (System.nanoTime() - lastTime); @@ -1977,8 +1982,8 @@ } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; - if (isOnConditionQueue(node)) - unlinkCancelledWaiter(node); + if (node.nextWaiter != null) + unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; @@ -2015,6 +2020,7 @@ timedout = transferAfterCancelledWait(node); break; } + if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; @@ -2024,8 +2030,8 @@ } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; - if (isOnConditionQueue(node)) - unlinkCancelledWaiter(node); + if (node.nextWaiter != null) + unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; @@ -2119,6 +2125,7 @@ private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; + private static final long nextOffset; static { try { @@ -2130,6 +2137,8 @@ (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); + nextOffset = unsafe.objectFieldOffset + (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } @@ -2157,4 +2166,11 @@ return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } + + private final static boolean compareAndSetNext(Node node, + Node expect, + Node update) { + return unsafe.compareAndSwapObject(node, nextOffset, + expect, update); + } }
14-11-2006

WORK AROUND -
15-08-2006

EVALUATION David writes: I think we may have missed something. 6236036 was closed because it was apparently fixed as part of AQS mods in b49. However those mods only apply to AQS.ConditionObject. We seem to have the same garbage retention problem in AQS itself and that has not been addressed. I don't know whether 6236036 was specifically about the ConditionObject or AQS in general. Doug writes: Amusing, in a non-amusing way. Long ago, the code to splice out next fields after timeouts was clobbered because someone pointed out that it wasn't needed and I believed them. And they were right, except for garbage accumulation which now shows up with a vengence for short waits because of spinForTimeoutThreshold bypass introduced in Mustang.
15-08-2006

SUGGESTED FIX --- /u/martin/ws/dolphin/src/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java 2006-08-14 15:35:26.035005000 -0700 +++ /u/martin/ws/Leak7/src/share/classes/java/util/concurrent/locks/AbstractQueuedSynchronizer.java 2006-11-12 00:07:22.789528000 -0800 @@ -660,6 +660,10 @@ // Can use unconditional write instead of CAS here node.waitStatus = Node.CANCELLED; unparkSuccessor(node); + // Bypass pointer to this node to avoid garbage retention + Node pred = node.prev; + if (pred != null) + compareAndSetNext(pred, node, node.next); } } @@ -801,8 +805,8 @@ cancelAcquire(node); return false; } - if (nanosTimeout > spinForTimeoutThreshold && - shouldParkAfterFailedAcquire(p, node)) + if (shouldParkAfterFailedAcquire(p, node) && + nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); nanosTimeout -= now - lastTime; @@ -907,8 +911,8 @@ cancelAcquire(node); return false; } - if (nanosTimeout > spinForTimeoutThreshold && - shouldParkAfterFailedAcquire(p, node)) + if (shouldParkAfterFailedAcquire(p, node) && + nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); nanosTimeout -= now - lastTime; @@ -1695,8 +1699,13 @@ * @return its new wait node */ private Node addConditionWaiter() { - Node node = new Node(Thread.currentThread(), Node.CONDITION); Node t = lastWaiter; + // If lastWaiter is cancelled, clean out. + if (t != null && t.waitStatus != Node.CONDITION) { + unlinkCancelledWaiters(); + t = lastWaiter; + } + Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else @@ -1735,40 +1744,36 @@ } /** - * Returns true if given node is on this condition queue. - * Call only when holding lock. - */ - private boolean isOnConditionQueue(Node node) { - return node.next != null || node == lastWaiter; - } - - /** - * Unlinks a cancelled waiter node from condition queue. This - * is called when cancellation occurred during condition wait, - * not lock wait, and is called only after lock has been - * re-acquired by a cancelled waiter and the node is not known - * to already have been dequeued. It is needed to avoid - * garbage retention in the absence of signals. So even though - * it may require a full traversal, it comes into play only - * when timeouts or cancellations occur in the absence of - * signals. + * Unlinks cancelled waiter nodes from condition queue. + * Called only while holding lock. This is called when + * cancellation occurred during condition wait, and upon + * insertion of a new waiter when lastWaiter is seen to have + * been cancelled. This method is needed to avoid garbage + * retention in the absence of signals. So even though it may + * require a full traversal, it comes into play only when + * timeouts or cancellations occur in the absence of + * signals. It traverses all nodes rather than stopping at a + * particular target to unlink all pointers to garbage nodes + * without requiring many re-traversals during cancellation + * storms. */ - private void unlinkCancelledWaiter(Node node) { + private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { - if (t == node) { Node next = t.nextWaiter; + if (t.waitStatus != Node.CONDITION) { + t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; - if (lastWaiter == node) + if (next == null) lastWaiter = trail; - break; } + else trail = t; - t = t.nextWaiter; + t = next; } } @@ -1892,8 +1897,8 @@ } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; - if (isOnConditionQueue(node)) - unlinkCancelledWaiter(node); + if (node.nextWaiter != null) // clean up if cancelled + unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } @@ -1934,8 +1939,8 @@ } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; - if (isOnConditionQueue(node)) - unlinkCancelledWaiter(node); + if (node.nextWaiter != null) + unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return nanosTimeout - (System.nanoTime() - lastTime); @@ -1977,8 +1982,8 @@ } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; - if (isOnConditionQueue(node)) - unlinkCancelledWaiter(node); + if (node.nextWaiter != null) + unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; @@ -2015,6 +2020,7 @@ timedout = transferAfterCancelledWait(node); break; } + if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; @@ -2024,8 +2030,8 @@ } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; - if (isOnConditionQueue(node)) - unlinkCancelledWaiter(node); + if (node.nextWaiter != null) + unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return !timedout; @@ -2119,6 +2125,7 @@ private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; + private static final long nextOffset; static { try { @@ -2130,6 +2137,8 @@ (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); + nextOffset = unsafe.objectFieldOffset + (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } @@ -2157,4 +2166,11 @@ return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update); } + + private final static boolean compareAndSetNext(Node node, + Node expect, + Node update) { + return unsafe.compareAndSwapObject(node, nextOffset, + expect, update); + } }
15-08-2006