package filerogue; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.util.logging.Level; import java.util.logging.Logger; import java.util.zip.Deflater; /** * This class implements an output stream filter for compressing data in the * "deflate" compression format. * * It pre-buffers and compresses the data into self-contained * "packets" ahead of time (using an internal thread) for immediate * access when read() is called. */ public class CompressingInputStream extends FilterInputStream implements CompressionReporter { private static final Logger logger = Logger.getLogger( "global" ); /** set debug output */ private static final boolean bDebug = false; private static final int DEFAULT_CHUNK = 256 * 1024; private static final int DEFAULT_PACKETS = 2; private DeflaterThread deflaterThread; private Object obPauseLock = new Object(); // used to pause/unpause the internal thread private int iChunkSize; // the uncompressed size of each "packet" private int iPackets; // the number of compressed packets to keep prepared private byte[] currentPacket; private int currentPacketSize; private int currentChunkSize; private double currentCompressionFactor; private int packetBytesRead = 0; private boolean compressionWorking = false; // deflater thread vars private boolean bCompressing = false; private byte[] buffer; // buffer for uncompressed file data private int bytesRead; // count of bytes read private int bytesCompressed; // size of compressed packet private Deflater deflater = new Deflater( Deflater.BEST_COMPRESSION, true ); private int maxPacketPayload; // max packet size is uncompressed size + 10% private byte[][] compressedPackets; // the packet blocks (4 byte header + payload) private byte nextPacketStore = 0; // the next packet block to write to private byte nextPacketReady = -1; // the next packet block which is ready to send private byte currentPacketNumber = 0; // the packet which is currently being sent private boolean moreToRead = true; private boolean threadCreated = false; private long totalBytes; private int packetsPrepared = 0; private int packetsDelivered = 0; private IOException deflaterThreadIOException; /** * Creates a CompressingInputStream with the specified chunk size and packetStore. * Saves source for later use. * * @param source the underlying input stream * @param chunkSize the uncompressed size of each packet * @param packetStore the number of compressed packets to keep prepared * * @throws java.io.IOException compression error or underlying input stream exception */ public CompressingInputStream( InputStream source, int chunkSize, int packetStore ) throws IOException { super( source ); iChunkSize = chunkSize; iPackets = packetStore; buffer = new byte[ iChunkSize ]; maxPacketPayload = iChunkSize + ( iChunkSize / 10 ); compressedPackets = new byte[ iPackets ][ 8 + maxPacketPayload ]; // read and compress first packet, to test the compression compressionWorking = prepareNextPacket(); if ( compressionWorking ) // if compression is working, assume this stream will be used { // create and start deflater thread deflaterThread = new DeflaterThread(); deflaterThread.start(); threadCreated = true; } } /** * Creates a CompressingInputStream with a default chunk size of 256KB and * default packet store of 2. * * @param source the underlying input stream * * @throws java.io.IOException compression error or underlying input stream exception */ public CompressingInputStream( InputStream source ) throws IOException { this( source, DEFAULT_CHUNK, DEFAULT_PACKETS ); } /** * Determines whether or not the current compressed packet is smaller than * the uncompressed packet. * * @return true if compressed packet is smaller than original */ public boolean isCompressionWorking() { return compressionWorking; } /** * Closes this input stream and releases any system resources associated * with the stream. * * @throws IOException if an I/O error occurs. */ public void close() throws IOException { if ( threadCreated ) { deflaterThread.kill(); while ( deflaterThread.isAlive() ) { try { Thread.currentThread().sleep( 100 ); } catch ( InterruptedException e ) {} } if ( deflaterThreadIOException != null ) { throw deflaterThreadIOException; } } else { in.close(); } } /** * Reads up to len bytes of data into an array of bytes from this input * stream. * * @param b the buffer into which the data is read * @param off the start offset of the data * @param len the maximum number of bytes to read * * @return the total number of bytes read into the buffer, or -1 if there * is no more data because the end of the stream has been reached, * or 0 if a packet is still being prepared by the deflater thread. * * @throws java.io.IOException IO error in underlyng input stream */ public int read( byte[] b, int off, int len ) throws IOException { // debug( "- read( byte[" + b.length + "], " + off + ", " + len + " )" ); if ( !threadCreated ) { debug( "- read - Creating DeflaterThread." ); // create and start deflater thread deflaterThread = new DeflaterThread(); deflaterThread.start(); threadCreated = true; } if ( currentPacket == null || packetBytesRead >= currentPacketSize ) { // debug( "- read - TotalBytes: " + totalBytes + " + CurrentChunkSize: " + currentChunkSize ); totalBytes += currentChunkSize; if ( !deflaterThread.hasMorePackets() ) { debug( "- read - DeflaterThread has no more packets. Return -1." ); return -1; // all compressed data has been read } // try and get the next packet currentPacket = deflaterThread.getNextPacket(); // this means there's more to come, but deflater is still busy compressing if ( currentPacket == null ) { debug( "- read - CurrentPacket is null. Waiting for DeflaterThread. Return 0." ); currentChunkSize = 0; Thread.currentThread().yield(); return 0; } currentPacketSize = Utilities.toInt( currentPacket, 0 ) + 8; currentChunkSize = Utilities.toInt( currentPacket, 4 ); currentCompressionFactor = ( double ) currentChunkSize / currentPacketSize; packetBytesRead = 0; debug( "- read - CurrentPacketSize: " + currentPacketSize + " CurrentChunkSize: " + currentChunkSize + " CurrentCompressionFactor: " + currentCompressionFactor ); } // ok we should have a packet at this point, do bounds checking, then copy the requested portion to the read buffer if ( len + off > b.length ) { debug( "- read - Requested read exceeds buffer size. Truncating." ); len = b.length - off; } if ( len + packetBytesRead > currentPacketSize ) { debug( "- read - Requested read exceeds packet boundary. Truncating." ); len = currentPacketSize - packetBytesRead; } System.arraycopy( currentPacket, packetBytesRead, b, off, len ); packetBytesRead += len; debug( "- read - Requested Length: " + len + " PacketBytesRead: " + packetBytesRead + " / " + currentPacketSize ); if ( deflaterThreadIOException != null ) { throw deflaterThreadIOException; } return len; } // read in a "chunk" from the input stream and compress it to a packet private boolean prepareNextPacket() throws IOException { debug( "- prepareNextPacket()" ); bCompressing = true; // read in the chunk bytesRead = in.read( buffer ); debug( "- prepareNextPacket - Bytes Read from InputStream: " + bytesRead ); if ( bytesRead == -1 ) { moreToRead = false; } else { // set input for deflater, and tell it this is the complete data set deflater.setInput( buffer, 0, bytesRead ); deflater.finish(); // compress to waiting packet block, remember size bytesCompressed = deflater.deflate( compressedPackets[ nextPacketStore ], 8, maxPacketPayload ); debug( "- prepareNextPacket - Bytes of Compressed Data: " + bytesCompressed ); // now fill in the header Utilities.toByteArray( bytesCompressed, compressedPackets[ nextPacketStore ], 0 ); Utilities.toByteArray( bytesRead, compressedPackets[ nextPacketStore ], 4 ); // update counters if ( ++nextPacketStore >= iPackets ) { nextPacketStore = 0; } if ( nextPacketReady == -1 ) { nextPacketReady = 0; } packetsPrepared++; // reset deflater for next run deflater.reset(); } bCompressing = false; debug( "- prepareNextPacket - Next Packet Store: " + nextPacketStore + " Next Packet Ready: " + nextPacketReady + " Packets Prepared: " + packetsPrepared ); // returns whether the compressed packet (including header) is smaller than the original uncompressed chunk return bytesRead > ( bytesCompressed + 8 ); } // CompressionReporter interface /////////////////////// /** * Returns the compression factor of the current packet. * * @return compression factor as percentage */ public double getCurrentCompressionFactor() { return currentCompressionFactor; } /** * Returns the total bytes which have been read so far. * * @return total bytes read */ public long getTotalBytes() { return totalBytes; } /** * Internal deflater thread. */ public class DeflaterThread extends Thread { private boolean bPaused = false; private boolean bAlive = true; private int i; public DeflaterThread() { setPriority( NORM_PRIORITY ); } public void run() { try { // read initial packets (minus the first one, which is read during construction) for ( int i = 1; i < iPackets && moreToRead; i++ ) { debug( "* DeflaterThread - Preparing initial packet '" + i + "'." ); prepareNextPacket(); } // now go to our sleeping loop, woken once in a while to read a new packet while ( bAlive && moreToRead ) { // go to sleep if we're full of packets if ( nextPacketStore == currentPacketNumber ) { debug( "* DeflaterThread - Pausing." ); synchronized ( obPauseLock ) { bPaused = true; obPauseLock.wait(); } bPaused = false; debug( "* DeflaterThread - Woken up." ); } else { prepareNextPacket(); } } } catch ( InterruptedException e ) { deflaterThreadIOException = new IOException( "InterruptedException - " + e.toString() ); } catch ( IOException e ) { deflaterThreadIOException = e; } finally { try { in.close(); } catch ( IOException e ) { deflaterThreadIOException = e; } } } private void awaken() { if ( bPaused ) { synchronized ( obPauseLock ) { obPauseLock.notifyAll(); } } else { debug( "* DeflaterThread - Already awake." ); } } public void kill() { bAlive = false; awaken(); } public boolean hasMorePackets() { if ( moreToRead ) { return true; } return! ( nextPacketReady == nextPacketStore ); } // return null if deflater is still busy compressing the packet we need public byte[] getNextPacket() { if ( nextPacketReady == -1 || ( packetsDelivered >= packetsPrepared ) ) { return null; } // get packet and size // debug( "- getNextPacket - compressedPackets[ " + nextPacketReady + " ] = " + compressedPackets[ nextPacketReady ] ); byte[] array = compressedPackets[ nextPacketReady ]; // remember what packet the requestor is reading currentPacketNumber = nextPacketReady; // increment next packet if ( ++nextPacketReady >= iPackets ) { nextPacketReady = 0; } packetsDelivered++; // debug( "next store: " + nextPacketStore + ", next ready: " + nextPacketReady + ", currently reading: " + currentPacketNumber ); /** wakeup thread to compress next chunk */ awaken(); // debug( "After Get - next store: " + nextPacketStore + ", next ready: " + nextPacketReady + ", currently reading: " + currentPacketNumber ); // return size // debug( "pointer: " + pointer ); return array; } } private void debug( String stMessage ) { if ( bDebug ) { logger.log( Level.SEVERE, stMessage ); } } }