| //#define Trace |
| |
| // ParallelDeflateOutputStream.cs |
| // ------------------------------------------------------------------ |
| // |
| // A DeflateStream that does compression only, it uses a |
| // divide-and-conquer approach with multiple threads to exploit multiple |
| // CPUs for the DEFLATE computation. |
| // |
| // last saved: <2011-July-31 14:49:40> |
| // |
| // ------------------------------------------------------------------ |
| // |
| // Copyright (c) 2009-2011 by Dino Chiesa |
| // All rights reserved! |
| // |
| // This code module is part of DotNetZip, a zipfile class library. |
| // |
| // ------------------------------------------------------------------ |
| // |
| // This code is licensed under the Microsoft Public License. |
| // See the file License.txt for the license details. |
| // More info on: http://dotnetzip.codeplex.com |
| // |
| // ------------------------------------------------------------------ |
| |
| using System; |
| using System.Collections.Generic; |
| using System.Threading; |
| using System.IO; |
| |
| |
| namespace OfficeOpenXml.Packaging.Ionic.Zlib |
| { |
| internal class WorkItem |
| { |
| public byte[] buffer; |
| public byte[] compressed; |
| public int crc; |
| public int index; |
| public int ordinal; |
| public int inputBytesAvailable; |
| public int compressedBytesAvailable; |
| public ZlibCodec compressor; |
| |
| public WorkItem(int size, |
| Ionic.Zlib.CompressionLevel compressLevel, |
| CompressionStrategy strategy, |
| int ix) |
| { |
| this.buffer= new byte[size]; |
| // alloc 5 bytes overhead for every block (margin of safety= 2) |
| int n = size + ((size / 32768)+1) * 5 * 2; |
| this.compressed = new byte[n]; |
| this.compressor = new ZlibCodec(); |
| this.compressor.InitializeDeflate(compressLevel, false); |
| this.compressor.OutputBuffer = this.compressed; |
| this.compressor.InputBuffer = this.buffer; |
| this.index = ix; |
| } |
| } |
| |
| /// <summary> |
| /// A class for compressing streams using the |
| /// Deflate algorithm with multiple threads. |
| /// </summary> |
| /// |
| /// <remarks> |
| /// <para> |
| /// This class performs DEFLATE compression through writing. For |
| /// more information on the Deflate algorithm, see IETF RFC 1951, |
| /// "DEFLATE Compressed Data Format Specification version 1.3." |
| /// </para> |
| /// |
| /// <para> |
| /// This class is similar to <see cref="Ionic.Zlib.DeflateStream"/>, except |
| /// that this class is for compression only, and this implementation uses an |
| /// approach that employs multiple worker threads to perform the DEFLATE. On |
| /// a multi-cpu or multi-core computer, the performance of this class can be |
| /// significantly higher than the single-threaded DeflateStream, particularly |
| /// for larger streams. How large? Anything over 10mb is a good candidate |
| /// for parallel compression. |
| /// </para> |
| /// |
| /// <para> |
| /// The tradeoff is that this class uses more memory and more CPU than the |
| /// vanilla DeflateStream, and also is less efficient as a compressor. For |
| /// large files the size of the compressed data stream can be less than 1% |
| /// larger than the size of a compressed data stream from the vanialla |
| /// DeflateStream. For smaller files the difference can be larger. The |
| /// difference will also be larger if you set the BufferSize to be lower than |
| /// the default value. Your mileage may vary. Finally, for small files, the |
| /// ParallelDeflateOutputStream can be much slower than the vanilla |
| /// DeflateStream, because of the overhead associated to using the thread |
| /// pool. |
| /// </para> |
| /// |
| /// </remarks> |
| /// <seealso cref="Ionic.Zlib.DeflateStream" /> |
| public class ParallelDeflateOutputStream : System.IO.Stream |
| { |
| |
| private static readonly int IO_BUFFER_SIZE_DEFAULT = 64 * 1024; // 128k |
| private static readonly int BufferPairsPerCore = 4; |
| |
| private System.Collections.Generic.List<WorkItem> _pool; |
| private bool _leaveOpen; |
| private bool emitting; |
| private System.IO.Stream _outStream; |
| private int _maxBufferPairs; |
| private int _bufferSize = IO_BUFFER_SIZE_DEFAULT; |
| private AutoResetEvent _newlyCompressedBlob; |
| //private ManualResetEvent _writingDone; |
| //private ManualResetEvent _sessionReset; |
| private object _outputLock = new object(); |
| private bool _isClosed; |
| private bool _firstWriteDone; |
| private int _currentlyFilling; |
| private int _lastFilled; |
| private int _lastWritten; |
| private int _latestCompressed; |
| private int _Crc32; |
| private Ionic.Crc.CRC32 _runningCrc; |
| private object _latestLock = new object(); |
| private System.Collections.Generic.Queue<int> _toWrite; |
| private System.Collections.Generic.Queue<int> _toFill; |
| private Int64 _totalBytesProcessed; |
| private Ionic.Zlib.CompressionLevel _compressLevel; |
| private volatile Exception _pendingException; |
| private bool _handlingException; |
| private object _eLock = new Object(); // protects _pendingException |
| |
| // This bitfield is used only when Trace is defined. |
| //private TraceBits _DesiredTrace = TraceBits.Write | TraceBits.WriteBegin | |
| //TraceBits.WriteDone | TraceBits.Lifecycle | TraceBits.Fill | TraceBits.Flush | |
| //TraceBits.Session; |
| |
| //private TraceBits _DesiredTrace = TraceBits.WriteBegin | TraceBits.WriteDone | TraceBits.Synch | TraceBits.Lifecycle | TraceBits.Session ; |
| |
| private TraceBits _DesiredTrace = |
| TraceBits.Session | |
| TraceBits.Compress | |
| TraceBits.WriteTake | |
| TraceBits.WriteEnter | |
| TraceBits.EmitEnter | |
| TraceBits.EmitDone | |
| TraceBits.EmitLock | |
| TraceBits.EmitSkip | |
| TraceBits.EmitBegin; |
| |
| /// <summary> |
| /// Create a ParallelDeflateOutputStream. |
| /// </summary> |
| /// <remarks> |
| /// |
| /// <para> |
| /// This stream compresses data written into it via the DEFLATE |
| /// algorithm (see RFC 1951), and writes out the compressed byte stream. |
| /// </para> |
| /// |
| /// <para> |
| /// The instance will use the default compression level, the default |
| /// buffer sizes and the default number of threads and buffers per |
| /// thread. |
| /// </para> |
| /// |
| /// <para> |
| /// This class is similar to <see cref="Ionic.Zlib.DeflateStream"/>, |
| /// except that this implementation uses an approach that employs |
| /// multiple worker threads to perform the DEFLATE. On a multi-cpu or |
| /// multi-core computer, the performance of this class can be |
| /// significantly higher than the single-threaded DeflateStream, |
| /// particularly for larger streams. How large? Anything over 10mb is |
| /// a good candidate for parallel compression. |
| /// </para> |
| /// |
| /// </remarks> |
| /// |
| /// <example> |
| /// |
| /// This example shows how to use a ParallelDeflateOutputStream to compress |
| /// data. It reads a file, compresses it, and writes the compressed data to |
| /// a second, output file. |
| /// |
| /// <code> |
| /// byte[] buffer = new byte[WORKING_BUFFER_SIZE]; |
| /// int n= -1; |
| /// String outputFile = fileToCompress + ".compressed"; |
| /// using (System.IO.Stream input = System.IO.File.OpenRead(fileToCompress)) |
| /// { |
| /// using (var raw = System.IO.File.Create(outputFile)) |
| /// { |
| /// using (Stream compressor = new ParallelDeflateOutputStream(raw)) |
| /// { |
| /// while ((n= input.Read(buffer, 0, buffer.Length)) != 0) |
| /// { |
| /// compressor.Write(buffer, 0, n); |
| /// } |
| /// } |
| /// } |
| /// } |
| /// </code> |
| /// <code lang="VB"> |
| /// Dim buffer As Byte() = New Byte(4096) {} |
| /// Dim n As Integer = -1 |
| /// Dim outputFile As String = (fileToCompress & ".compressed") |
| /// Using input As Stream = File.OpenRead(fileToCompress) |
| /// Using raw As FileStream = File.Create(outputFile) |
| /// Using compressor As Stream = New ParallelDeflateOutputStream(raw) |
| /// Do While (n <> 0) |
| /// If (n > 0) Then |
| /// compressor.Write(buffer, 0, n) |
| /// End If |
| /// n = input.Read(buffer, 0, buffer.Length) |
| /// Loop |
| /// End Using |
| /// End Using |
| /// End Using |
| /// </code> |
| /// </example> |
| /// <param name="stream">The stream to which compressed data will be written.</param> |
| public ParallelDeflateOutputStream(System.IO.Stream stream) |
| : this(stream, CompressionLevel.Default, CompressionStrategy.Default, false) |
| { |
| } |
| |
| /// <summary> |
| /// Create a ParallelDeflateOutputStream using the specified CompressionLevel. |
| /// </summary> |
| /// <remarks> |
| /// See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/> |
| /// constructor for example code. |
| /// </remarks> |
| /// <param name="stream">The stream to which compressed data will be written.</param> |
| /// <param name="level">A tuning knob to trade speed for effectiveness.</param> |
| public ParallelDeflateOutputStream(System.IO.Stream stream, CompressionLevel level) |
| : this(stream, level, CompressionStrategy.Default, false) |
| { |
| } |
| |
| /// <summary> |
| /// Create a ParallelDeflateOutputStream and specify whether to leave the captive stream open |
| /// when the ParallelDeflateOutputStream is closed. |
| /// </summary> |
| /// <remarks> |
| /// See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/> |
| /// constructor for example code. |
| /// </remarks> |
| /// <param name="stream">The stream to which compressed data will be written.</param> |
| /// <param name="leaveOpen"> |
| /// true if the application would like the stream to remain open after inflation/deflation. |
| /// </param> |
| public ParallelDeflateOutputStream(System.IO.Stream stream, bool leaveOpen) |
| : this(stream, CompressionLevel.Default, CompressionStrategy.Default, leaveOpen) |
| { |
| } |
| |
| /// <summary> |
| /// Create a ParallelDeflateOutputStream and specify whether to leave the captive stream open |
| /// when the ParallelDeflateOutputStream is closed. |
| /// </summary> |
| /// <remarks> |
| /// See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/> |
| /// constructor for example code. |
| /// </remarks> |
| /// <param name="stream">The stream to which compressed data will be written.</param> |
| /// <param name="level">A tuning knob to trade speed for effectiveness.</param> |
| /// <param name="leaveOpen"> |
| /// true if the application would like the stream to remain open after inflation/deflation. |
| /// </param> |
| public ParallelDeflateOutputStream(System.IO.Stream stream, CompressionLevel level, bool leaveOpen) |
| : this(stream, CompressionLevel.Default, CompressionStrategy.Default, leaveOpen) |
| { |
| } |
| |
| /// <summary> |
| /// Create a ParallelDeflateOutputStream using the specified |
| /// CompressionLevel and CompressionStrategy, and specifying whether to |
| /// leave the captive stream open when the ParallelDeflateOutputStream is |
| /// closed. |
| /// </summary> |
| /// <remarks> |
| /// See the <see cref="ParallelDeflateOutputStream(System.IO.Stream)"/> |
| /// constructor for example code. |
| /// </remarks> |
| /// <param name="stream">The stream to which compressed data will be written.</param> |
| /// <param name="level">A tuning knob to trade speed for effectiveness.</param> |
| /// <param name="strategy"> |
| /// By tweaking this parameter, you may be able to optimize the compression for |
| /// data with particular characteristics. |
| /// </param> |
| /// <param name="leaveOpen"> |
| /// true if the application would like the stream to remain open after inflation/deflation. |
| /// </param> |
| public ParallelDeflateOutputStream(System.IO.Stream stream, |
| CompressionLevel level, |
| CompressionStrategy strategy, |
| bool leaveOpen) |
| { |
| TraceOutput(TraceBits.Lifecycle | TraceBits.Session, "-------------------------------------------------------"); |
| TraceOutput(TraceBits.Lifecycle | TraceBits.Session, "Create {0:X8}", this.GetHashCode()); |
| _outStream = stream; |
| _compressLevel= level; |
| Strategy = strategy; |
| _leaveOpen = leaveOpen; |
| this.MaxBufferPairs = 16; // default |
| } |
| |
| |
| /// <summary> |
| /// The ZLIB strategy to be used during compression. |
| /// </summary> |
| /// |
| public CompressionStrategy Strategy |
| { |
| get; |
| private set; |
| } |
| |
| /// <summary> |
| /// The maximum number of buffer pairs to use. |
| /// </summary> |
| /// |
| /// <remarks> |
| /// <para> |
| /// This property sets an upper limit on the number of memory buffer |
| /// pairs to create. The implementation of this stream allocates |
| /// multiple buffers to facilitate parallel compression. As each buffer |
| /// fills up, this stream uses <see |
| /// cref="System.Threading.ThreadPool.QueueUserWorkItem(WaitCallback)"> |
| /// ThreadPool.QueueUserWorkItem()</see> |
| /// to compress those buffers in a background threadpool thread. After a |
| /// buffer is compressed, it is re-ordered and written to the output |
| /// stream. |
| /// </para> |
| /// |
| /// <para> |
| /// A higher number of buffer pairs enables a higher degree of |
| /// parallelism, which tends to increase the speed of compression on |
| /// multi-cpu computers. On the other hand, a higher number of buffer |
| /// pairs also implies a larger memory consumption, more active worker |
| /// threads, and a higher cpu utilization for any compression. This |
| /// property enables the application to limit its memory consumption and |
| /// CPU utilization behavior depending on requirements. |
| /// </para> |
| /// |
| /// <para> |
| /// For each compression "task" that occurs in parallel, there are 2 |
| /// buffers allocated: one for input and one for output. This property |
| /// sets a limit for the number of pairs. The total amount of storage |
| /// space allocated for buffering will then be (N*S*2), where N is the |
| /// number of buffer pairs, S is the size of each buffer (<see |
| /// cref="BufferSize"/>). By default, DotNetZip allocates 4 buffer |
| /// pairs per CPU core, so if your machine has 4 cores, and you retain |
| /// the default buffer size of 128k, then the |
| /// ParallelDeflateOutputStream will use 4 * 4 * 2 * 128kb of buffer |
| /// memory in total, or 4mb, in blocks of 128kb. If you then set this |
| /// property to 8, then the number will be 8 * 2 * 128kb of buffer |
| /// memory, or 2mb. |
| /// </para> |
| /// |
| /// <para> |
| /// CPU utilization will also go up with additional buffers, because a |
| /// larger number of buffer pairs allows a larger number of background |
| /// threads to compress in parallel. If you find that parallel |
| /// compression is consuming too much memory or CPU, you can adjust this |
| /// value downward. |
| /// </para> |
| /// |
| /// <para> |
| /// The default value is 16. Different values may deliver better or |
| /// worse results, depending on your priorities and the dynamic |
| /// performance characteristics of your storage and compute resources. |
| /// </para> |
| /// |
| /// <para> |
| /// This property is not the number of buffer pairs to use; it is an |
| /// upper limit. An illustration: Suppose you have an application that |
| /// uses the default value of this property (which is 16), and it runs |
| /// on a machine with 2 CPU cores. In that case, DotNetZip will allocate |
| /// 4 buffer pairs per CPU core, for a total of 8 pairs. The upper |
| /// limit specified by this property has no effect. |
| /// </para> |
| /// |
| /// <para> |
| /// The application can set this value at any time, but it is effective |
| /// only before the first call to Write(), which is when the buffers are |
| /// allocated. |
| /// </para> |
| /// </remarks> |
| public int MaxBufferPairs |
| { |
| get |
| { |
| return _maxBufferPairs; |
| } |
| set |
| { |
| if (value < 4) |
| throw new ArgumentException("MaxBufferPairs", |
| "Value must be 4 or greater."); |
| _maxBufferPairs = value; |
| } |
| } |
| |
| /// <summary> |
| /// The size of the buffers used by the compressor threads. |
| /// </summary> |
| /// <remarks> |
| /// |
| /// <para> |
| /// The default buffer size is 128k. The application can set this value |
| /// at any time, but it is effective only before the first Write(). |
| /// </para> |
| /// |
| /// <para> |
| /// Larger buffer sizes implies larger memory consumption but allows |
| /// more efficient compression. Using smaller buffer sizes consumes less |
| /// memory but may result in less effective compression. For example, |
| /// using the default buffer size of 128k, the compression delivered is |
| /// within 1% of the compression delivered by the single-threaded <see |
| /// cref="Ionic.Zlib.DeflateStream"/>. On the other hand, using a |
| /// BufferSize of 8k can result in a compressed data stream that is 5% |
| /// larger than that delivered by the single-threaded |
| /// <c>DeflateStream</c>. Excessively small buffer sizes can also cause |
| /// the speed of the ParallelDeflateOutputStream to drop, because of |
| /// larger thread scheduling overhead dealing with many many small |
| /// buffers. |
| /// </para> |
| /// |
| /// <para> |
| /// The total amount of storage space allocated for buffering will be |
| /// (N*S*2), where N is the number of buffer pairs, and S is the size of |
| /// each buffer (this property). There are 2 buffers used by the |
| /// compressor, one for input and one for output. By default, DotNetZip |
| /// allocates 4 buffer pairs per CPU core, so if your machine has 4 |
| /// cores, then the number of buffer pairs used will be 16. If you |
| /// accept the default value of this property, 128k, then the |
| /// ParallelDeflateOutputStream will use 16 * 2 * 128kb of buffer memory |
| /// in total, or 4mb, in blocks of 128kb. If you set this property to |
| /// 64kb, then the number will be 16 * 2 * 64kb of buffer memory, or |
| /// 2mb. |
| /// </para> |
| /// |
| /// </remarks> |
| public int BufferSize |
| { |
| get { return _bufferSize;} |
| set |
| { |
| if (value < 1024) |
| throw new ArgumentOutOfRangeException("BufferSize", |
| "BufferSize must be greater than 1024 bytes"); |
| _bufferSize = value; |
| } |
| } |
| |
| /// <summary> |
| /// The CRC32 for the data that was written out, prior to compression. |
| /// </summary> |
| /// <remarks> |
| /// This value is meaningful only after a call to Close(). |
| /// </remarks> |
| public int Crc32 { get { return _Crc32; } } |
| |
| |
| /// <summary> |
| /// The total number of uncompressed bytes processed by the ParallelDeflateOutputStream. |
| /// </summary> |
| /// <remarks> |
| /// This value is meaningful only after a call to Close(). |
| /// </remarks> |
| public Int64 BytesProcessed { get { return _totalBytesProcessed; } } |
| |
| |
| private void _InitializePoolOfWorkItems() |
| { |
| _toWrite = new Queue<int>(); |
| _toFill = new Queue<int>(); |
| _pool = new System.Collections.Generic.List<WorkItem>(); |
| int nTasks = BufferPairsPerCore * Environment.ProcessorCount; |
| nTasks = Math.Min(nTasks, _maxBufferPairs); |
| for(int i=0; i < nTasks; i++) |
| { |
| _pool.Add(new WorkItem(_bufferSize, _compressLevel, Strategy, i)); |
| _toFill.Enqueue(i); |
| } |
| |
| _newlyCompressedBlob = new AutoResetEvent(false); |
| _runningCrc = new Ionic.Crc.CRC32(); |
| _currentlyFilling = -1; |
| _lastFilled = -1; |
| _lastWritten = -1; |
| _latestCompressed = -1; |
| } |
| |
| |
| |
| |
| /// <summary> |
| /// Write data to the stream. |
| /// </summary> |
| /// |
| /// <remarks> |
| /// |
| /// <para> |
| /// To use the ParallelDeflateOutputStream to compress data, create a |
| /// ParallelDeflateOutputStream with CompressionMode.Compress, passing a |
| /// writable output stream. Then call Write() on that |
| /// ParallelDeflateOutputStream, providing uncompressed data as input. The |
| /// data sent to the output stream will be the compressed form of the data |
| /// written. |
| /// </para> |
| /// |
| /// <para> |
| /// To decompress data, use the <see cref="Ionic.Zlib.DeflateStream"/> class. |
| /// </para> |
| /// |
| /// </remarks> |
| /// <param name="buffer">The buffer holding data to write to the stream.</param> |
| /// <param name="offset">the offset within that data array to find the first byte to write.</param> |
| /// <param name="count">the number of bytes to write.</param> |
| public override void Write(byte[] buffer, int offset, int count) |
| { |
| bool mustWait = false; |
| |
| // This method does this: |
| // 0. handles any pending exceptions |
| // 1. write any buffers that are ready to be written, |
| // 2. fills a work buffer; when full, flip state to 'Filled', |
| // 3. if more data to be written, goto step 1 |
| |
| if (_isClosed) |
| throw new InvalidOperationException(); |
| |
| // dispense any exceptions that occurred on the BG threads |
| if (_pendingException != null) |
| { |
| _handlingException = true; |
| var pe = _pendingException; |
| _pendingException = null; |
| throw pe; |
| } |
| |
| if (count == 0) return; |
| |
| if (!_firstWriteDone) |
| { |
| // Want to do this on first Write, first session, and not in the |
| // constructor. We want to allow MaxBufferPairs to |
| // change after construction, but before first Write. |
| _InitializePoolOfWorkItems(); |
| _firstWriteDone = true; |
| } |
| |
| |
| do |
| { |
| // may need to make buffers available |
| EmitPendingBuffers(false, mustWait); |
| |
| mustWait = false; |
| // use current buffer, or get a new buffer to fill |
| int ix = -1; |
| if (_currentlyFilling >= 0) |
| { |
| ix = _currentlyFilling; |
| TraceOutput(TraceBits.WriteTake, |
| "Write notake wi({0}) lf({1})", |
| ix, |
| _lastFilled); |
| } |
| else |
| { |
| TraceOutput(TraceBits.WriteTake, "Write take?"); |
| if (_toFill.Count == 0) |
| { |
| // no available buffers, so... need to emit |
| // compressed buffers. |
| mustWait = true; |
| continue; |
| } |
| |
| ix = _toFill.Dequeue(); |
| TraceOutput(TraceBits.WriteTake, |
| "Write take wi({0}) lf({1})", |
| ix, |
| _lastFilled); |
| ++_lastFilled; // TODO: consider rollover? |
| } |
| |
| WorkItem workitem = _pool[ix]; |
| |
| int limit = ((workitem.buffer.Length - workitem.inputBytesAvailable) > count) |
| ? count |
| : (workitem.buffer.Length - workitem.inputBytesAvailable); |
| |
| workitem.ordinal = _lastFilled; |
| |
| TraceOutput(TraceBits.Write, |
| "Write lock wi({0}) ord({1}) iba({2})", |
| workitem.index, |
| workitem.ordinal, |
| workitem.inputBytesAvailable |
| ); |
| |
| // copy from the provided buffer to our workitem, starting at |
| // the tail end of whatever data we might have in there currently. |
| Buffer.BlockCopy(buffer, |
| offset, |
| workitem.buffer, |
| workitem.inputBytesAvailable, |
| limit); |
| |
| count -= limit; |
| offset += limit; |
| workitem.inputBytesAvailable += limit; |
| if (workitem.inputBytesAvailable == workitem.buffer.Length) |
| { |
| // No need for interlocked.increment: the Write() |
| // method is documented as not multi-thread safe, so |
| // we can assume Write() calls come in from only one |
| // thread. |
| TraceOutput(TraceBits.Write, |
| "Write QUWI wi({0}) ord({1}) iba({2}) nf({3})", |
| workitem.index, |
| workitem.ordinal, |
| workitem.inputBytesAvailable ); |
| |
| if (!ThreadPool.QueueUserWorkItem( _DeflateOne, workitem )) |
| throw new Exception("Cannot enqueue workitem"); |
| |
| _currentlyFilling = -1; // will get a new buffer next time |
| } |
| else |
| _currentlyFilling = ix; |
| |
| if (count > 0) |
| TraceOutput(TraceBits.WriteEnter, "Write more"); |
| } |
| while (count > 0); // until no more to write |
| |
| TraceOutput(TraceBits.WriteEnter, "Write exit"); |
| return; |
| } |
| |
| |
| |
| private void _FlushFinish() |
| { |
| // After writing a series of compressed buffers, each one closed |
| // with Flush.Sync, we now write the final one as Flush.Finish, |
| // and then stop. |
| byte[] buffer = new byte[128]; |
| var compressor = new ZlibCodec(); |
| int rc = compressor.InitializeDeflate(_compressLevel, false); |
| compressor.InputBuffer = null; |
| compressor.NextIn = 0; |
| compressor.AvailableBytesIn = 0; |
| compressor.OutputBuffer = buffer; |
| compressor.NextOut = 0; |
| compressor.AvailableBytesOut = buffer.Length; |
| rc = compressor.Deflate(FlushType.Finish); |
| |
| if (rc != ZlibConstants.Z_STREAM_END && rc != ZlibConstants.Z_OK) |
| throw new Exception("deflating: " + compressor.Message); |
| |
| if (buffer.Length - compressor.AvailableBytesOut > 0) |
| { |
| TraceOutput(TraceBits.EmitBegin, |
| "Emit begin flush bytes({0})", |
| buffer.Length - compressor.AvailableBytesOut); |
| |
| _outStream.Write(buffer, 0, buffer.Length - compressor.AvailableBytesOut); |
| |
| TraceOutput(TraceBits.EmitDone, |
| "Emit done flush"); |
| } |
| |
| compressor.EndDeflate(); |
| |
| _Crc32 = _runningCrc.Crc32Result; |
| } |
| |
| |
| private void _Flush(bool lastInput) |
| { |
| if (_isClosed) |
| throw new InvalidOperationException(); |
| |
| if (emitting) return; |
| |
| // compress any partial buffer |
| if (_currentlyFilling >= 0) |
| { |
| WorkItem workitem = _pool[_currentlyFilling]; |
| _DeflateOne(workitem); |
| _currentlyFilling = -1; // get a new buffer next Write() |
| } |
| |
| if (lastInput) |
| { |
| EmitPendingBuffers(true, false); |
| _FlushFinish(); |
| } |
| else |
| { |
| EmitPendingBuffers(false, false); |
| } |
| } |
| |
| |
| |
| /// <summary> |
| /// Flush the stream. |
| /// </summary> |
| public override void Flush() |
| { |
| if (_pendingException != null) |
| { |
| _handlingException = true; |
| var pe = _pendingException; |
| _pendingException = null; |
| throw pe; |
| } |
| if (_handlingException) |
| return; |
| |
| _Flush(false); |
| } |
| |
| |
| /// <summary> |
| /// Close the stream. |
| /// </summary> |
| /// <remarks> |
| /// You must call Close on the stream to guarantee that all of the data written in has |
| /// been compressed, and the compressed data has been written out. |
| /// </remarks> |
| public override void Close() |
| { |
| TraceOutput(TraceBits.Session, "Close {0:X8}", this.GetHashCode()); |
| |
| if (_pendingException != null) |
| { |
| _handlingException = true; |
| var pe = _pendingException; |
| _pendingException = null; |
| throw pe; |
| } |
| |
| if (_handlingException) |
| return; |
| |
| if (_isClosed) return; |
| |
| _Flush(true); |
| |
| if (!_leaveOpen) |
| _outStream.Close(); |
| |
| _isClosed= true; |
| } |
| |
| |
| |
| // workitem 10030 - implement a new Dispose method |
| |
| /// <summary>Dispose the object</summary> |
| /// <remarks> |
| /// <para> |
| /// Because ParallelDeflateOutputStream is IDisposable, the |
| /// application must call this method when finished using the instance. |
| /// </para> |
| /// <para> |
| /// This method is generally called implicitly upon exit from |
| /// a <c>using</c> scope in C# (<c>Using</c> in VB). |
| /// </para> |
| /// </remarks> |
| new public void Dispose() |
| { |
| TraceOutput(TraceBits.Lifecycle, "Dispose {0:X8}", this.GetHashCode()); |
| Close(); |
| _pool = null; |
| Dispose(true); |
| } |
| |
| |
| |
| /// <summary>The Dispose method</summary> |
| /// <param name="disposing"> |
| /// indicates whether the Dispose method was invoked by user code. |
| /// </param> |
| protected override void Dispose(bool disposing) |
| { |
| base.Dispose(disposing); |
| } |
| |
| |
| /// <summary> |
| /// Resets the stream for use with another stream. |
| /// </summary> |
| /// <remarks> |
| /// Because the ParallelDeflateOutputStream is expensive to create, it |
| /// has been designed so that it can be recycled and re-used. You have |
| /// to call Close() on the stream first, then you can call Reset() on |
| /// it, to use it again on another stream. |
| /// </remarks> |
| /// |
| /// <param name="stream"> |
| /// The new output stream for this era. |
| /// </param> |
| /// |
| /// <example> |
| /// <code> |
| /// ParallelDeflateOutputStream deflater = null; |
| /// foreach (var inputFile in listOfFiles) |
| /// { |
| /// string outputFile = inputFile + ".compressed"; |
| /// using (System.IO.Stream input = System.IO.File.OpenRead(inputFile)) |
| /// { |
| /// using (var outStream = System.IO.File.Create(outputFile)) |
| /// { |
| /// if (deflater == null) |
| /// deflater = new ParallelDeflateOutputStream(outStream, |
| /// CompressionLevel.Best, |
| /// CompressionStrategy.Default, |
| /// true); |
| /// deflater.Reset(outStream); |
| /// |
| /// while ((n= input.Read(buffer, 0, buffer.Length)) != 0) |
| /// { |
| /// deflater.Write(buffer, 0, n); |
| /// } |
| /// } |
| /// } |
| /// } |
| /// </code> |
| /// </example> |
| public void Reset(Stream stream) |
| { |
| TraceOutput(TraceBits.Session, "-------------------------------------------------------"); |
| TraceOutput(TraceBits.Session, "Reset {0:X8} firstDone({1})", this.GetHashCode(), _firstWriteDone); |
| |
| if (!_firstWriteDone) return; |
| |
| // reset all status |
| _toWrite.Clear(); |
| _toFill.Clear(); |
| foreach (var workitem in _pool) |
| { |
| _toFill.Enqueue(workitem.index); |
| workitem.ordinal = -1; |
| } |
| |
| _firstWriteDone = false; |
| _totalBytesProcessed = 0L; |
| _runningCrc = new Ionic.Crc.CRC32(); |
| _isClosed= false; |
| _currentlyFilling = -1; |
| _lastFilled = -1; |
| _lastWritten = -1; |
| _latestCompressed = -1; |
| _outStream = stream; |
| } |
| |
| |
| |
| |
| private void EmitPendingBuffers(bool doAll, bool mustWait) |
| { |
| // When combining parallel deflation with a ZipSegmentedStream, it's |
| // possible for the ZSS to throw from within this method. In that |
| // case, Close/Dispose will be called on this stream, if this stream |
| // is employed within a using or try/finally pair as required. But |
| // this stream is unaware of the pending exception, so the Close() |
| // method invokes this method AGAIN. This can lead to a deadlock. |
| // Therefore, failfast if re-entering. |
| |
| if (emitting) return; |
| emitting = true; |
| if (doAll || mustWait) |
| _newlyCompressedBlob.WaitOne(); |
| |
| do |
| { |
| int firstSkip = -1; |
| int millisecondsToWait = doAll ? 200 : (mustWait ? -1 : 0); |
| int nextToWrite = -1; |
| |
| do |
| { |
| if (Monitor.TryEnter(_toWrite, millisecondsToWait)) |
| { |
| nextToWrite = -1; |
| try |
| { |
| if (_toWrite.Count > 0) |
| nextToWrite = _toWrite.Dequeue(); |
| } |
| finally |
| { |
| Monitor.Exit(_toWrite); |
| } |
| |
| if (nextToWrite >= 0) |
| { |
| WorkItem workitem = _pool[nextToWrite]; |
| if (workitem.ordinal != _lastWritten + 1) |
| { |
| // out of order. requeue and try again. |
| TraceOutput(TraceBits.EmitSkip, |
| "Emit skip wi({0}) ord({1}) lw({2}) fs({3})", |
| workitem.index, |
| workitem.ordinal, |
| _lastWritten, |
| firstSkip); |
| |
| lock(_toWrite) |
| { |
| _toWrite.Enqueue(nextToWrite); |
| } |
| |
| if (firstSkip == nextToWrite) |
| { |
| // We went around the list once. |
| // None of the items in the list is the one we want. |
| // Now wait for a compressor to signal again. |
| _newlyCompressedBlob.WaitOne(); |
| firstSkip = -1; |
| } |
| else if (firstSkip == -1) |
| firstSkip = nextToWrite; |
| |
| continue; |
| } |
| |
| firstSkip = -1; |
| |
| TraceOutput(TraceBits.EmitBegin, |
| "Emit begin wi({0}) ord({1}) cba({2})", |
| workitem.index, |
| workitem.ordinal, |
| workitem.compressedBytesAvailable); |
| |
| _outStream.Write(workitem.compressed, 0, workitem.compressedBytesAvailable); |
| _runningCrc.Combine(workitem.crc, workitem.inputBytesAvailable); |
| _totalBytesProcessed += workitem.inputBytesAvailable; |
| workitem.inputBytesAvailable = 0; |
| |
| TraceOutput(TraceBits.EmitDone, |
| "Emit done wi({0}) ord({1}) cba({2}) mtw({3})", |
| workitem.index, |
| workitem.ordinal, |
| workitem.compressedBytesAvailable, |
| millisecondsToWait); |
| |
| _lastWritten = workitem.ordinal; |
| _toFill.Enqueue(workitem.index); |
| |
| // don't wait next time through |
| if (millisecondsToWait == -1) millisecondsToWait = 0; |
| } |
| } |
| else |
| nextToWrite = -1; |
| |
| } while (nextToWrite >= 0); |
| |
| //} while (doAll && (_lastWritten != _latestCompressed)); |
| } while (doAll && (_lastWritten != _latestCompressed || _lastWritten != _lastFilled)); |
| |
| emitting = false; |
| } |
| |
| |
| |
| #if OLD |
| private void _PerpetualWriterMethod(object state) |
| { |
| TraceOutput(TraceBits.WriterThread, "_PerpetualWriterMethod START"); |
| |
| try |
| { |
| do |
| { |
| // wait for the next session |
| TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch _sessionReset.WaitOne(begin) PWM"); |
| _sessionReset.WaitOne(); |
| TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch _sessionReset.WaitOne(done) PWM"); |
| |
| if (_isDisposed) break; |
| |
| TraceOutput(TraceBits.Synch | TraceBits.WriterThread, "Synch _sessionReset.Reset() PWM"); |
| _sessionReset.Reset(); |
| |
| // repeatedly write buffers as they become ready |
| WorkItem workitem = null; |
| Ionic.Zlib.CRC32 c= new Ionic.Zlib.CRC32(); |
| do |
| { |
| workitem = _pool[_nextToWrite % _pc]; |
| lock(workitem) |
| { |
| if (_noMoreInputForThisSegment) |
| TraceOutput(TraceBits.Write, |
| "Write drain wi({0}) stat({1}) canuse({2}) cba({3})", |
| workitem.index, |
| workitem.status, |
| (workitem.status == (int)WorkItem.Status.Compressed), |
| workitem.compressedBytesAvailable); |
| |
| do |
| { |
| if (workitem.status == (int)WorkItem.Status.Compressed) |
| { |
| TraceOutput(TraceBits.WriteBegin, |
| "Write begin wi({0}) stat({1}) cba({2})", |
| workitem.index, |
| workitem.status, |
| workitem.compressedBytesAvailable); |
| |
| workitem.status = (int)WorkItem.Status.Writing; |
| _outStream.Write(workitem.compressed, 0, workitem.compressedBytesAvailable); |
| c.Combine(workitem.crc, workitem.inputBytesAvailable); |
| _totalBytesProcessed += workitem.inputBytesAvailable; |
| _nextToWrite++; |
| workitem.inputBytesAvailable= 0; |
| workitem.status = (int)WorkItem.Status.Done; |
| |
| TraceOutput(TraceBits.WriteDone, |
| "Write done wi({0}) stat({1}) cba({2})", |
| workitem.index, |
| workitem.status, |
| workitem.compressedBytesAvailable); |
| |
| |
| Monitor.Pulse(workitem); |
| break; |
| } |
| else |
| { |
| int wcycles = 0; |
| // I've locked a workitem I cannot use. |
| // Therefore, wake someone else up, and then release the lock. |
| while (workitem.status != (int)WorkItem.Status.Compressed) |
| { |
| TraceOutput(TraceBits.WriteWait, |
| "Write waiting wi({0}) stat({1}) nw({2}) nf({3}) nomore({4})", |
| workitem.index, |
| workitem.status, |
| _nextToWrite, _nextToFill, |
| _noMoreInputForThisSegment ); |
| |
| if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill) |
| break; |
| |
| wcycles++; |
| |
| // wake up someone else |
| Monitor.Pulse(workitem); |
| // release and wait |
| Monitor.Wait(workitem); |
| |
| if (workitem.status == (int)WorkItem.Status.Compressed) |
| TraceOutput(TraceBits.WriteWait, |
| "Write A-OK wi({0}) stat({1}) iba({2}) cba({3}) cyc({4})", |
| workitem.index, |
| workitem.status, |
| workitem.inputBytesAvailable, |
| workitem.compressedBytesAvailable, |
| wcycles); |
| } |
| |
| if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill) |
| break; |
| |
| } |
| } |
| while (true); |
| } |
| |
| if (_noMoreInputForThisSegment) |
| TraceOutput(TraceBits.Write, |
| "Write nomore nw({0}) nf({1}) break({2})", |
| _nextToWrite, _nextToFill, (_nextToWrite == _nextToFill)); |
| |
| if (_noMoreInputForThisSegment && _nextToWrite == _nextToFill) |
| break; |
| |
| } while (true); |
| |
| |
| // Finish: |
| // After writing a series of buffers, closing each one with |
| // Flush.Sync, we now write the final one as Flush.Finish, and |
| // then stop. |
| byte[] buffer = new byte[128]; |
| ZlibCodec compressor = new ZlibCodec(); |
| int rc = compressor.InitializeDeflate(_compressLevel, false); |
| compressor.InputBuffer = null; |
| compressor.NextIn = 0; |
| compressor.AvailableBytesIn = 0; |
| compressor.OutputBuffer = buffer; |
| compressor.NextOut = 0; |
| compressor.AvailableBytesOut = buffer.Length; |
| rc = compressor.Deflate(FlushType.Finish); |
| |
| if (rc != ZlibConstants.Z_STREAM_END && rc != ZlibConstants.Z_OK) |
| throw new Exception("deflating: " + compressor.Message); |
| |
| if (buffer.Length - compressor.AvailableBytesOut > 0) |
| { |
| TraceOutput(TraceBits.WriteBegin, |
| "Write begin flush bytes({0})", |
| buffer.Length - compressor.AvailableBytesOut); |
| |
| _outStream.Write(buffer, 0, buffer.Length - compressor.AvailableBytesOut); |
| |
| TraceOutput(TraceBits.WriteBegin, |
| "Write done flush"); |
| } |
| |
| compressor.EndDeflate(); |
| |
| _Crc32 = c.Crc32Result; |
| |
| // signal that writing is complete: |
| TraceOutput(TraceBits.Synch, "Synch _writingDone.Set() PWM"); |
| _writingDone.Set(); |
| } |
| while (true); |
| } |
| catch (System.Exception exc1) |
| { |
| lock(_eLock) |
| { |
| // expose the exception to the main thread |
| if (_pendingException!=null) |
| _pendingException = exc1; |
| } |
| } |
| |
| TraceOutput(TraceBits.WriterThread, "_PerpetualWriterMethod FINIS"); |
| } |
| #endif |
| |
| |
| |
| |
| private void _DeflateOne(Object wi) |
| { |
| // compress one buffer |
| WorkItem workitem = (WorkItem) wi; |
| try |
| { |
| int myItem = workitem.index; |
| Ionic.Crc.CRC32 crc = new Ionic.Crc.CRC32(); |
| |
| // calc CRC on the buffer |
| crc.SlurpBlock(workitem.buffer, 0, workitem.inputBytesAvailable); |
| |
| // deflate it |
| DeflateOneSegment(workitem); |
| |
| // update status |
| workitem.crc = crc.Crc32Result; |
| TraceOutput(TraceBits.Compress, |
| "Compress wi({0}) ord({1}) len({2})", |
| workitem.index, |
| workitem.ordinal, |
| workitem.compressedBytesAvailable |
| ); |
| |
| lock(_latestLock) |
| { |
| if (workitem.ordinal > _latestCompressed) |
| _latestCompressed = workitem.ordinal; |
| } |
| lock (_toWrite) |
| { |
| _toWrite.Enqueue(workitem.index); |
| } |
| _newlyCompressedBlob.Set(); |
| } |
| catch (System.Exception exc1) |
| { |
| lock(_eLock) |
| { |
| // expose the exception to the main thread |
| if (_pendingException!=null) |
| _pendingException = exc1; |
| } |
| } |
| } |
| |
| |
| |
| |
| private bool DeflateOneSegment(WorkItem workitem) |
| { |
| ZlibCodec compressor = workitem.compressor; |
| int rc= 0; |
| compressor.ResetDeflate(); |
| compressor.NextIn = 0; |
| |
| compressor.AvailableBytesIn = workitem.inputBytesAvailable; |
| |
| // step 1: deflate the buffer |
| compressor.NextOut = 0; |
| compressor.AvailableBytesOut = workitem.compressed.Length; |
| do |
| { |
| compressor.Deflate(FlushType.None); |
| } |
| while (compressor.AvailableBytesIn > 0 || compressor.AvailableBytesOut == 0); |
| |
| // step 2: flush (sync) |
| rc = compressor.Deflate(FlushType.Sync); |
| |
| workitem.compressedBytesAvailable= (int) compressor.TotalBytesOut; |
| return true; |
| } |
| |
| |
| [System.Diagnostics.ConditionalAttribute("Trace")] |
| private void TraceOutput(TraceBits bits, string format, params object[] varParams) |
| { |
| if ((bits & _DesiredTrace) != 0) |
| { |
| lock(_outputLock) |
| { |
| int tid = Thread.CurrentThread.GetHashCode(); |
| #if !SILVERLIGHT |
| Console.ForegroundColor = (ConsoleColor) (tid % 8 + 8); |
| #endif |
| Console.Write("{0:000} PDOS ", tid); |
| Console.WriteLine(format, varParams); |
| #if !SILVERLIGHT |
| Console.ResetColor(); |
| #endif |
| } |
| } |
| } |
| |
| |
| // used only when Trace is defined |
| [Flags] |
| enum TraceBits : uint |
| { |
| None = 0, |
| NotUsed1 = 1, |
| EmitLock = 2, |
| EmitEnter = 4, // enter _EmitPending |
| EmitBegin = 8, // begin to write out |
| EmitDone = 16, // done writing out |
| EmitSkip = 32, // writer skipping a workitem |
| EmitAll = 58, // All Emit flags |
| Flush = 64, |
| Lifecycle = 128, // constructor/disposer |
| Session = 256, // Close/Reset |
| Synch = 512, // thread synchronization |
| Instance = 1024, // instance settings |
| Compress = 2048, // compress task |
| Write = 4096, // filling buffers, when caller invokes Write() |
| WriteEnter = 8192, // upon entry to Write() |
| WriteTake = 16384, // on _toFill.Take() |
| All = 0xffffffff, |
| } |
| |
| |
| |
| /// <summary> |
| /// Indicates whether the stream supports Seek operations. |
| /// </summary> |
| /// <remarks> |
| /// Always returns false. |
| /// </remarks> |
| public override bool CanSeek |
| { |
| get { return false; } |
| } |
| |
| |
| /// <summary> |
| /// Indicates whether the stream supports Read operations. |
| /// </summary> |
| /// <remarks> |
| /// Always returns false. |
| /// </remarks> |
| public override bool CanRead |
| { |
| get {return false;} |
| } |
| |
| /// <summary> |
| /// Indicates whether the stream supports Write operations. |
| /// </summary> |
| /// <remarks> |
| /// Returns true if the provided stream is writable. |
| /// </remarks> |
| public override bool CanWrite |
| { |
| get { return _outStream.CanWrite; } |
| } |
| |
| /// <summary> |
| /// Reading this property always throws a NotSupportedException. |
| /// </summary> |
| public override long Length |
| { |
| get { throw new NotSupportedException(); } |
| } |
| |
| /// <summary> |
| /// Returns the current position of the output stream. |
| /// </summary> |
| /// <remarks> |
| /// <para> |
| /// Because the output gets written by a background thread, |
| /// the value may change asynchronously. Setting this |
| /// property always throws a NotSupportedException. |
| /// </para> |
| /// </remarks> |
| public override long Position |
| { |
| get { return _outStream.Position; } |
| set { throw new NotSupportedException(); } |
| } |
| |
| /// <summary> |
| /// This method always throws a NotSupportedException. |
| /// </summary> |
| /// <param name="buffer"> |
| /// The buffer into which data would be read, IF THIS METHOD |
| /// ACTUALLY DID ANYTHING. |
| /// </param> |
| /// <param name="offset"> |
| /// The offset within that data array at which to insert the |
| /// data that is read, IF THIS METHOD ACTUALLY DID |
| /// ANYTHING. |
| /// </param> |
| /// <param name="count"> |
| /// The number of bytes to write, IF THIS METHOD ACTUALLY DID |
| /// ANYTHING. |
| /// </param> |
| /// <returns>nothing.</returns> |
| public override int Read(byte[] buffer, int offset, int count) |
| { |
| throw new NotSupportedException(); |
| } |
| |
| /// <summary> |
| /// This method always throws a NotSupportedException. |
| /// </summary> |
| /// <param name="offset"> |
| /// The offset to seek to.... |
| /// IF THIS METHOD ACTUALLY DID ANYTHING. |
| /// </param> |
| /// <param name="origin"> |
| /// The reference specifying how to apply the offset.... IF |
| /// THIS METHOD ACTUALLY DID ANYTHING. |
| /// </param> |
| /// <returns>nothing. It always throws.</returns> |
| public override long Seek(long offset, System.IO.SeekOrigin origin) |
| { |
| throw new NotSupportedException(); |
| } |
| |
| /// <summary> |
| /// This method always throws a NotSupportedException. |
| /// </summary> |
| /// <param name="value"> |
| /// The new value for the stream length.... IF |
| /// THIS METHOD ACTUALLY DID ANYTHING. |
| /// </param> |
| public override void SetLength(long value) |
| { |
| throw new NotSupportedException(); |
| } |
| |
| } |
| |
| } |
| |
| |