JDK-4681980 : IOException in poll0 during nio select
  • Type: Bug
  • Component: core-libs
  • Sub-Component: java.nio
  • Affected Version: 1.4.0
  • Priority: P3
  • Status: Closed
  • Resolution: Duplicate
  • OS: windows_nt
  • CPU: x86
  • Submitted: 2002-05-08
  • Updated: 2002-05-09
  • Resolved: 2002-05-09
Related Reports
Duplicate :  
Description
void acceptConnections() throws IOException, InterruptedException {
        SelectionKey acceptKey =
                this.selectableChannel.register(this.selector,
                SelectionKey.OP_ACCEPT);
        System.out.println("DEBUG:Acceptor loop...");

        while ((this.keysAdded = acceptKey.selector().select()) > 0) {

            System.out.println("DEBUG:Selector returned "
                    + this.keysAdded + " ready for IO operations");

            Set readyKeys = this.selector.selectedKeys();

            Iterator i = readyKeys.iterator();
            while (i.hasNext()) {
                SelectionKey key = (SelectionKey) i.next();
                i.remove();

                if (key.isConnectable()) {
                    System.out.println("DEBUG: key '" + key + " connectable");
                }

                if (key.isAcceptable()) {
                    ServerSocketChannel nextReady =
                            (ServerSocketChannel) key.channel();

                    System.out.println("DEBUG:Accept Processing selection key
read="
                            + key.isReadable() + " write=" + key.isWritable() +
                            " accept=" + key.isAcceptable());

                    SocketChannel channel = nextReady.accept();
                    channel.configureBlocking(false);
                    SelectionKey readKey =
                            channel.register(this.selector,
                            SelectionKey.OP_READ | SelectionKey.OP_WRITE);

                    readKey.attach(new ChannelCallback(channel));
                } else if (key.isReadable()) {
                    SelectableChannel nextReady =
                            (SelectableChannel) key.channel();

                    System.out.println("DEBUG:Read Processing selection key
read="
                            + key.isReadable() + " write=" + key.isWritable() +
                            " accept=" + key.isAcceptable());

                    try {
                        this.readMessage((ChannelCallback) key.attachment());
                    } catch (SocketClosedException sce) {
                        System.out.println("DEBUG: key '" + key + " connection
closed with " + sce.getMessage());
                    } catch (IOException ioe) {
                        System.out.println("DEBUG: key '" + key + " read failed
with " + ioe.getMessage());
                    }
                } else if (key.isWritable()) {
                    ChannelCallback callback = (ChannelCallback) key.attachment
();
                    String message = "What is your name? ";
                    ByteBuffer buf = ByteBuffer.wrap(message.getBytes());
                    int nbytes = callback.getChannel().write(buf);
                }
            }

        }

        System.out.println("DEBUG:End acceptor loop...");
    }


    /**
     *  Encodes a message as US-ASCII and writes it to the channel with a
     *  single write call. US-ASCII is used because decode() on the receiving
     *  side decodes with that charset; bytes produced by any other encoding
     *  could make the peer's decoder fail. On a non-blocking channel the
     *  write may be partial; the number of bytes actually written is logged.
     *
     *@param  channel          channel to write to
     *@param  message          text to send
     *@exception  IOException  if the write fails
     */
    public void writeMessage(SocketChannel channel, String message) throws
            IOException {
        ByteBuffer buf = ByteBuffer.wrap(message.getBytes("US-ASCII"));
        int nbytes = channel.write(buf);
        System.out.println("DEBUG:Wrote " + nbytes + " to channel.");
    }


    // Capacity, in bytes, of the ByteBuffer that readMessage() allocates for each read.
    final static int BUFSIZE = 256;


    /**
     *  Decodes the remaining bytes of a buffer as US-ASCII text.
     *
     *@param  byteBuffer                    buffer positioned at the first byte to decode
     *@return                               the decoded text
     *@exception  CharacterCodingException  if the bytes cannot be decoded as US-ASCII
     */
    public String decode(ByteBuffer byteBuffer) throws CharacterCodingException {
        return Charset.forName("us-ascii")
                .newDecoder()
                .decode(byteBuffer)
                .toString();
    }


    /**
     *  Performs one read on the callback's channel and acts on the text
     *  received: "quit" closes the channel, "shutdown" closes the channel and
     *  signals the caller to stop, anything else is appended to the callback,
     *  which is executed once the text contains a newline.
     *
     *@param  callback                   holder of the channel and of the text collected so far
     *@exception  IOException            if reading, decoding or the callback's write fails
     *@exception  SocketClosedException  if the read reports end of stream; the channel is closed first
     *@exception  InterruptedException   if the received text contains "shutdown"
     */
    public void readMessage(ChannelCallback callback) throws IOException,
            SocketClosedException, InterruptedException {
        SocketChannel peer = callback.getChannel();
        ByteBuffer incoming = ByteBuffer.allocate(BUFSIZE);

        int count = peer.read(incoming);
        System.out.println("DEBUG:readMessage nbytes = " + count);
        if (count < 0) {
            peer.close();
            throw new SocketClosedException("Connection Closed");
        }

        // Switch the buffer from filling to draining before decoding it.
        incoming.flip();
        String text = this.decode(incoming);
        System.out.println("DEBUG:" + text);

        if (text.indexOf("quit") >= 0) {
            peer.close();
            return;
        }
        if (text.indexOf("shutdown") >= 0) {
            peer.close();
            throw new InterruptedException();
        }

        callback.append(text);
        // A newline marks the end of a line, which is when the callback runs.
        if (text.indexOf("\n") >= 0) {
            callback.execute();
        }
    }


    /**
     *  Signals that a read found the connection already closed by the peer.
     *
     *@author     craig
     *@created    7 May 2002
     */
    public class SocketClosedException extends IOException {
        /**
         *  Creates the exception with a detail message.
         *
         *@param  detail  text returned by getMessage()
         */
        SocketClosedException(String detail) {
            super(detail);
        }
    }


    /**
     *  Pairs a socket channel with the text collected from it so far.
     *
     *@author     craig
     *@created    7 May 2002
     */
    public class ChannelCallback {
        private SocketChannel channel;
        private StringBuffer buffer;


        /**
         *  Creates a callback with an empty text buffer.
         *
         *@param  channel  channel the collected text is written back to
         */
        public ChannelCallback(SocketChannel channel) {
            this.buffer = new StringBuffer();
            this.channel = channel;
        }


        /**
         *  Returns the channel given to the constructor.
         *
         *@return    the channel
         */
        public SocketChannel getChannel() {
            return channel;
        }


        /**
         *  Adds text to the end of the collected text.
         *
         *@param  values  text to add
         */
        public void append(String values) {
            this.buffer.append(values);
        }


        /**
         *  Logs the collected text, writes it to the channel through the
         *  enclosing object's writeMessage, then starts a fresh buffer. The
         *  buffer is left untouched when the write throws.
         *
         *@exception  IOException  if the write fails
         */
        public void execute() throws IOException {
            String pending = this.buffer.toString();
            System.out.println("DEBUG:" + pending);
            writeMessage(this.channel, pending);
            this.buffer = new StringBuffer();
        }
    }


    /**
     *  Starts a server with the default settings and runs its accept loop
     *  until the loop ends. The process exits with status -1 when the server
     *  cannot be initialized.
     *
     *@param  args  command line arguments (not used)
     */
    public static void main(String[] args) {

        NonBlockingServer server = new NonBlockingServer();

        // Phase 1: open and bind; a failure here is fatal.
        try {
            server.initialize();
        } catch (IOException startupFailure) {
            startupFailure.printStackTrace();
            System.exit(-1);
        }

        // Phase 2: serve until the loop ends or a shutdown is requested.
        try {
            server.acceptConnections();
        } catch (InterruptedException shutdownRequest) {
            System.out.println("INFO: Exiting normally...");
        } catch (IOException loopFailure) {
            loopFailure.printStackTrace();
            System.out.println("ERROR:" + loopFailure);
        }
    }
}


---------- END SOURCE ----------
(Review ID: 146267) 
======================================================================


Name: nt126004			Date: 05/08/2002


FULL PRODUCT VERSION :
java version "1.4.0"
Java(TM) 2 Runtime Environment, Standard Edition (build 1.4.0-b92)
Java HotSpot(TM) Client VM (build 1.4.0-b92, mixed mode)

FULL OPERATING SYSTEM VERSION :
Windows NT Version 4.0

A DESCRIPTION OF THE PROBLEM :
I have created a basic test server and client just to try
out the nio features in 1.4.0.

When I run the server and then run a couple of clients and
leave them running, everything seems fine. However, if I
stress-test the server by repeatedly terminating the client
processes, a problem occurs very quickly at the server end.

The stack trace is below.

I can't figure out what is going on as it seems to be at a
very low level.

The fact that I was originally running two clients seems
irrelevant as it occurs very easily with just one client.

STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
1. Run server.NonBlockingServer
2. Run client.NonBlockingClient
3. Randomly terminate (Ctrl C) client and repeat step 2
until the server fails.

EXPECTED VERSUS ACTUAL BEHAVIOR :
I expected steps 2 and 3 to be endlessly repeatable

ERROR MESSAGES/STACK TRACES THAT OCCUR :
java.io.IOException: The parameter is incorrect
        at sun.nio.ch.PollArrayWrapper.poll0(Native Method)
        at sun.nio.ch.PollArrayWrapper.poll(PollArrayWrapper.java:146)
        at sun.nio.ch.PollSelectorImpl.doSelect(PollSelectorImpl.java:46)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:62)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:67)
        at server.NonBlockingServer.acceptConnections(NonBlockingServer.java:84)

        at server.NonBlockingServer.main(NonBlockingServer.java:305)

