package org.gametrack.bf42.poll; import java.io.*; import java.net.*; import java.nio.*; import java.nio.channels.*; import java.util.*; import org.gametrack.bf42.*; import org.gametrack.bf42.game.*; import org.gametrack.util.*; /** *
Title: BFTracker
*Description: Tracks online Battlefield 1942 games, outputs game records
*Copyright: Copyright (c) 2003
*Company: bf1942.gametrack.org
* @author Brian Cairns * @version 1.0 */ // This class represents a Datagram channel connection. // Commands can be sent with send(). When new data is available, call receive(). // This class is specifically designed for serial access, // and instances share many common resources to minimize instance memory usage // single threaded usage of these instances only! public class Connection { public static volatile int recentPollsSent = 0; public static volatile int recentPollsRecieved = 0; public int packetType = -1; // 1 = BF42, 2 = BFV, -1 is unknown public final static int PACKET_BF42 = 1; public final static int PACKET_BFV = 2; public boolean DEBUG = false; public boolean active = false; // currently receiving data public boolean neededTestPoll = false; SocketAddress address; String hostName; Message message; // most recent message int attempt; int serverID; private static final int MAX_NO_RESPONSE = 2; // maximum number of polls not responded to before server is declared unresponsive public long lastPollSent; // last time a poll was sent public long lastResponseReceived; // last time data was received from this connection int noResponses = 0; // number of no responses in a row public boolean pollPending = false; static final ByteBuffer bufRecv = ByteBuffer.allocateDirect( 65536 ); static final boolean bArrayAccess = bufRecv.hasArray(); static final IntegerKey lookup = new IntegerKey( 0 ); // use this object to lookup hashmap entries private static final String PACKET_END = "queryid"; // indicates end of packet private static final String FINAL_PACKET = "final"; // indicates final packet DatagramChannel channel; private HashMap mapQueries = new HashMap(); // keep track of all partially completed messages public Connection( int sid, SocketAddress server ) throws IOException { serverID = sid; address = server; channel = DatagramChannel.open(); channel.configureBlocking( false ); channel.connect( server ); } public Connection( Server server ) throws IOException { this( server.serverID, server.address ); } public Connection( SocketAddress server ) throws IOException { this( -1, server ); } public static int clearRecentSent() { int temp = recentPollsSent; recentPollsSent = 0; return temp; } public static int clearRecentReceived() { int temp = recentPollsRecieved; recentPollsRecieved = 0; return temp; } public DatagramChannel getChannel() { return channel; } /** * send a poll command, and suppresses exceptions */ public final void sendPoll() { if( packetType == Packet.BF1942 ) { // Log.debug( 1, "Sending BF42 query #" + attempt + ", " + this.toString() ); safeWrite( Commands.BUF_POLL ); } else { // if not a known BF1942 server, we try BFV format first (there's only a very small response if we're wrong) // Log.debug( 1, "Sending BFV query #" + attempt + ", " + this.toString() ); safeWrite( Commands.BUF_BFV ); } } /** * Write a byte buffer to a channel and rewind it, supress IO exceptions * @param buffer the buffer to write from */ private void safeWrite( ByteBuffer buffer ) { // complain, don't send, if( lastPollSent > System.currentTimeMillis() - 500 ) { Log.err( "Early/Repeat poll? Attempt: " + attempt + ", Last: " + Formatting.timestamp( lastPollSent ) + ", current: " + Formatting.timestamp() + " - " + this ); return; } try { channel.write( buffer ); lastPollSent = System.currentTimeMillis(); recentPollsSent++; } catch( PortUnreachableException ex ) { Log.debug( 1, "Con.safeWrite(): Port unreachable - " + this ); } catch( IOException ex1 ) { Log.err( "Con.safeWrite() - " + ex1 ); } buffer.rewind(); } /** * Cancels any pending polls. Called if the connection appears invalid to avoid wasting rechecks */ void notResponding() { pollPending = false; if( noResponses++ == MAX_NO_RESPONSE ) { // if this is the cutoff, cut it off Log.log( "Con.notResponding(): " + this + " missed " + noResponses + " polls. (" + ( noResponses - 1 ) + "min)" ); active = false; ConnectionManager.isActive( this, false ); GameManager.notResponding( this.address ); } else { if( noResponses < MAX_NO_RESPONSE ) { GameTracker game = GameManager.getGame( this.address ); if( game != null ) { Log.debug( 2, "Con.notResponding(): Waiting.. " + game.summary() ); } } } } /** * Polls or checks a poll, returns true if we need another checkpoll after this */ public final boolean pollCheck( int attemptNumber ) { attempt = attemptNumber; if( attempt == 0 ) neededTestPoll = false; if( neededTestPoll ) attempt--; if( attempt == 0 ) { message = null; synchronized( mapQueries ) { mapQueries.clear(); } pollPending = true; sendPoll(); return true; } else if( pollPending ) { if( attempt >= 2 && mapQueries.size() == 0 ) { // if after 2 replies still not even a PARTIAL response, we assume not responding Log.debug( 1, "PollCheck - " + this + " no response after 2 attempts, cancelling remaining retries" ); notResponding(); return false; } sendPoll(); // if this is FINAL poll, return false, as there is no point in checking this one if( attemptNumber + 1 >= Config.POLL_ATTEMPTS ) { Log.err( "Sending final poll for " + this +", " + getSummary() ); return false; } else { return true; } } return false; } /** * Send directly from a buffer * @param buf the send buffer */ public final void send_notused( ByteBuffer buf ) throws IOException { channel.write( buf ); buf.rewind(); } /** * Send directly from a buffer[] * @param buf the send buffer */ public final void send_notused( ByteBuffer[] buf ) throws IOException { channel.write( buf ); for( int i = 0; i < buf.length; i++ ) { buf[i].rewind(); } } /** * Call when new data is available * final for speed */ public final void receive( long time ) throws IOException { byte[] array; int readCount; readCount = channel.read( bufRecv ); bufRecv.flip(); recentPollsRecieved++; noResponses = 0; lastResponseReceived = time; // we've read data.. check if this means we're newly active if( !active ) { active = true; ConnectionManager.isActive( this, true ); } /** @todo use a pool of byte[] to avoid new alloc for each receive() (we will get thousands a minute) */ array = new byte[readCount]; bufRecv.get( array ); bufRecv.clear(); Log.debug( 6, this.toString() + ", received " + readCount + ( array.length > 0 ? ", b[0]=" + array[0] : "" ) ); // check for packet type.. BF1942 or BFV /** @todo reuse packet objects also */ Packet packet = new Packet( array ); if( packet.type == packet.BFVIETNAM ) { packetType = Packet.BFVIETNAM; // just make a message and add it try { pollPending = false; Message message = null; message = new Message( packet, address, time, lastPollSent ); ConnectionManager.completedMessage( message ); } catch( PacketException ex1 ) { packetType = -1; Log.err( "Con.receive(): Vietnam packet exception: " + ex1 ); } } else if( packet.type == packet.BF1942 ) { // check if we have real data, or just a dummy reply if( packet.data.length == 3 || // sometimes we get '\f\' ( packet.data.length <= 30 && packet.isFinal && packet.segment == 1 ) ) { // this is a BF1942 server which was sent a BFV query. // it will just return '\final\\queryid\xxxx.1' // set its type, reset attempt to 0, and exit packetType = Packet.BF1942; neededTestPoll = true; // this tells the connection that we wasted the first attempt for a failed BFV check return; } // add to existing message, or create new one lookup.set( packet.queryID ); try { // check for packet exceptions if( ( message = ( Message )mapQueries.get( lookup ) ) != null ) { // add() returns true if the message is now complete if( message.add( packet ) ) { synchronized( mapQueries ) { mapQueries.remove( lookup ); } } } else { /** @todo reuse Message objects */ message = new Message( packet, address, time, lastPollSent ); // if message has more to come, track it for future reference if( !message.complete ) { synchronized( mapQueries ) { mapQueries.put( new IntegerKey( lookup.value ), message ); } } } if( message == null ) { Log.err( "NULL message from " + this ); } else if( message.complete ) { pollPending = false; if( attempt != 0 ) { Log.debug( 3, "RETRY[" + attempt + "] succeeded for " + this +", queryID=" + message.queryID + ", " + getSummary() ); // handle the completed message } // Log.debug( "completed 42 message" ); ConnectionManager.completedMessage( message ); } } catch( PacketException ex ) { // set packet type to unknown packetType = -1; Log.log( "Connection.receive(): invalid packet from: " + this.toString() + " - " + ex ); } } } public void handleBFVData( byte[] data, long time ) { } public void handleBF42Data( byte[] data, long time ) { } public String getSummary() { return "Msgs: " + getSpreads(); } public void setHostName( String s ) { hostName = s; } public String getName() { return hostName; } public String getSpreads() { StringBuffer buf = new StringBuffer( "(" ); Message msg; synchronized( mapQueries ) { for( Iterator iter = mapQueries.values().iterator(); iter.hasNext(); ) { msg = ( Message )iter.next(); buf.append( msg.getSummary() ).append( ":" ).append( msg.getLag() ); if( iter.hasNext() ) { buf.append( "," ); } } } return buf.append( ")" ).toString(); } public String getChar() { String s = address.toString(); int pos = s.indexOf( ":" ); return s.substring( pos - 1, pos ); } public String toString() { if( hostName == null ) { return "<" + serverID + "> " + address.toString(); } else { return "<" + serverID + "> [" + address.toString() + "] " + hostName; } } }