View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.chemistry.opencmis.server.shared;
20  
21  import java.io.BufferedInputStream;
22  import java.io.BufferedOutputStream;
23  import java.io.File;
24  import java.io.FileInputStream;
25  import java.io.FileNotFoundException;
26  import java.io.FileOutputStream;
27  import java.io.FilterInputStream;
28  import java.io.IOException;
29  import java.io.InputStream;
30  import java.io.OutputStream;
31  
32  
33  /**
34   * An OutputStream that stores the data in main memory until it reaches a
35   * threshold. If the threshold is passed the data is written to a temporary
36   * file.
37   *
38   * It it is important to close this OutputStream before
39   * {@link #getInputStream()} is called or call {@link #destroy()} if the
40   * InputStream isn't required!
41   */
42  public class ThresholdOutputStream extends OutputStream
43  {
44      private static final int MAX_GROW = 10 * 1024 * 1024;
45      private static final int DEFAULT_THRESHOLD = 4 * 1024 * 1024;
46      private File tempDir;
47      private int memoryThreshold;
48      private byte[] buf = null;
49      private int bufSize = 0;
50      private long size;
51      private File tempFile;
52      private OutputStream tmpStream;
53  
54      public ThresholdOutputStream( File tempDir, int memoryThreshold )
55      {
56          this( 64 * 1024, tempDir, memoryThreshold );
57      }
58  
59      public ThresholdOutputStream( int initSize, File tempDir, int memoryThreshold )
60      {
61          if ( initSize < 0 )
62          {
63              throw new IllegalArgumentException( "Negative initial size: " + initSize );
64          }
65  
66          this.tempDir = tempDir;
67          this.memoryThreshold = ( ( memoryThreshold < 0 ) ? DEFAULT_THRESHOLD : memoryThreshold );
68  
69          buf = new byte[initSize];
70      }
71  
72      private void expand( int nextBufferSize ) throws IOException
73      {
74          if ( ( bufSize + nextBufferSize ) <= buf.length )
75          {
76              return;
77          }
78  
79          if ( ( bufSize + nextBufferSize ) > memoryThreshold )
80          {
81              if ( tmpStream == null )
82              {
83                  tempFile = File.createTempFile( "opencmis", null, tempDir );
84                  tmpStream = new BufferedOutputStream( new FileOutputStream( tempFile ) );
85              }
86  
87              tmpStream.write( buf, 0, bufSize );
88  
89              if ( buf.length != memoryThreshold )
90              {
91                  buf = new byte[memoryThreshold];
92              }
93  
94              bufSize = 0;
95  
96              return;
97          }
98  
99          int newSize = ( ( ( ( bufSize + nextBufferSize ) * 2 ) < MAX_GROW ) ? ( ( bufSize + nextBufferSize ) * 2 )
100                                                                             : ( buf.length + nextBufferSize + MAX_GROW ) );
101         byte[] newbuf = new byte[newSize];
102         System.arraycopy( buf, 0, newbuf, 0, bufSize );
103         buf = newbuf;
104     }
105 
106     public long getSize(  )
107     {
108         return size;
109     }
110 
111     @Override
112     public void write( byte[] buffer ) throws IOException
113     {
114         write( buffer, 0, buffer.length );
115     }
116 
117     @Override
118     public void write( byte[] buffer, int offset, int len )
119         throws IOException
120     {
121         if ( len == 0 )
122         {
123             return;
124         }
125 
126         expand( len );
127         System.arraycopy( buffer, offset, buf, bufSize, len );
128         bufSize += len;
129         size += len;
130     }
131 
132     @Override
133     public void write( int oneByte ) throws IOException
134     {
135         if ( bufSize == buf.length )
136         {
137             expand( 1 );
138         }
139 
140         buf[bufSize++] = (byte) oneByte;
141         size++;
142     }
143 
144     @Override
145     public void flush(  ) throws IOException
146     {
147         if ( tmpStream != null )
148         {
149             if ( bufSize > 0 )
150             {
151                 tmpStream.write( buf, 0, bufSize );
152                 bufSize = 0;
153             }
154 
155             tmpStream.flush(  );
156         }
157     }
158 
159     @Override
160     public void close(  ) throws IOException
161     {
162         flush(  );
163 
164         if ( tmpStream != null )
165         {
166             tmpStream.close(  );
167         }
168     }
169 
170     /**
171      * Destroys the object before it has been read.
172      */
173     public void destroy(  )
174     {
175         try
176         {
177             close(  );
178         }
179         catch ( Exception e )
180         {
181             // ignore
182         }
183 
184         if ( tempFile != null )
185         {
186             tempFile.delete(  );
187         }
188 
189         buf = null;
190     }
191 
192     /**
193      * Returns the data as an InputStream.
194      */
195     public InputStream getInputStream(  ) throws Exception
196     {
197         if ( tmpStream != null )
198         {
199             close(  );
200             buf = null;
201 
202             return new InternalTempFileInputStream(  );
203         }
204         else
205         {
206             return new InternalBufferInputStream(  );
207         }
208     }
209 
210     /**
211      * Provides information about the input stream.
212      */
213     public interface ThresholdInputStream
214     {
215         /**
216          * Returns <code>true</code> if the data is in memory. Returns
217          * <code>false</code> if the data resides in a temporary file.
218          */
219         boolean isInMemory(  );
220 
221         /**
222          * Returns the temporary file if the data stored in a file. Returns
223          * <code>null</code> is the data is stored in memory.
224          */
225         File getTemporaryFile(  );
226     }
227 
228     /**
229      * InputStream for in-memory data.
230      */
231     private class InternalBufferInputStream extends InputStream implements ThresholdInputStream
232     {
233         private int pos = 0;
234 
235         public boolean isInMemory(  )
236         {
237             return true;
238         }
239 
240         public File getTemporaryFile(  )
241         {
242             return null;
243         }
244 
245         @Override
246         public boolean markSupported(  )
247         {
248             return false;
249         }
250 
251         @Override
252         public int available(  )
253         {
254             return bufSize - pos;
255         }
256 
257         @Override
258         public int read(  )
259         {
260             return ( ( pos < bufSize ) && ( buf != null ) ) ? ( buf[pos++] & 0xff ) : ( -1 );
261         }
262 
263         @Override
264         public int read( byte[] b ) throws IOException
265         {
266             return read( b, 0, b.length );
267         }
268 
269         @Override
270         public int read( byte[] b, int off, int len )
271         {
272             if ( ( pos >= bufSize ) || ( buf == null ) )
273             {
274                 return -1;
275             }
276 
277             if ( ( pos + len ) > bufSize )
278             {
279                 len = ( bufSize - pos );
280             }
281 
282             System.arraycopy( buf, pos, b, off, len );
283             pos += len;
284 
285             return len;
286         }
287 
288         @Override
289         public long skip( long n )
290         {
291             if ( ( pos + n ) > bufSize )
292             {
293                 n = bufSize - pos;
294             }
295 
296             if ( n < 0 )
297             {
298                 return 0;
299             }
300 
301             pos += n;
302 
303             return n;
304         }
305 
306         @Override
307         public void close(  ) throws IOException
308         {
309             buf = null;
310         }
311     }
312 
313     /**
314      * InputStream for file data.
315      */
316     private class InternalTempFileInputStream extends FilterInputStream implements ThresholdInputStream
317     {
318         private boolean isDeleted = false;
319 
320         public InternalTempFileInputStream(  ) throws FileNotFoundException
321         {
322             super( new BufferedInputStream( new FileInputStream( tempFile ), memoryThreshold ) );
323         }
324 
325         public boolean isInMemory(  )
326         {
327             return false;
328         }
329 
330         public File getTemporaryFile(  )
331         {
332             return tempFile;
333         }
334 
335         @Override
336         public boolean markSupported(  )
337         {
338             return false;
339         }
340 
341         @Override
342         public int read(  ) throws IOException
343         {
344             int b = super.read(  );
345 
346             if ( ( b == -1 ) && !isDeleted )
347             {
348                 super.close(  );
349                 isDeleted = tempFile.delete(  );
350             }
351 
352             return b;
353         }
354 
355         @Override
356         public int read( byte[] b ) throws IOException
357         {
358             return read( b, 0, b.length );
359         }
360 
361         @Override
362         public int read( byte[] b, int off, int len ) throws IOException
363         {
364             int n = super.read( b, off, len );
365 
366             if ( ( n == -1 ) && !isDeleted )
367             {
368                 super.close(  );
369                 isDeleted = tempFile.delete(  );
370             }
371 
372             return n;
373         }
374 
375         @Override
376         public void close(  ) throws IOException
377         {
378             if ( !isDeleted )
379             {
380                 super.close(  );
381                 isDeleted = tempFile.delete(  );
382             }
383         }
384     }
385 }