xref: /AOO41X/main/io/source/stm/opipe.cxx (revision 1ecadb572e7010ff3b3382ad9bf179dbc6efadbb)
1 /*************************************************************************
2  *
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * Copyright 2000, 2010 Oracle and/or its affiliates.
6  *
7  * OpenOffice.org - a multi-platform office productivity suite
8  *
9  * This file is part of OpenOffice.org.
10  *
11  * OpenOffice.org is free software: you can redistribute it and/or modify
12  * it under the terms of the GNU Lesser General Public License version 3
13  * only, as published by the Free Software Foundation.
14  *
15  * OpenOffice.org is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU Lesser General Public License version 3 for more details
19  * (a copy is included in the LICENSE file that accompanied this code).
20  *
21  * You should have received a copy of the GNU Lesser General Public License
22  * version 3 along with OpenOffice.org.  If not, see
23  * <http://www.openoffice.org/license.html>
24  * for a copy of the LGPLv3 License.
25  *
26  ************************************************************************/
27 
28 // MARKER(update_precomp.py): autogen include statement, do not remove
29 #include "precompiled_io.hxx"
30 
31 // streams
32 #include <com/sun/star/io/XInputStream.hpp>
33 #include <com/sun/star/io/XOutputStream.hpp>
34 #include <com/sun/star/io/XConnectable.hpp>
35 
36 #include <com/sun/star/lang/XServiceInfo.hpp>
37 
38 #include <cppuhelper/factory.hxx>
39 
40 #include <cppuhelper/implbase4.hxx>      // OWeakObject
41 
42 #include <osl/conditn.hxx>
43 #include <osl/mutex.hxx>
44 
45 #include <limits>
46 #include <string.h>
47 
48 using namespace ::rtl;
49 using namespace ::osl;
50 using namespace ::cppu;
51 using namespace ::com::sun::star::uno;
52 using namespace ::com::sun::star::io;
53 using namespace ::com::sun::star::lang;
54 
55 #include "factreg.hxx"
56 #include "streamhelper.hxx"
57 
58 // Implementation and service names
59 #define IMPLEMENTATION_NAME	"com.sun.star.comp.io.stm.Pipe"
60 #define SERVICE_NAME "com.sun.star.io.Pipe"
61 
62 namespace io_stm{
63 
64 class OPipeImpl :
65 	public WeakImplHelper4< XInputStream , XOutputStream , XConnectable , XServiceInfo >
66 {
67 public:
68 	OPipeImpl( );
69 	~OPipeImpl();
70 
71 public: // XInputStream
72     virtual sal_Int32 SAL_CALL readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead)
73 		throw(	NotConnectedException,
74 				BufferSizeExceededException,
75 				RuntimeException );
76     virtual sal_Int32 SAL_CALL readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead)
77 		throw( NotConnectedException,
78 			   BufferSizeExceededException,
79 			   RuntimeException );
80     virtual void SAL_CALL skipBytes(sal_Int32 nBytesToSkip)
81 		throw( NotConnectedException,
82 			   BufferSizeExceededException,
83 			   RuntimeException );
84     virtual sal_Int32 SAL_CALL available(void)
85 		throw( NotConnectedException,
86 			   RuntimeException );
87     virtual void SAL_CALL closeInput(void)
88 		throw( NotConnectedException,
89 			   RuntimeException );
90 
91 public: // XOutputStream
92 
93     virtual void SAL_CALL writeBytes(const Sequence< sal_Int8 >& aData)
94 		throw( NotConnectedException,
95 			   BufferSizeExceededException,
96 			   RuntimeException );
97     virtual void SAL_CALL flush(void)
98 		throw( NotConnectedException,
99 			   BufferSizeExceededException,
100 			   RuntimeException );
101     virtual void SAL_CALL closeOutput(void)
102 		throw( NotConnectedException,
103 			   BufferSizeExceededException,
104 			   RuntimeException );
105 
106 public: // XConnectable
107     virtual void SAL_CALL setPredecessor(const Reference< XConnectable >& aPredecessor)
108 		throw( RuntimeException );
109     virtual Reference< XConnectable > SAL_CALL getPredecessor(void) throw( RuntimeException );
110     virtual void SAL_CALL setSuccessor(const Reference < XConnectable > & aSuccessor)
111 		throw( RuntimeException );
112     virtual Reference < XConnectable > SAL_CALL getSuccessor(void) throw( RuntimeException ) ;
113 
114 
115 public: // XServiceInfo
116     OUString                    SAL_CALL getImplementationName() throw(  );
117     Sequence< OUString >         SAL_CALL getSupportedServiceNames(void) throw(  );
118     sal_Bool                        SAL_CALL supportsService(const OUString& ServiceName) throw(  );
119 
120 private:
121 
122 	// DEBUG
123 	inline void checkInvariant();
124 
125 	Reference < XConnectable > 	m_succ;
126 	Reference < XConnectable > 	m_pred;
127 
128 	sal_Int32 m_nBytesToSkip;
129 
130 	sal_Int8  *m_p;
131 
132 	sal_Bool m_bOutputStreamClosed;
133 	sal_Bool m_bInputStreamClosed;
134 
135 	oslCondition m_conditionBytesAvail;
136 	Mutex     m_mutexAccess;
137 	IFIFO		*m_pFIFO;
138 };
139 
140 
141 
142 OPipeImpl::OPipeImpl()
143 {
144 	g_moduleCount.modCnt.acquire( &g_moduleCount.modCnt );
145 	m_nBytesToSkip  = 0;
146 
147 	m_bOutputStreamClosed 	= sal_False;
148 	m_bInputStreamClosed 	= sal_False;
149 
150 	m_pFIFO = new MemFIFO;
151 	m_conditionBytesAvail = osl_createCondition();
152 }
153 
154 OPipeImpl::~OPipeImpl()
155 {
156 	osl_destroyCondition( m_conditionBytesAvail );
157 	delete m_pFIFO;
158 	g_moduleCount.modCnt.release( &g_moduleCount.modCnt );
159 }
160 
161 
162 // These invariants must hold when entering a guarded method or leaving a guarded method.
163 void OPipeImpl::checkInvariant()
164 {
165 
166 }
167 
168 sal_Int32 OPipeImpl::readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead)
169 	throw( NotConnectedException, BufferSizeExceededException,RuntimeException )
170 {
171 	while( sal_True )
172 	{
173 		{ // start guarded section
174 			MutexGuard guard( m_mutexAccess );
175 			if( m_bInputStreamClosed )
176 			{
177 				throw NotConnectedException(
178                     OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::readBytes NotConnectedException" )),
179                     *this );
180 			}
181 			sal_Int32 nOccupiedBufferLen = m_pFIFO->getSize();
182 
183 			if( m_bOutputStreamClosed && nBytesToRead > nOccupiedBufferLen )
184 			{
185 				nBytesToRead = nOccupiedBufferLen;
186 			}
187 
188 			if( nOccupiedBufferLen < nBytesToRead )
189 			{
190 				// wait outside guarded section
191 				osl_resetCondition( m_conditionBytesAvail );
192 			}
193 			else {
194 				// necessary bytes are available
195 				m_pFIFO->read( aData , nBytesToRead );
196 				return nBytesToRead;
197 			}
198 		} // end guarded section
199 
200 		// wait for new data outside guarded section!
201 		osl_waitCondition( m_conditionBytesAvail , 0 );
202 	}
203 }
204 
205 
206 sal_Int32 OPipeImpl::readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead)
207 	throw( NotConnectedException,
208 		   BufferSizeExceededException,
209 		   RuntimeException )
210 {
211 	while( sal_True ) {
212 		{
213 			MutexGuard guard( m_mutexAccess );
214 			if( m_bInputStreamClosed )
215 			{
216 				throw NotConnectedException(
217                     OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::readSomeBytes NotConnectedException" )),
218                     *this );
219 			}
220 			if( m_pFIFO->getSize() )
221 			{
222 				sal_Int32 nSize = Min( nMaxBytesToRead , m_pFIFO->getSize() );
223 				aData.realloc( nSize );
224 				m_pFIFO->read( aData , nSize );
225 				return nSize;
226 			}
227 
228 			if( m_bOutputStreamClosed )
229 			{
230 				// no bytes in buffer anymore
231 				return 0;
232 			}
233 		}
234 
235 		osl_waitCondition( m_conditionBytesAvail , 0 );
236 	}
237 }
238 
239 
240 void OPipeImpl::skipBytes(sal_Int32 nBytesToSkip)
241 	throw( NotConnectedException,
242 		   BufferSizeExceededException,
243 		   RuntimeException )
244 {
245 	MutexGuard guard( m_mutexAccess );
246     if( m_bInputStreamClosed )
247     {
248         throw NotConnectedException(
249             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::skipBytes NotConnectedException" ) ),
250             *this );
251     }
252 
253 	if( nBytesToSkip < 0
254         || (nBytesToSkip
255             > std::numeric_limits< sal_Int32 >::max() - m_nBytesToSkip) )
256 	{
257 		throw BufferSizeExceededException(
258             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::skipBytes BufferSizeExceededException" )),
259             *this );
260 	}
261 	m_nBytesToSkip += nBytesToSkip;
262 
263 	nBytesToSkip = Min( m_pFIFO->getSize() , m_nBytesToSkip );
264 	m_pFIFO->skip( nBytesToSkip );
265 	m_nBytesToSkip -= nBytesToSkip;
266 }
267 
268 
269 sal_Int32 OPipeImpl::available(void)
270 	throw( NotConnectedException,
271 		   RuntimeException )
272  {
273 	MutexGuard guard( m_mutexAccess );
274     if( m_bInputStreamClosed )
275     {
276         throw NotConnectedException(
277             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::available NotConnectedException" ) ),
278             *this );
279     }
280 	checkInvariant();
281 	return m_pFIFO->getSize();
282 }
283 
284 void OPipeImpl::closeInput(void)
285 	throw( NotConnectedException,
286 		   RuntimeException)
287 {
288 	MutexGuard guard( m_mutexAccess );
289 
290 	m_bInputStreamClosed = sal_True;
291 
292 	delete m_pFIFO;
293 	m_pFIFO = 0;
294 
295 	// readBytes may throw an exception
296 	osl_setCondition( m_conditionBytesAvail );
297 
298 	setSuccessor( Reference< XConnectable > () );
299 	return;
300 }
301 
302 
303 void OPipeImpl::writeBytes(const Sequence< sal_Int8 >& aData)
304 	throw( NotConnectedException,
305 		   BufferSizeExceededException,
306 		   RuntimeException)
307 {
308 	MutexGuard guard( m_mutexAccess );
309 	checkInvariant();
310 
311 	if( m_bOutputStreamClosed )
312 	{
313         throw NotConnectedException(
314             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes NotConnectedException (outputstream)" )),
315             *this );
316 	}
317 
318 	if( m_bInputStreamClosed )
319 	{
320         throw NotConnectedException(
321             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes NotConnectedException (inputstream)" )),
322             *this );
323 	}
324 
325 	// check skipping
326 	sal_Int32 nLen = aData.getLength();
327 	if( m_nBytesToSkip  && m_nBytesToSkip >= nLen  ) {
328 		// all must be skipped - forget whole call
329 		m_nBytesToSkip -= nLen;
330 		return;
331 	}
332 
333 	// adjust buffersize if necessary
334 
335 	try
336 	{
337 		if( m_nBytesToSkip )
338 		{
339 			Sequence< sal_Int8 > seqCopy( nLen - m_nBytesToSkip );
340 			memcpy( seqCopy.getArray() , &( aData.getConstArray()[m_nBytesToSkip] ) , nLen-m_nBytesToSkip );
341 			m_pFIFO->write( seqCopy );
342 		}
343 		else
344 		{
345 			m_pFIFO->write( aData );
346 		}
347 		m_nBytesToSkip = 0;
348 	}
349 	catch ( IFIFO_OutOfBoundsException & )
350 	{
351 		throw BufferSizeExceededException(
352             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes BufferSizeExceededException" )),
353             *this );
354 	}
355 	catch ( IFIFO_OutOfMemoryException & )
356 	{
357 		throw BufferSizeExceededException(
358             OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes BufferSizeExceededException" )),
359             *this );
360 	}
361 
362 	// readBytes may check again if enough bytes are available
363 	osl_setCondition( m_conditionBytesAvail );
364 
365 	checkInvariant();
366 }
367 
368 
369 void OPipeImpl::flush(void)
370 	throw( NotConnectedException,
371 		   BufferSizeExceededException,
372 		   RuntimeException)
373 {
374 	// nothing to do for a pipe
375 	return;
376 }
377 
378 void OPipeImpl::closeOutput(void)
379 	throw( NotConnectedException,
380 		   BufferSizeExceededException,
381 		   RuntimeException)
382 {
383 	MutexGuard guard( m_mutexAccess );
384 
385 	m_bOutputStreamClosed = sal_True;
386 	osl_setCondition( m_conditionBytesAvail );
387 	setPredecessor( Reference < XConnectable > () );
388 	return;
389 }
390 
391 
392 void OPipeImpl::setSuccessor( const Reference < XConnectable >  &r )
393 	throw( RuntimeException )
394 {
395      /// if the references match, nothing needs to be done
396      if( m_succ != r ) {
397          /// store the reference for later use
398          m_succ = r;
399 
400          if( m_succ.is() )
401 		 {
402               m_succ->setPredecessor(
403 				  Reference< XConnectable > ( SAL_STATIC_CAST( XConnectable * , this ) ) );
404          }
405      }
406 }
407 
408 Reference < XConnectable > OPipeImpl::getSuccessor() 	throw( RuntimeException )
409 {
410 	return m_succ;
411 }
412 
413 
414 // XDataSource
415 void OPipeImpl::setPredecessor( const Reference < XConnectable > &r )
416 	throw( RuntimeException )
417 {
418 	if( r != m_pred ) {
419 		m_pred = r;
420 		if( m_pred.is() ) {
421 			m_pred->setSuccessor(
422 				Reference < XConnectable > ( SAL_STATIC_CAST( XConnectable * , this ) ) );
423 		}
424 	}
425 }
426 
427 Reference < XConnectable > OPipeImpl::getPredecessor() throw( RuntimeException )
428 {
429 	return m_pred;
430 }
431 
432 
433 
434 
435 // XServiceInfo
436 OUString OPipeImpl::getImplementationName() throw(  )
437 {
438     return OPipeImpl_getImplementationName();
439 }
440 
441 // XServiceInfo
442 sal_Bool OPipeImpl::supportsService(const OUString& ServiceName) throw(  )
443 {
444     Sequence< OUString > aSNL = getSupportedServiceNames();
445     const OUString * pArray = aSNL.getConstArray();
446 
447     for( sal_Int32 i = 0; i < aSNL.getLength(); i++ )
448         if( pArray[i] == ServiceName )
449             return sal_True;
450 
451     return sal_False;
452 }
453 
454 // XServiceInfo
455 Sequence< OUString > OPipeImpl::getSupportedServiceNames(void) throw(  )
456 {
457 	return OPipeImpl_getSupportedServiceNames();
458 }
459 
460 
461 
462 
463 
464 /* implementation functions
465 *
466 *
467 */
468 
469 
470 Reference < XInterface > SAL_CALL OPipeImpl_CreateInstance(
471 	const Reference < XComponentContext > & ) throw(Exception)
472 {
473 	OPipeImpl *p = new OPipeImpl;
474 
475 	return Reference < XInterface > ( SAL_STATIC_CAST( OWeakObject * , p ) );
476 }
477 
478 
479 OUString 	OPipeImpl_getImplementationName()
480 {
481 	return OUString( RTL_CONSTASCII_USTRINGPARAM ( IMPLEMENTATION_NAME ) );
482 }
483 
484 Sequence<OUString> OPipeImpl_getSupportedServiceNames(void)
485 {
486 	Sequence<OUString> aRet(1);
487 	aRet.getArray()[0] = OUString( RTL_CONSTASCII_USTRINGPARAM( SERVICE_NAME ));
488 	return aRet;
489 }
490 }
491 
492 
493