package filerogue.server; import java.net.*; import java.io.IOException; import java.util.Hashtable; import java.util.logging.Level; import java.util.logging.Logger; import filerogue.Connection; import filerogue.ConnectionListener; import filerogue.ThreadState; import seda.nbio.*; // Non-Blocking PortMonitor // // This is a bit of a misnomer... NBPortMonitor DOES in fact block, // but it creates NonblockingSocket objects which are passed to a // non-blocking event handling framework, as opposed to forking // a new thread for every connection public class NBPortMonitor extends Thread implements ThreadState { private static final Logger logger = Logger.getLogger( "global" ); ConnectionListener conListener; // an interface to report connection events to int iPort; // port to listen on byte[] buffer = new byte[ 32768 ]; NonblockingServerSocket serverSocket; // the server socket which listens for incoming connections SelectSet selSet; // the master SelectSet, all incoming events come through this Hashtable hashConItems = new Hashtable(); // maps Connections to SelectItems for removal lookup private volatile boolean bAlive = true; // thread control variable private static String stBaseName; public NBPortMonitor( ConnectionListener listener, int port ) { stBaseName = "NBPortMonitor:" + port; setState( "Init" ); conListener = listener; iPort = port; } public void setState( String state ) { super.setName( stBaseName + "( " + state + " )" ); } // find the SelectItem for this connection and remove it from the SelectSet public void connectionKilled( Connection con ) { SelectItem item = ( SelectItem ) hashConItems.remove( con ); if ( item != null ) { selSet.remove( item ); } } public void run() { logger.log( Level.INFO, "NBPortMonitor started" ); SelectItem item; try { logger.log( Level.INFO, "Creating new SelectSet..." ); // create a new select set selSet = new SelectSet(); logger.log( Level.INFO, "Creating NonblockingServerSocket on port: " + iPort ); // create a server socket on the desired port serverSocket = new NonblockingServerSocket( iPort ); // create a SelectItem for this server socket and register the ACCEPT event with the select set item = new SelectItem( serverSocket, Selectable.ACCEPT_READY ); selSet.add( item ); logger.log( Level.INFO, "ACCEPT event added to SelectSet" ); } catch ( Exception x ) { bAlive = false; x.printStackTrace(); } NonblockingSocket clientSocket = null; long start, afterAccept, afterCreateCon, afterInitInputStream, afterNewConCalled, afterNewSelItem, afterSelSetAdd, afterHashPut; while ( bAlive ) { try { // logger.log( Level.FINE, "SelectSet.size(): " + selSet.size() ); setState( "Waiting for event" ); int events = selSet.select( -1 ); // -1 timeout means block until an event is detected setState( "Event(s) detected" ); if ( events == 0 ) { logger.log( Level.FINE, "No events detected!" ); continue; // skip to start of next while() iteration } SelectItem[] eventItems = selSet.getEvents(); if ( eventItems == null || eventItems.length == 0 ) { logger.log( Level.SEVERE, "getEvents() returned null or empty array!" ); continue; } if ( eventItems.length > 1 ) { logger.log( Level.INFO, "Received " + eventItems.length + " events, starting event-handling loop" ); } long eventStart; int eventMask; // loop through all the events for ( int i = 0; i < eventItems.length; i++ ) { eventStart = System.currentTimeMillis(); eventMask = eventItems[ i ].revents; setState( "Starting Event# " + i + ", Mask: " + eventItems[ i ].revents ); if ( eventItems[ i ].getSelectable() == serverSocket ) { start = System.currentTimeMillis(); setState( "Calling nbAccept()" ); // logger.log( Level.FINE, "ServerSocket Event" ); // this should return immediately clientSocket = serverSocket.nbAccept(); afterAccept = System.currentTimeMillis() - start; setState( "Returned from nbAccept()" ); if ( clientSocket == null ) { logger.log( Level.SEVERE, "ServerSocket received NULL client socket after event reported!" ); eventItems[ i ].revents = 0; continue; } else { // client socket not null logger.log( Level.INFO, "New connection: " + clientSocket ); setState( "Creating new connection object" ); // got a new socket, create a Connection object for it (but DON'T run it!!!) Connection con = new Connection( conListener, clientSocket ); afterCreateCon = System.currentTimeMillis() - start - afterAccept; setState( "Init input stream for " + con ); // open the input stream con.initInputStream(); afterInitInputStream = System.currentTimeMillis() - start - afterCreateCon; setState( "conListener.newConnection( " + con + " )" ); // notify connection listener conListener.newConnection( con ); afterNewConCalled = System.currentTimeMillis() - start - afterInitInputStream; setState( "Creating SelectItem for new connection " + con ); // OK, now create a new event (with attached Connection object) and add to SelectSet SelectItem clientItem = new SelectItem( clientSocket, con, ( short ) ( Selectable.READ_READY | Selectable.SELECT_ERROR ) ); afterNewSelItem = System.currentTimeMillis() - start - afterNewConCalled; setState( "Adding SelectItem for new connection to SelectSet" ); selSet.add( clientItem ); afterSelSetAdd = System.currentTimeMillis() - start - afterNewSelItem; // add to the lookup hashtable hashConItems.put( con, clientItem ); afterHashPut = System.currentTimeMillis() - start - afterSelSetAdd; logger.log( Level.FINE, "NewCon( accept:" + afterAccept + ", create: " + afterCreateCon + ", initInput: " + afterInitInputStream + ", newCon: " + afterNewConCalled + ", SelItem: " + afterNewSelItem + ", SelSetAdd: " + afterSelSetAdd + ", Hash: " + afterHashPut + " )" ); } } else // not a serverSocket event, must be an event from a client socket { // logger.log( Level.FINE, "ClientSocket Event" ); setState( "Starting Client Socket event" ); // first, get the associated Connection object Connection con = ( Connection ) eventItems[ i ].getObj(); if ( con == null ) { logger.log( Level.SEVERE, "NULL Connection state pointer!" ); eventItems[ i ].revents = 0; continue; } // check the event type if ( ( eventItems[ i ].revents & Selectable.SELECT_ERROR ) != 0 ) { // connection error setState( "SELECT_ERROR" ); // clear event mask right away eventItems[ i ].revents -= Selectable.SELECT_ERROR; logger.log( Level.INFO, "SELECT_ERROR detected for " + con + ", killing and removing" ); setState( "Removing SELECT_ERROR connection: " + con ); selSet.remove( eventItems[ i ] ); hashConItems.remove( con ); setState( "SELECT_ERROR: connectionError( " + con + " )" ); conListener.connectionError( con, null ); } if ( ( eventItems[ i ].revents & Selectable.READ_READY ) != 0 ) { // read event setState( "Starting READ_READY event" ); // clear event mask right away eventItems[ i ].revents -= Selectable.READ_READY; // logger.log( Level.INFO, "READ_READY" ); try { setState( "readObject() for " + con ); con.readObject(); } catch ( ClassNotFoundException x ) { logger.log( Level.SEVERE, "ClassNotFound! " + x.toString() ); } catch ( IOException x ) { logger.log( Level.INFO, "IO Error for " + con + " => " + x.toString() ); selSet.remove( eventItems[ i ] ); hashConItems.remove( con ); conListener.connectionError( con, x ); } } } // logger.log( Level.FINE, "Clearing event mask..." ); // clear the revents mask eventItems[ i ].revents = 0; if ( System.currentTimeMillis() - eventStart > 1000 ) { logger.log( Level.SEVERE, "LONG EVENT! Event " + i + "/" + eventItems.length + ", Mask: " + eventMask + " (" + ( System.currentTimeMillis() - eventStart ) + " ms)" ); } } } catch ( Exception x ) { logger.log( Level.SEVERE, "Uncaught exception in NBPortMonitor.run(): " + x.toString() ); System.err.println( filerogue.Utilities.getTimestamp() ); x.printStackTrace(); } } } public void kill() { bAlive = false; try { serverSocket.close(); } catch ( IOException x ) { /** @todo LOH */ } } }