Problem in NIO server....
I have written an NIO server that is supposed to take data from a static queue and send it to all connected clients, but it does not behave that way.
The problem is that the server sends data to one client, but as soon as another client connects, the server sends data only to the latest client and stops sending data to the older one.
However, the printouts say that it has sent data to both clients.
I couldn't figure out the problem. Can anyone help me?
I have posted the server code here...
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
publicclass StreamingServerimplements Runnable{
// The channel on which we'll accept connections
private ServerSocketChannel serverChannel;
// The selector to be monitored for events
private Selector selector;
int numRead;
//public static boolean isAvailable = false;
int write = 0;
publicint socInit(int port){
try{
this.selector = this.initSelector(port);
System.out.println("Streaming engine started... " + port);
return 1;
}
catch(Exception e){
return -1;
}
}
private Selector initSelector(int port)throws IOException{
// Create a new selector
Selector socketSelector = SelectorProvider.provider().openSelector();
// Create a new non-blocking server socket channel
this.serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
// Bind the server socket to the specified address and port
InetSocketAddress isa =new InetSocketAddress(port);
serverChannel.socket().bind(isa);
System.out.println("Streaming server bound to port > " + isa.getHostName() +" : " + isa.getPort());
// Register the server socket channel, indicating an interest in
// accepting new connections
serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
return socketSelector;
}
/**
*
*/
publicsynchronizedvoid run(){
while (true){
try{
//System.out.println("Streaming server Waiting for an event...");
// Wait for an event one of the registered channels
this.selector.select();
// Iterate over the set of keys for which events are available
Iterator selectedKeys = this.selector.selectedKeys().iterator();
int count = 0;
write = 0;
ByteBuffer readBuffer =null;
int queueSize = InternalEngine.queue.size();
if( queueSize >= 1){
readBuffer = (ByteBuffer)InternalEngine.queue.get(queueSize - 1);
InternalEngine.queue.removeElementAt(queueSize - 1);
}
else{
Thread.sleep(10);
}
while (selectedKeys.hasNext()){
SelectionKey key = (SelectionKey) selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()){
System.out.println("This key is invalid...");
continue;
}
// Check what event is available and deal with it
if (key.isAcceptable()){
System.out.println("Accepting Connection...");
this.accept(key);
}
elseif(key.isWritable()){
System.out.println("Key is writable ...");
if(readBuffer !=null){
System.out.println("Readbuffer has data... Writing to client...");
//write the data
write(key,readBuffer);
count ++;
}else{
System.out.println("No data in readBuffer... NOT writing to client");
}
}
}//End of inner While
/*System.out.println("clients connected > " + count);
System.out.println("Written to clients --> " + write);*/
}catch (Exception e){
e.printStackTrace();
}
}// End of outer while
}
/**
*
* @param key
* @throws IOException
*/
privatevoid write(SelectionKey key, ByteBuffer readBuffer)throws IOException{
SocketChannel socketChannel = (SocketChannel) key.channel();
System.out.println("socketChannel assigned > " + socketChannel.socket().getRemoteSocketAddress());
System.out.println("Scoket channel is connected ? " + socketChannel.socket().isConnected());
// Attempt to write to the channel
try{
while(readBuffer.hasRemaining()){
System.out.println(socketChannel.write(readBuffer) +" bytes written to client");
}
write ++;
}catch (Exception e){
// The remote forcibly closed the connection, cancel
// the selection key and close the channel.
e.printStackTrace();
key.cancel();
socketChannel.close();
//return;
}
}
privatevoid accept(SelectionKey key)throws IOException{
// For an accept to be pending the channel must be a server socket channel.
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// Accept the connection and make it non-blocking
System.out.println("Waiting for connections ... ");
SocketChannel socketChannel = serverSocketChannel.accept();
//Socket socket = socketChannel.socket();
socketChannel.configureBlocking(false);
// Register the new SocketChannel with our Selector, indicating
// we'd like to be notified when there's data waiting to be read
System.out.println("Registering OP_WRITE after OP_ACCEPT");
socketChannel.register(this.selector, SelectionKey.OP_WRITE);
}
}

