void acceptConnections() throws IOException, InterruptedException {
SelectionKey acceptKey =
this.selectableChannel.register(this.selector,
SelectionKey.OP_ACCEPT);
System.out.println("DEBUG:Acceptor loop...");
while ((this.keysAdded = acceptKey.selector().select()) > 0) {
System.out.println("DEBUG:Selector returned "
+ this.keysAdded + " ready for IO operations");
Set readyKeys = this.selector.selectedKeys();
Iterator i = readyKeys.iterator();
while (i.hasNext()) {
SelectionKey key = (SelectionKey) i.next();
i.remove();
if (key.isConnectable()) {
System.out.println("DEBUG: key '" + key + " connectable");
}
if (key.isAcceptable()) {
ServerSocketChannel nextReady =
(ServerSocketChannel) key.channel();
System.out.println("DEBUG:Accept Processing selection key
read="
+ key.isReadable() + " write=" + key.isWritable() +
" accept=" + key.isAcceptable());
SocketChannel channel = nextReady.accept();
channel.configureBlocking(false);
SelectionKey readKey =
channel.register(this.selector,
SelectionKey.OP_READ | SelectionKey.OP_WRITE);
readKey.attach(new ChannelCallback(channel));
} else if (key.isReadable()) {
SelectableChannel nextReady =
(SelectableChannel) key.channel();
System.out.println("DEBUG:Read Processing selection key
read="
+ key.isReadable() + " write=" + key.isWritable() +
" accept=" + key.isAcceptable());
try {
this.readMessage((ChannelCallback) key.attachment());
} catch (SocketClosedException sce) {
System.out.println("DEBUG: key '" + key + " connection
closed with " + sce.getMessage());
} catch (IOException ioe) {
System.out.println("DEBUG: key '" + key + " read failed
with " + ioe.getMessage());
}
} else if (key.isWritable()) {
ChannelCallback callback = (ChannelCallback) key.attachment
();
String message = "What is your name? ";
ByteBuffer buf = ByteBuffer.wrap(message.getBytes());
int nbytes = callback.getChannel().write(buf);
}
}
}
System.out.println("DEBUG:End acceptor loop...");
}
/**
* Description of the Method
*
*@param channel Description of the Parameter
*@param message Description of the Parameter
*@exception IOException Description of the Exception
*/
public void writeMessage(SocketChannel channel, String message) throws
IOException {
ByteBuffer buf = ByteBuffer.wrap(message.getBytes());
int nbytes = channel.write(buf);
System.out.println("DEBUG:Wrote " + nbytes + " to channel.");
}
final static int BUFSIZE = 256;
/**
* Description of the Method
*
*@param byteBuffer Description of the Parameter
*@return Description of the Return Value
*@exception CharacterCodingException Description of the Exception
*/
public String decode(ByteBuffer byteBuffer) throws CharacterCodingException
{
Charset charset = Charset.forName("us-ascii");
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer = decoder.decode(byteBuffer);
String result = charBuffer.toString();
return result;
}
/**
* Description of the Method
*
*@param callback Description of the Parameter
*@exception IOException Description of the Exception
*@exception SocketClosedException Description of the Exception
*@exception InterruptedException Description of the Exception
*/
public void readMessage(ChannelCallback callback) throws IOException,
SocketClosedException, InterruptedException {
ByteBuffer byteBuffer = ByteBuffer.allocate(BUFSIZE);
int nbytes = callback.getChannel().read(byteBuffer);
System.out.println("DEBUG:readMessage nbytes = " + nbytes);
if (nbytes < 0) {
callback.getChannel().close();
throw new SocketClosedException("Connection Closed");
}
byteBuffer.flip();
String result = this.decode(byteBuffer);
System.out.println("DEBUG:" + result);
if (result.indexOf("quit") >= 0) {
callback.getChannel().close();
} else if (result.indexOf("shutdown") >= 0) {
callback.getChannel().close();
throw new InterruptedException();
} else {
callback.append(result.toString());
//If we are done with the line then we execute the callback.
if (result.indexOf("\n") >= 0) {
callback.execute();
}
}
}
/**
* Description of the Class
*
*@author craig
*@created 7 May 2002
*/
public class SocketClosedException extends IOException {
/**
* Constructor for the SocketClosedException object
*
*@param msg Description of the Parameter
*/
SocketClosedException(String msg) {
super(msg);
}
}
/**
* Description of the Class
*
*@author craig
*@created 7 May 2002
*/
public class ChannelCallback {
private SocketChannel channel;
private StringBuffer buffer;
/**
* Constructor for the ChannelCallback object
*
*@param channel Description of the Parameter
*/
public ChannelCallback(SocketChannel channel) {
this.channel = channel;
this.buffer = new StringBuffer();
}
/**
* Description of the Method
*
*@exception IOException Description of the Exception
*/
public void execute() throws IOException {
System.out.println("DEBUG:" + this.buffer.toString());
writeMessage(this.channel, this.buffer.toString());
buffer = new StringBuffer();
}
/**
* Gets the channel attribute of the ChannelCallback object
*
*@return The channel value
*/
public SocketChannel getChannel() {
return this.channel;
}
/**
* Description of the Method
*
*@param values Description of the Parameter
*/
public void append(String values) {
buffer.append(values);
}
}
/**
* The main program for the NonBlockingServer class
*
*@param args The command line arguments
*/
public static void main(String[] args) {
NonBlockingServer nbServer = new NonBlockingServer();
try {
nbServer.initialize();
} catch (IOException e) {
e.printStackTrace();
System.exit(-1);
}
try {
nbServer.acceptConnections();
} catch (IOException e) {
e.printStackTrace();
System.out.println("ERROR:" + e);
} catch (InterruptedException e) {
System.out.println("INFO: Exiting normally...");
}
}
}
---------- END SOURCE ----------
(Review ID: 146267)
======================================================================
Name: nt126004 Date: 05/08/2002
FULL PRODUCT VERSION :
java version "1.4.0"
Java(TM) 2 Runtime Environment, Standard Edition (build 1.4.0-b92)
Java HotSpot(TM) Client VM (build 1.4.0-b92, mixed mode)
FULL OPERATING SYSTEM VERSION :
Windows NT Version 4.0
A DESCRIPTION OF THE PROBLEM :
I have created a basic test server and client just to try
out the nio features in 1.4.0.
When I run the server and then run a couple of clients and
leave them running everything seems fine. However if I
stress test the server by repeatedly terminating the client
processes a problem occurs very quickly at the server end.
The stack trace is below.
I can't figure out what is going on as it seems to be at a
very low level.
The fact that I was originally running two clients seems
irrelevant as it occurs very easily with just one client.
STEPS TO FOLLOW TO REPRODUCE THE PROBLEM :
1. Run server.NonBlockingServer
2. Run client.NonBlockingClient
3. Randomly terminate (Ctrl C) client and repeat step 2
until the server fails.
EXPECTED VERSUS ACTUAL BEHAVIOR :
I expected steps 2 and 3 to be endlessly repeatable
ERROR MESSAGES/STACK TRACES THAT OCCUR :
java.io.IOException: The parameter is incorrect
at sun.nio.ch.PollArrayWrapper.poll0(Native Method)
at sun.nio.ch.PollArrayWrapper.poll(PollArrayWrapper.java:146)
at sun.nio.ch.PollSelectorImpl.doSelect(PollSelectorImpl.java:46)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:62)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:67)
at server.NonBlockingServer.acceptConnections(NonBlockingServer.java:84)
at server.NonBlockingServer.main(NonBlockingServer.java:305)
This bug can be reproduced often.
---------- BEGIN SOURCE ----------
/* CmIdentification $Header$ */
/* Copyright (c) 1998, 2002 TAB Limited. All Rights Reserved. */
package client;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;
/**
* Description of the Class
*
*@author craig
*@created 7 May 2002
*/
public class NonBlockingClient {
private String name = null;
private NonBlockingReader nonBlockingReader = null;
int port = 4001;
Selector selector = null;
List selectableChannelsList = new ArrayList();
// SocketChannel selectableChannel = null;
int keysAdded = 0;
static String QUIT_SERVER = "quit";
static String SHUTDOWN = "shutdown";
String serverName = "mallett";
int numberOfChannels = 1;
/**
* Constructor for the NonBlockingClient object
*/
public NonBlockingClient(String name, int numberOfChannels) {
this.name = name;
this.numberOfChannels = numberOfChannels;
try {
this.selector = SelectorProvider.provider().openSelector();
} catch (IOException e) {
e.printStackTrace();
System.exit(-1);
}
}
/**
* Constructor for the NonBlockingClient object
*
*@param port Description of the Parameter
*/
public NonBlockingClient(String name, int numberOfChannels, int port) {
this.name = name;
this.port = port;
this.numberOfChannels = numberOfChannels;
try {
this.selector = SelectorProvider.provider().openSelector();
} catch (IOException e) {
e.printStackTrace();
System.exit(-1);
}
}
/**
* Description of the Method
*
*@exception IOException Description of the Exception
*/
public void initialize(int numberOfChannels) throws IOException {
}
/**
* Description of the Method
*
*@exception IOException Description of the Exception
*/
public void finalize() throws IOException {
Iterator it = selectableChannelsList.iterator();
while (it.hasNext()) {
((SocketChannel)it.next()).close();
}
this.selector.close();
}
/**
* Description of the Method
*
*@exception IOException Description of the Exception
*@exception InterruptedException Description of the Exception
*/
public void establishConnections() throws IOException, InterruptedException
{
nonBlockingReader = new NonBlockingReader();
for(int idx=0;idx<numberOfChannels;idx++) {
SocketChannel selChannel = SocketChannel.open();
selChannel.configureBlocking(false);
selectableChannelsList.add(selChannel);
System.out.println("DEBUG:@@@Registering");
nonBlockingReader.registerConnection(selChannel);
System.out.println("DEBUG:@@@Registered");
InetSocketAddress isa = new InetSocketAddress(serverName,
this.port);
System.out.println("DEBUG:@@@Connecting");
selChannel.connect(isa);
System.out.println("DEBUG:@@@Connected");
}
Thread readThread = new Thread(nonBlockingReader);
readThread.setDaemon(true);
readThread.start();
}
/**
* Description of the Method
*
*@param msg Description of the Parameter
*/
public void sendTestMessage(String msg) {
try {
Iterator it = selectableChannelsList.iterator();
while (it.hasNext()) {
SocketChannel sc = (SocketChannel)it.next();
writeMessage(sc, msg);
}
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
/**
* Description of the Method
*
*@param channel Description of the Parameter
*@param message Description of the Parameter
*@exception IOException Description of the Exception
*/
public void writeMessage(SocketChannel channel, String message) throws
IOException {
ByteBuffer buf = ByteBuffer.wrap(message.getBytes());
int nbytes = channel.write(buf);
System.out.println("DEBUG:Wrote " + nbytes + " to channel.");
}
final static int BUFSIZE = 256;
/**
* Description of the Method
*
*@param byteBuffer Description of the Parameter
*@return Description of the Return Value
*@exception CharacterCodingException Description of the Exception
*/
public String decode(ByteBuffer byteBuffer) throws CharacterCodingException
{
Charset charset = Charset.forName("us-ascii");
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer = decoder.decode(byteBuffer);
String result = charBuffer.toString();
return result;
}
/**
* Description of the Class
*
*@author craig
*@created 7 May 2002
*/
public class SocketClosedException extends IOException {
/**
* Constructor for the SocketClosedException object
*
*@param msg Description of the Parameter
*/
SocketClosedException(String msg) {
super(msg);
}
}
/**
* Description of the Class
*
*@author craig
*@created 7 May 2002
*/
public class ChannelCallback {
private SocketChannel channel;
private StringBuffer buffer;
/**
* Constructor for the ChannelCallback object
*
*@param channel Description of the Parameter
*/
public ChannelCallback(SocketChannel channel) {
this.channel = channel;
this.buffer = new StringBuffer();
}
/**
* Description of the Method
*
*@exception IOException Description of the Exception
*/
public void execute() throws IOException {
System.out.println("DEBUG:" + this.buffer.toString());
writeMessage(this.channel, this.buffer.toString());
buffer = new StringBuffer();
}
/**
* Gets the channel attribute of the ChannelCallback object
*
*@return The channel value
*/
public SocketChannel getChannel() {
return this.channel;
}
/**
* Description of the Method
*
*@param values Description of the Parameter
*/
public void append(String values) {
buffer.append(values);
}
}
/**
* Description of the Class
*
*@author craig
*@created 7 May 2002
*/
public class NonBlockingReader implements Runnable {
private Selector selector = null;
private int keysAdded = 0;
/**
* Description of the Method
*
*@param channel Description of the Parameter
*/
public synchronized void registerConnection(SocketChannel channel) {
try {
System.out.println("DEBUG:registerConnection:CHK1");
SelectionKey readKey =
channel.register(this.selector,
SelectionKey.OP_CONNECT | SelectionKey.OP_READ |
SelectionKey.OP_WRITE);
System.out.println("DEBUG:registerConnection:CHK2");
} catch (ClosedChannelException cce) {
cce.printStackTrace();
}
}
/**
* Constructor for the NonBlockingReader object
*/
public NonBlockingReader() {
try {
synchronized(this) {
this.selector = SelectorProvider.provider().openSelector();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Main processing method for the NonBlockingReader object
*/
public void run() {
try {
while (true) {
System.out.println("DEBUG:NonBlockingReader - Selector
started");
keysAdded = this.selector.select();
System.out.println("DEBUG:NonBlockingReader = readSelector
returned "
+ this.keysAdded + " ready for IO operations");
Set readyKeys = this.selector.selectedKeys();
Iterator i = readyKeys.iterator();
while (i.hasNext()) {
SelectionKey key = (SelectionKey) i.next();
i.remove();
if (key.isConnectable()) {
System.out.println("DEBUG:NonBlockingReader
Finishing Connect key read="
+ key.isReadable() + " write=" +
key.isWritable());
((SocketChannel) key.channel()).finishConnect();
}
if (key.isReadable()) {
SelectableChannel nextReady =
(SelectableChannel) key.channel();
System.out.println("DEBUG:NonBlockingReader Read
Processing selection key read="
+ key.isReadable() + " write=" +
key.isWritable());
ByteBuffer byteBuffer = ByteBuffer.allocate
(BUFSIZE);
int nbytes = ((SocketChannel) nextReady).read
(byteBuffer);
System.out.println("DEBUG:NonBlockingReader read
nbytes = " + nbytes);
byteBuffer.flip();
String result = decode(byteBuffer);
System.out.println("DEBUG:NonBlockingReader
result='" + result + "'");
} else if (key.isWritable()) {
System.out.println("DEBUG:NonBlockingReader
Writable key read="
+ key.isReadable() + " write=" +
key.isWritable());
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
this.selector.close();
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
}
}
/**
* The main program for the NonBlockingClient class
*
*@param args The command line arguments
*/
public static void main(String[] args) {
int clientIdx = 1;
NonBlockingClient nbClient = new NonBlockingClient("Client"+
(clientIdx++),12);
try {
nbClient.establishConnections();
Thread.sleep(1000L);
int idx = 0;
while (true) {
nbClient.sendTestMessage("Test" + (idx++) + "\n");
Thread.sleep(1000L);
}
} catch (IOException e) {
e.printStackTrace();
System.out.println("ERROR:" + e);
} catch (InterruptedException e) {
System.out.println("INFO: Exiting normally...");
}
}
}
/* CmIdentification $Header$ */
/* Copyright (c) 1998, 2002 TAB Limited. All Rights Reserved. */
package server;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;
/**
* Description of the Class
*
*@author craig
*@created 7 May 2002
*/
public class NonBlockingServer {
int port = 4001;
Selector selector = null;
ServerSocketChannel selectableChannel = null;
int keysAdded = 0;
static String QUIT_SERVER = "quit";
static String SHUTDOWN = "shutdown";
/**
* Constructor for the NonBlockingServer object
*/
public NonBlockingServer() { }
/**
* Constructor for the NonBlockingServer object
*
*@param port Description of the Parameter
*/
public NonBlockingServer(int port) {
this.port = port;
}
/**
* Description of the Method
*
*@exception IOException Description of the Exception
*/
public void initialize() throws IOException {
this.selector = SelectorProvider.provider().openSelector();
this.selectableChannel = ServerSocketChannel.open();
this.selectableChannel.configureBlocking(false);
InetAddress lh = InetAddress.getLocalHost();
InetSocketAddress isa = new InetSocketAddress(lh, this.port);
this.selectableChannel.socket().bind(isa);
}
/**
* Description of the Method
*
*@exception IOException Description of the Exception
*/
public void finalize() throws IOException {
this.selectableChannel.close();
this.selector.close();
}
/**
* Description of the Method
*
*@exception IOException Description of the Exception
*@exception InterruptedException Description of the Exception
*/
public