This bug can be reproduced often.

---------- BEGIN SOURCE ----------
/* CmIdentification $Header$  */
/* Copyright (c) 1998, 2002 TAB Limited. All Rights Reserved.  */
package client;

import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;

/**
 *  Description of the Class
 *
 *@author     craig
 *@created    7 May 2002
 */
public class NonBlockingClient {

    // Name supplied to the constructor.
    private String name = null;
    // Reader created in establishConnections() and run there on a daemon thread.
    private NonBlockingReader nonBlockingReader = null;
    // Port used when building the address to connect to.
    int port = 4001;
    // Selector opened by the constructors and closed in finalize().
    Selector selector = null;
    // SocketChannel instances opened by establishConnections().
    List selectableChannelsList = new ArrayList();
//    SocketChannel selectableChannel = null;
    int keysAdded = 0;

    static String QUIT_SERVER = "quit";
    static String SHUTDOWN = "shutdown";
    // Host name used when building the address to connect to.
    String serverName = "mallett";

    // Number of connections that establishConnections() opens.
    int numberOfChannels = 1;


    /**
     *  Creates a client that uses the default port. The process exits with
     *  status -1 when no selector can be opened.
     *
     *@param  name              name stored for this client
     *@param  numberOfChannels  number of connections to open later
     */
    public NonBlockingClient(String name, int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
        this.name = name;

        try {
            this.selector = SelectorProvider.provider().openSelector();
        } catch (IOException openFailure) {
            openFailure.printStackTrace();
            System.exit(-1);
        }
    }


