xref: /AOO41X/main/io/source/stm/opipe.cxx (revision 3716f815df2d68347af345f8524e39097ef453f6)
1 /**************************************************************
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *   http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing,
14  * software distributed under the License is distributed on an
15  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16  * KIND, either express or implied.  See the License for the
17  * specific language governing permissions and limitations
18  * under the License.
19  *
20  *************************************************************/
21 
22 
23 
24 // MARKER(update_precomp.py): autogen include statement, do not remove
25 #include "precompiled_io.hxx"
26 
27 // streams
28 #include <com/sun/star/io/XInputStream.hpp>
29 #include <com/sun/star/io/XOutputStream.hpp>
30 #include <com/sun/star/io/XConnectable.hpp>
31 
32 #include <com/sun/star/lang/XServiceInfo.hpp>
33 
34 #include <cppuhelper/factory.hxx>
35 
36 #include <cppuhelper/implbase4.hxx>      // OWeakObject
37 
38 #include <osl/conditn.hxx>
39 #include <osl/mutex.hxx>
40 
41 #include <limits>
42 #include <string.h>
43 
44 using namespace ::rtl;
45 using namespace ::osl;
46 using namespace ::cppu;
47 using namespace ::com::sun::star::uno;
48 using namespace ::com::sun::star::io;
49 using namespace ::com::sun::star::lang;
50 
51 #include "factreg.hxx"
52 #include "streamhelper.hxx"
53 
54 // Implementation and service names
55 #define IMPLEMENTATION_NAME "com.sun.star.comp.io.stm.Pipe"
56 #define SERVICE_NAME "com.sun.star.io.Pipe"
57 
58 namespace io_stm{
59 
60 class OPipeImpl :
61     public WeakImplHelper4< XInputStream , XOutputStream , XConnectable , XServiceInfo >
62 {
63 public:
64     OPipeImpl( );
65     ~OPipeImpl();
66 
67 public: // XInputStream
68     virtual sal_Int32 SAL_CALL readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead)
69         throw(  NotConnectedException,
70                 BufferSizeExceededException,
71                 RuntimeException );
72     virtual sal_Int32 SAL_CALL readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead)
73         throw( NotConnectedException,
74                BufferSizeExceededException,
75                RuntimeException );
76     virtual void SAL_CALL skipBytes(sal_Int32 nBytesToSkip)
77         throw( NotConnectedException,
78                BufferSizeExceededException,
79                RuntimeException );
80     virtual sal_Int32 SAL_CALL available(void)
81         throw( NotConnectedException,
82                RuntimeException );
83     virtual void SAL_CALL closeInput(void)
84         throw( NotConnectedException,
85                RuntimeException );
86 
87 public: // XOutputStream
88 
89     virtual void SAL_CALL writeBytes(const Sequence< sal_Int8 >& aData)
90         throw( NotConnectedException,
91                BufferSizeExceededException,
92                RuntimeException );
93     virtual void SAL_CALL flush(void)
94         throw( NotConnectedException,
95                BufferSizeExceededException,
96                RuntimeException );
97     virtual void SAL_CALL closeOutput(void)
98         throw( NotConnectedException,
99                BufferSizeExceededException,
100                RuntimeException );
101 
102 public: // XConnectable
103     virtual void SAL_CALL setPredecessor(const Reference< XConnectable >& aPredecessor)
104         throw( RuntimeException );
105     virtual Reference< XConnectable > SAL_CALL getPredecessor(void) throw( RuntimeException );
106     virtual void SAL_CALL setSuccessor(const Reference < XConnectable > & aSuccessor)
107         throw( RuntimeException );
108     virtual Reference < XConnectable > SAL_CALL getSuccessor(void) throw( RuntimeException ) ;
109 
110 
111 public: // XServiceInfo
112     OUString                    SAL_CALL getImplementationName() throw(  );
113     Sequence< OUString >         SAL_CALL getSupportedServiceNames(void) throw(  );
114     sal_Bool                        SAL_CALL supportsService(const OUString& ServiceName) throw(  );
115 
116 private:
117 
118     // DEBUG
119     inline void checkInvariant();
120 
121     Reference < XConnectable >  m_succ;
122     Reference < XConnectable >  m_pred;
123 
124     sal_Int32 m_nBytesToSkip;
125 
126     sal_Int8  *m_p;
127 
128     sal_Bool m_bOutputStreamClosed;
129     sal_Bool m_bInputStreamClosed;
130 
131     oslCondition m_conditionBytesAvail;
132     Mutex     m_mutexAccess;
133     IFIFO       *m_pFIFO;
134 };
135 
136 
137 
OPipeImpl()138 OPipeImpl::OPipeImpl()
139 {
140     g_moduleCount.modCnt.acquire( &g_moduleCount.modCnt );
141     m_nBytesToSkip  = 0;
142 
143     m_bOutputStreamClosed   = sal_False;
144     m_bInputStreamClosed    = sal_False;
145 
146     m_pFIFO = new MemFIFO;
147     m_conditionBytesAvail = osl_createCondition();
148 }
149 
~OPipeImpl()150 OPipeImpl::~OPipeImpl()
151 {
152     osl_destroyCondition( m_conditionBytesAvail );
153     delete m_pFIFO;
154     g_moduleCount.modCnt.release( &g_moduleCount.modCnt );
155 }
156 
157 
158 // These invariants must hold when entering a guarded method or leaving a guarded method.
checkInvariant()159 void OPipeImpl::checkInvariant()
160 {
161 
162 }
163 
readBytes(Sequence<sal_Int8> & aData,sal_Int32 nBytesToRead)164 sal_Int32 OPipeImpl::readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead)
165     throw( NotConnectedException, BufferSizeExceededException,RuntimeException )
166 {
167     while( sal_True )
168     {
169         { // start guarded section
170             MutexGuard guard( m_mutexAccess );
171             if( m_bInputStreamClosed )
172             {
173                 throw NotConnectedException(
174                     OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::readBytes NotConnectedException" )),
175                     *this );
176             }
177             sal_Int32 nOccupiedBufferLen = m_pFIFO->getSize();
178 
179             if( m_bOutputStreamClosed && nBytesToRead > nOccupiedBufferLen )
180             {
181                 nBytesToRead = nOccupiedBufferLen;
182             }
183 
184             if( nOccupiedBufferLen < nBytesToRead )
185             {
186                 // wait outside guarded section
187                 osl_resetCondition( m_conditionBytesAvail );
188             }
189             else {
190                 // necessary bytes are available
191                 m_pFIFO->read( aData , nBytesToRead );
192                 return nBytesToRead;
193             }
194         } // end guarded section
195 
196         // wait for new data outside guarded section!
197         osl_waitCondition( m_conditionBytesAvail , 0 );
198     }
199 }
200 
201 
readSomeBytes(Sequence<sal_Int8> & aData,sal_Int32 nMaxBytesToRead)202 sal_Int32 OPipeImpl::readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead)
203     throw( NotConnectedException,
204            BufferSizeExceededException,
205            RuntimeException )
206 {
207     while( sal_True ) {
208         {
209             MutexGuard guard( m_mutexAccess );
210             if( m_bInputStreamClosed )
211             {
212                 throw NotConnectedException(
213                     OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::readSomeBytes NotConnectedException" )),
214                     *this );
215             }
216             if( m_pFIFO->getSize() )
217             {
218                 sal_Int32 nSize = Min( nMaxBytesToRead , m_pFIFO->getSize() );
219                 aData.realloc( nSize );
220                 m_pFIFO->read( aData , nSize );
221                 return nSize;
222             }
223 
224             if( m_bOutputStreamClosed )
225             {
226                 // no bytes in buffer anymore
227                 return 0;
228             }
229         }
230 
231         osl_waitCondition( m_conditionBytesAvail , 0 );
232     }
233 }
234 
235 
skipBytes(sal_Int32 nBytesToSkip)236 void OPipeImpl::skipBytes(sal_Int32 nBytesToSkip)
237     throw( NotConnectedException,
238            BufferSizeExceededException,
239            RuntimeException )
240 {
241     MutexGuard guard( m_mutexAccess );
242     if( m_bInputStreamClosed )
243     {
244         throw NotConnectedException(
245             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::skipBytes NotConnectedException" ) ),
246             *this );
247     }
248 
249     if( nBytesToSkip < 0
250         || (nBytesToSkip
251             > std::numeric_limits< sal_Int32 >::max() - m_nBytesToSkip) )
252     {
253         throw BufferSizeExceededException(
254             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::skipBytes BufferSizeExceededException" )),
255             *this );
256     }
257     m_nBytesToSkip += nBytesToSkip;
258 
259     nBytesToSkip = Min( m_pFIFO->getSize() , m_nBytesToSkip );
260     m_pFIFO->skip( nBytesToSkip );
261     m_nBytesToSkip -= nBytesToSkip;
262 }
263 
264 
available(void)265 sal_Int32 OPipeImpl::available(void)
266     throw( NotConnectedException,
267            RuntimeException )
268  {
269     MutexGuard guard( m_mutexAccess );
270     if( m_bInputStreamClosed )
271     {
272         throw NotConnectedException(
273             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::available NotConnectedException" ) ),
274             *this );
275     }
276     checkInvariant();
277     return m_pFIFO->getSize();
278 }
279 
closeInput(void)280 void OPipeImpl::closeInput(void)
281     throw( NotConnectedException,
282            RuntimeException)
283 {
284     MutexGuard guard( m_mutexAccess );
285 
286     m_bInputStreamClosed = sal_True;
287 
288     delete m_pFIFO;
289     m_pFIFO = 0;
290 
291     // readBytes may throw an exception
292     osl_setCondition( m_conditionBytesAvail );
293 
294     setSuccessor( Reference< XConnectable > () );
295     return;
296 }
297 
298 
writeBytes(const Sequence<sal_Int8> & aData)299 void OPipeImpl::writeBytes(const Sequence< sal_Int8 >& aData)
300     throw( NotConnectedException,
301            BufferSizeExceededException,
302            RuntimeException)
303 {
304     MutexGuard guard( m_mutexAccess );
305     checkInvariant();
306 
307     if( m_bOutputStreamClosed )
308     {
309         throw NotConnectedException(
310             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes NotConnectedException (outputstream)" )),
311             *this );
312     }
313 
314     if( m_bInputStreamClosed )
315     {
316         throw NotConnectedException(
317             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes NotConnectedException (inputstream)" )),
318             *this );
319     }
320 
321     // check skipping
322     sal_Int32 nLen = aData.getLength();
323     if( m_nBytesToSkip  && m_nBytesToSkip >= nLen  ) {
324         // all must be skipped - forget whole call
325         m_nBytesToSkip -= nLen;
326         return;
327     }
328 
329     // adjust buffersize if necessary
330 
331     try
332     {
333         if( m_nBytesToSkip )
334         {
335             Sequence< sal_Int8 > seqCopy( nLen - m_nBytesToSkip );
336             memcpy( seqCopy.getArray() , &( aData.getConstArray()[m_nBytesToSkip] ) , nLen-m_nBytesToSkip );
337             m_pFIFO->write( seqCopy );
338         }
339         else
340         {
341             m_pFIFO->write( aData );
342         }
343         m_nBytesToSkip = 0;
344     }
345     catch ( IFIFO_OutOfBoundsException & )
346     {
347         throw BufferSizeExceededException(
348             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes BufferSizeExceededException" )),
349             *this );
350     }
351     catch ( IFIFO_OutOfMemoryException & )
352     {
353         throw BufferSizeExceededException(
354             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes BufferSizeExceededException" )),
355             *this );
356     }
357 
358     // readBytes may check again if enough bytes are available
359     osl_setCondition( m_conditionBytesAvail );
360 
361     checkInvariant();
362 }
363 
364 
flush(void)365 void OPipeImpl::flush(void)
366     throw( NotConnectedException,
367            BufferSizeExceededException,
368            RuntimeException)
369 {
370     // nothing to do for a pipe
371     return;
372 }
373 
closeOutput(void)374 void OPipeImpl::closeOutput(void)
375     throw( NotConnectedException,
376            BufferSizeExceededException,
377            RuntimeException)
378 {
379     MutexGuard guard( m_mutexAccess );
380 
381     m_bOutputStreamClosed = sal_True;
382     osl_setCondition( m_conditionBytesAvail );
383     setPredecessor( Reference < XConnectable > () );
384     return;
385 }
386 
387 
setSuccessor(const Reference<XConnectable> & r)388 void OPipeImpl::setSuccessor( const Reference < XConnectable >  &r )
389     throw( RuntimeException )
390 {
391      /// if the references match, nothing needs to be done
392      if( m_succ != r ) {
393          /// store the reference for later use
394          m_succ = r;
395 
396          if( m_succ.is() )
397          {
398               m_succ->setPredecessor(
399                   Reference< XConnectable > ( SAL_STATIC_CAST( XConnectable * , this ) ) );
400          }
401      }
402 }
403 
getSuccessor()404 Reference < XConnectable > OPipeImpl::getSuccessor()    throw( RuntimeException )
405 {
406     return m_succ;
407 }
408 
409 
410 // XDataSource
setPredecessor(const Reference<XConnectable> & r)411 void OPipeImpl::setPredecessor( const Reference < XConnectable > &r )
412     throw( RuntimeException )
413 {
414     if( r != m_pred ) {
415         m_pred = r;
416         if( m_pred.is() ) {
417             m_pred->setSuccessor(
418                 Reference < XConnectable > ( SAL_STATIC_CAST( XConnectable * , this ) ) );
419         }
420     }
421 }
422 
getPredecessor()423 Reference < XConnectable > OPipeImpl::getPredecessor() throw( RuntimeException )
424 {
425     return m_pred;
426 }
427 
428 
429 
430 
431 // XServiceInfo
getImplementationName()432 OUString OPipeImpl::getImplementationName() throw(  )
433 {
434     return OPipeImpl_getImplementationName();
435 }
436 
437 // XServiceInfo
supportsService(const OUString & ServiceName)438 sal_Bool OPipeImpl::supportsService(const OUString& ServiceName) throw(  )
439 {
440     Sequence< OUString > aSNL = getSupportedServiceNames();
441     const OUString * pArray = aSNL.getConstArray();
442 
443     for( sal_Int32 i = 0; i < aSNL.getLength(); i++ )
444         if( pArray[i] == ServiceName )
445             return sal_True;
446 
447     return sal_False;
448 }
449 
450 // XServiceInfo
getSupportedServiceNames(void)451 Sequence< OUString > OPipeImpl::getSupportedServiceNames(void) throw(  )
452 {
453     return OPipeImpl_getSupportedServiceNames();
454 }
455 
456 
457 
458 
459 
460 /* implementation functions
461 *
462 *
463 */
464 
465 
OPipeImpl_CreateInstance(const Reference<XComponentContext> &)466 Reference < XInterface > SAL_CALL OPipeImpl_CreateInstance(
467     const Reference < XComponentContext > & ) throw(Exception)
468 {
469     OPipeImpl *p = new OPipeImpl;
470 
471     return Reference < XInterface > ( SAL_STATIC_CAST( OWeakObject * , p ) );
472 }
473 
474 
OPipeImpl_getImplementationName()475 OUString    OPipeImpl_getImplementationName()
476 {
477     return OUString( RTL_CONSTASCII_USTRINGPARAM ( IMPLEMENTATION_NAME ) );
478 }
479 
OPipeImpl_getSupportedServiceNames(void)480 Sequence<OUString> OPipeImpl_getSupportedServiceNames(void)
481 {
482     Sequence<OUString> aRet(1);
483     aRet.getArray()[0] = OUString( RTL_CONSTASCII_USTRINGPARAM( SERVICE_NAME ));
484     return aRet;
485 }
486 }
487 
488 
489