    /**
     *  Creates a client that connects to the given port. The process exits
     *  with status -1 when no selector can be opened.
     *
     *@param  name              name stored for this client
     *@param  numberOfChannels  number of connections to open later
     *@param  port              port to connect to
     */
    public NonBlockingClient(String name, int numberOfChannels, int port) {
        this.port = port;
        this.numberOfChannels = numberOfChannels;
        this.name = name;

        try {
            this.selector = SelectorProvider.provider().openSelector();
        } catch (IOException openFailure) {
            openFailure.printStackTrace();
            System.exit(-1);
        }
    }


    /**
     *  Does nothing: the method body is empty.
     *
     *@param  numberOfChannels  not used by the empty body
     *@exception  IOException  declared in the signature; the empty body never throws it
     */
    public void initialize(int numberOfChannels) throws IOException {

    }


    /**
     *  Closes every channel in selectableChannelsList and then the selector.
     *  All resources are attempted even when one close fails; the first
     *  failure is rethrown once everything has been tried.
     *
     *@exception  IOException  the first failure met while closing
     */
    public void finalize() throws IOException {
        IOException firstFailure = null;

        Iterator it = selectableChannelsList.iterator();
        while (it.hasNext()) {
            try {
                ((SocketChannel) it.next()).close();
            } catch (IOException e) {
                // Remember the failure but keep closing the other channels.
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }

        if (this.selector != null) {
            try {
                this.selector.close();
            } catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }


    /**
     *  Opens numberOfChannels non-blocking socket channels, registers each
     *  with a new NonBlockingReader and starts connecting it to
     *  serverName:port, then starts the reader on a daemon thread.
     *
     *@exception  IOException           if a channel cannot be opened, configured or connected
     *@exception  InterruptedException  declared in the signature
     */
    public void establishConnections() throws IOException, InterruptedException {
        nonBlockingReader = new NonBlockingReader();

        int opened = 0;
        while (opened < numberOfChannels) {
            SocketChannel selChannel = SocketChannel.open();
            selChannel.configureBlocking(false);
            selectableChannelsList.add(selChannel);

            System.out.println("DEBUG:@@@Registering");
            nonBlockingReader.registerConnection(selChannel);
            System.out.println("DEBUG:@@@Registered");

            InetSocketAddress target =
                    new InetSocketAddress(serverName, this.port);
            System.out.println("DEBUG:@@@Connecting");
            selChannel.connect(target);
            System.out.println("DEBUG:@@@Connected");

            opened++;
        }

        // Daemon thread: it must not keep the JVM alive on its own.
        Thread readThread = new Thread(nonBlockingReader);
        readThread.setDaemon(true);
        readThread.start();
    }


    /**
     *  Writes the message to every channel in selectableChannelsList. The
     *  first write that fails is reported with a stack trace and ends the
     *  pass, so later channels are not written to.
     *
     *@param  msg  text to send
     */
    public void sendTestMessage(String msg) {
        try {
            for (Iterator it = selectableChannelsList.iterator(); it.hasNext();) {
                writeMessage((SocketChannel) it.next(), msg);
            }
        } catch (IOException writeFailure) {
            writeFailure.printStackTrace();
        }
    }


    /**
     *  Encodes a message as US-ASCII and writes it to the channel with a
     *  single write call. US-ASCII is used because decode() decodes with that
     *  charset; bytes produced by any other encoding could make the
     *  receiving decoder fail. On a non-blocking channel the write may be
     *  partial; the number of bytes actually written is logged.
     *
     *@param  channel          channel to write to
     *@param  message          text to send
     *@exception  IOException  if the write fails
     */
    public void writeMessage(SocketChannel channel, String message) throws
            IOException {
        ByteBuffer buf = ByteBuffer.wrap(message.getBytes("US-ASCII"));
        int nbytes = channel.write(buf);
        System.out.println("DEBUG:Wrote " + nbytes + " to channel.");
    }


    // Capacity, in bytes, of the ByteBuffer that NonBlockingReader.run() allocates for each read.
    final static int BUFSIZE = 256;


    /**
     *  Decodes the remaining bytes of a buffer as US-ASCII text.
     *
     *@param  byteBuffer                    buffer positioned at the first byte to decode
     *@return                               the decoded text
     *@exception  CharacterCodingException  if the bytes cannot be decoded as US-ASCII
     */
    public String decode(ByteBuffer byteBuffer) throws CharacterCodingException {
        CharsetDecoder asciiDecoder = Charset.forName("us-ascii").newDecoder();
        return asciiDecoder.decode(byteBuffer).toString();
    }

    /**
     *  IOException subtype for reporting a connection that has been closed.
     *
     *@author     craig
     *@created    7 May 2002
     */
    public class SocketClosedException extends IOException {
        /**
         *  Creates the exception with a detail message.
         *
         *@param  detail  text returned by getMessage()
         */
        SocketClosedException(String detail) {
            super(detail);
        }
    }


    /**
     *  Pairs a socket channel with the text collected for it so far.
     *
     *@author     craig
     *@created    7 May 2002
     */
    public class ChannelCallback {
        private SocketChannel channel;
        private StringBuffer buffer;


        /**
         *  Creates a callback with an empty text buffer.
         *
         *@param  channel  channel the collected text is written to
         */
        public ChannelCallback(SocketChannel channel) {
            this.buffer = new StringBuffer();
            this.channel = channel;
        }


        /**
         *  Returns the channel given to the constructor.
         *
         *@return    the channel
         */
        public SocketChannel getChannel() {
            return channel;
        }


        /**
         *  Adds text to the end of the collected text.
         *
         *@param  values  text to add
         */
        public void append(String values) {
            this.buffer.append(values);
        }


        /**
         *  Logs the collected text, writes it to the channel through the
         *  enclosing object's writeMessage, then starts a fresh buffer. The
         *  buffer is left untouched when the write throws.
         *
         *@exception  IOException  if the write fails
         */
        public void execute() throws IOException {
            String pending = this.buffer.toString();
            System.out.println("DEBUG:" + pending);
            writeMessage(this.channel, pending);
            this.buffer = new StringBuffer();
        }
    }


    /**
     *  Description of the Class
     *
     *@author     craig
     *@created    7 May 2002
     */
    public class NonBlockingReader implements Runnable {
        private Selector selector = null;
        private int keysAdded = 0;


        /**
         *  Description of the Method
         *
         *@param  channel  Description of the Parameter
         */
        public synchronized void registerConnection(SocketChannel channel) {
            try {
                System.out.println("DEBUG:registerConnection:CHK1");
                SelectionKey readKey =
                        channel.register(this.selector,
                        SelectionKey.OP_CONNECT | SelectionKey.OP_READ |
SelectionKey.OP_WRITE);
                System.out.println("DEBUG:registerConnection:CHK2");
            } catch (ClosedChannelException cce) {
                cce.printStackTrace();
            }
        }


        /**
         *  Constructor for the NonBlockingReader object
         */
        public NonBlockingReader() {
            try {
                synchronized(this) {
                   this.selector = SelectorProvider.provider().openSelector();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }


        /**
         *  Main processing method for the NonBlockingReader object
         */
        public void run() {

            try {
                while (true) {

                    System.out.println("DEBUG:NonBlockingReader - Selector
started");
                    keysAdded = this.selector.select();
                    System.out.println("DEBUG:NonBlockingReader = readSelector
returned "
                            + this.keysAdded + " ready for IO operations");

                    Set readyKeys = this.selector.selectedKeys();

                    Iterator i = readyKeys.iterator();
                    while (i.hasNext()) {
                        SelectionKey key = (SelectionKey) i.next();
                        i.remove();
                        if (key.isConnectable()) {
                            System.out.println("DEBUG:NonBlockingReader
Finishing Connect key read="
                                    + key.isReadable() + " write=" +
key.isWritable());
                            ((SocketChannel) key.channel()).finishConnect();
                        }
                        if (key.isReadable()) {
                            SelectableChannel nextReady =
                                    (SelectableChannel) key.channel();

                            System.out.println("DEBUG:NonBlockingReader Read
Processing selection key read="
                                    + key.isReadable() + " write=" +
key.isWritable());
                            ByteBuffer byteBuffer = ByteBuffer.allocate
(BUFSIZE);
                            int nbytes = ((SocketChannel) nextReady).read
(byteBuffer);
                            System.out.println("DEBUG:NonBlockingReader read
nbytes = " + nbytes);
                            byteBuffer.flip();
                            String result = decode(byteBuffer);

                            System.out.println("DEBUG:NonBlockingReader
result='" + result + "'");

                        } else if (key.isWritable()) {
                            System.out.println("DEBUG:NonBlockingReader
Writable key read="
                                    + key.isReadable() + " write=" +
key.isWritable());
                        }
                    }
                }

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    this.selector.close();
                } catch (IOException ioe) {
                    ioe.printStackTrace();
                }
            }
        }
    }


    /**
     *  Creates a client named "Client1" with 12 connections, establishes
     *  them, and then sends a numbered test line once per second until an
     *  exception ends the loop.
     *
     *@param  args  command line arguments (not used)
     */
    public static void main(String[] args) {
        int clientIdx = 1;
        String clientName = "Client" + (clientIdx++);

        NonBlockingClient nbClient = new NonBlockingClient(clientName, 12);
        try {
            nbClient.establishConnections();

            // Pause before the first message and after every message.
            Thread.sleep(1000L);
            for (int idx = 0; ; idx++) {
                nbClient.sendTestMessage("Test" + idx + "\n");
                Thread.sleep(1000L);
            }

        } catch (InterruptedException interrupted) {
            System.out.println("INFO: Exiting normally...");
        } catch (IOException connectFailure) {
            connectFailure.printStackTrace();
            System.out.println("ERROR:" + connectFailure);
        }
    }
}


/* CmIdentification $Header$  */
/* Copyright (c) 1998, 2002 TAB Limited. All Rights Reserved.  */
package server;

import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;

/**
 *  Description of the Class
 *
 *@author     craig
 *@created    7 May 2002
 */
public class NonBlockingServer {

    // Port the listening socket is bound to in initialize().
    int port = 4001;
    // Selector opened in initialize() and closed in finalize().
    Selector selector = null;
    // Listening channel, opened and bound in initialize().
    ServerSocketChannel selectableChannel = null;
    int keysAdded = 0;

    // Command words. NOTE(review): no visible code reads these two fields - confirm
    // whether they are still needed.
    static String QUIT_SERVER = "quit";
    static String SHUTDOWN = "shutdown";


    /**
     *  Creates a server that keeps the initial value of the port field (4001).
     *  Nothing is opened here; the selector and the listening channel are
     *  created by initialize().
     */
    public NonBlockingServer() { }


    /**
     *  Creates a server that uses the given port instead of the initial value
     *  of the port field. Nothing is opened here; see initialize().
     *
     *@param  port  port number stored in the port field, which initialize() binds to
     */
    public NonBlockingServer(int port) {
        this.port = port;
    }


    /**
     *  Opens the selector and a non-blocking server socket channel, and binds
     *  the channel's socket to the local host address on the configured port.
     *
     *@exception  IOException  if the selector or channel cannot be opened,
     *      the local host cannot be resolved, or the bind fails
     */
    public void initialize() throws IOException {
        this.selector = SelectorProvider.provider().openSelector();

        ServerSocketChannel listener = ServerSocketChannel.open();
        this.selectableChannel = listener;
        listener.configureBlocking(false);

        InetSocketAddress bindAddress =
                new InetSocketAddress(InetAddress.getLocalHost(), this.port);
        listener.socket().bind(bindAddress);
    }


    /**
     *  Closes the listening channel and the selector. Either may still be
     *  unset when initialize() never ran or failed part-way, so both are
     *  checked, and the selector is closed even when closing the channel
     *  throws.
     *
     *@exception  IOException  if closing the channel or the selector fails
     */
    public void finalize() throws IOException {
        try {
            if (this.selectableChannel != null) {
                this.selectableChannel.close();
            }
        } finally {
            if (this.selector != null) {
                this.selector.close();
            }
        }
    }


    /**
     *  Description of the Method
     *
     *@exception  IOException           Description of the Exception
     *@exception  InterruptedException  Description of the Exception
     */
    public 

Comments
EVALUATION I can reproduce this in 1.4. However, this bug has been fixed by the new selector implementation added for Windows in 1.4.1. ###@###.### 2002-05-09
09-05-2002