1 /************************************************************************* 2 * 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * Copyright 2000, 2010 Oracle and/or its affiliates. 6 * 7 * OpenOffice.org - a multi-platform office productivity suite 8 * 9 * This file is part of OpenOffice.org. 10 * 11 * OpenOffice.org is free software: you can redistribute it and/or modify 12 * it under the terms of the GNU Lesser General Public License version 3 13 * only, as published by the Free Software Foundation. 14 * 15 * OpenOffice.org is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 * GNU Lesser General Public License version 3 for more details 19 * (a copy is included in the LICENSE file that accompanied this code). 20 * 21 * You should have received a copy of the GNU Lesser General Public License 22 * version 3 along with OpenOffice.org. If not, see 23 * <http://www.openoffice.org/license.html> 24 * for a copy of the LGPLv3 License. 25 * 26 ************************************************************************/ 27 28 // MARKER(update_precomp.py): autogen include statement, do not remove 29 #include "precompiled_io.hxx" 30 31 // streams 32 #include <com/sun/star/io/XInputStream.hpp> 33 #include <com/sun/star/io/XOutputStream.hpp> 34 #include <com/sun/star/io/XConnectable.hpp> 35 36 #include <com/sun/star/lang/XServiceInfo.hpp> 37 38 #include <cppuhelper/factory.hxx> 39 40 #include <cppuhelper/implbase4.hxx> // OWeakObject 41 42 #include <osl/conditn.hxx> 43 #include <osl/mutex.hxx> 44 45 #include <limits> 46 #include <string.h> 47 48 using namespace ::rtl; 49 using namespace ::osl; 50 using namespace ::cppu; 51 using namespace ::com::sun::star::uno; 52 using namespace ::com::sun::star::io; 53 using namespace ::com::sun::star::lang; 54 55 #include "factreg.hxx" 56 #include "streamhelper.hxx" 57 58 // Implementation and service names 59 #define IMPLEMENTATION_NAME "com.sun.star.comp.io.stm.Pipe" 60 #define SERVICE_NAME "com.sun.star.io.Pipe" 61 62 namespace io_stm{ 63 64 class OPipeImpl : 65 public WeakImplHelper4< XInputStream , XOutputStream , XConnectable , XServiceInfo > 66 { 67 public: 68 OPipeImpl( ); 69 ~OPipeImpl(); 70 71 public: // XInputStream 72 virtual sal_Int32 SAL_CALL readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead) 73 throw( NotConnectedException, 74 BufferSizeExceededException, 75 RuntimeException ); 76 virtual sal_Int32 SAL_CALL readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead) 77 throw( NotConnectedException, 78 BufferSizeExceededException, 79 RuntimeException ); 80 virtual void SAL_CALL skipBytes(sal_Int32 nBytesToSkip) 81 throw( NotConnectedException, 82 BufferSizeExceededException, 83 RuntimeException ); 84 virtual sal_Int32 SAL_CALL available(void) 85 throw( NotConnectedException, 86 RuntimeException ); 87 virtual void SAL_CALL closeInput(void) 88 throw( NotConnectedException, 89 RuntimeException ); 90 91 public: // XOutputStream 92 93 virtual void SAL_CALL writeBytes(const Sequence< sal_Int8 >& aData) 94 throw( NotConnectedException, 95 BufferSizeExceededException, 96 RuntimeException ); 97 virtual void SAL_CALL flush(void) 98 throw( NotConnectedException, 99 BufferSizeExceededException, 100 RuntimeException ); 101 virtual void SAL_CALL closeOutput(void) 102 throw( NotConnectedException, 103 BufferSizeExceededException, 104 RuntimeException ); 105 106 public: // XConnectable 107 virtual void SAL_CALL setPredecessor(const Reference< XConnectable >& aPredecessor) 108 throw( RuntimeException ); 109 virtual Reference< XConnectable > SAL_CALL getPredecessor(void) throw( RuntimeException ); 110 virtual void SAL_CALL setSuccessor(const Reference < XConnectable > & aSuccessor) 111 throw( RuntimeException ); 112 virtual Reference < XConnectable > SAL_CALL getSuccessor(void) throw( RuntimeException ) ; 113 114 115 public: // XServiceInfo 116 OUString SAL_CALL getImplementationName() throw( ); 117 Sequence< OUString > SAL_CALL getSupportedServiceNames(void) throw( ); 118 sal_Bool SAL_CALL supportsService(const OUString& ServiceName) throw( ); 119 120 private: 121 122 // DEBUG 123 inline void checkInvariant(); 124 125 Reference < XConnectable > m_succ; 126 Reference < XConnectable > m_pred; 127 128 sal_Int32 m_nBytesToSkip; 129 130 sal_Int8 *m_p; 131 132 sal_Bool m_bOutputStreamClosed; 133 sal_Bool m_bInputStreamClosed; 134 135 oslCondition m_conditionBytesAvail; 136 Mutex m_mutexAccess; 137 IFIFO *m_pFIFO; 138 }; 139 140 141 142 OPipeImpl::OPipeImpl() 143 { 144 g_moduleCount.modCnt.acquire( &g_moduleCount.modCnt ); 145 m_nBytesToSkip = 0; 146 147 m_bOutputStreamClosed = sal_False; 148 m_bInputStreamClosed = sal_False; 149 150 m_pFIFO = new MemFIFO; 151 m_conditionBytesAvail = osl_createCondition(); 152 } 153 154 OPipeImpl::~OPipeImpl() 155 { 156 osl_destroyCondition( m_conditionBytesAvail ); 157 delete m_pFIFO; 158 g_moduleCount.modCnt.release( &g_moduleCount.modCnt ); 159 } 160 161 162 // These invariants must hold when entering a guarded method or leaving a guarded method. 163 void OPipeImpl::checkInvariant() 164 { 165 166 } 167 168 sal_Int32 OPipeImpl::readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead) 169 throw( NotConnectedException, BufferSizeExceededException,RuntimeException ) 170 { 171 while( sal_True ) 172 { 173 { // start guarded section 174 MutexGuard guard( m_mutexAccess ); 175 if( m_bInputStreamClosed ) 176 { 177 throw NotConnectedException( 178 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::readBytes NotConnectedException" )), 179 *this ); 180 } 181 sal_Int32 nOccupiedBufferLen = m_pFIFO->getSize(); 182 183 if( m_bOutputStreamClosed && nBytesToRead > nOccupiedBufferLen ) 184 { 185 nBytesToRead = nOccupiedBufferLen; 186 } 187 188 if( nOccupiedBufferLen < nBytesToRead ) 189 { 190 // wait outside guarded section 191 osl_resetCondition( m_conditionBytesAvail ); 192 } 193 else { 194 // necessary bytes are available 195 m_pFIFO->read( aData , nBytesToRead ); 196 return nBytesToRead; 197 } 198 } // end guarded section 199 200 // wait for new data outside guarded section! 201 osl_waitCondition( m_conditionBytesAvail , 0 ); 202 } 203 } 204 205 206 sal_Int32 OPipeImpl::readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead) 207 throw( NotConnectedException, 208 BufferSizeExceededException, 209 RuntimeException ) 210 { 211 while( sal_True ) { 212 { 213 MutexGuard guard( m_mutexAccess ); 214 if( m_bInputStreamClosed ) 215 { 216 throw NotConnectedException( 217 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::readSomeBytes NotConnectedException" )), 218 *this ); 219 } 220 if( m_pFIFO->getSize() ) 221 { 222 sal_Int32 nSize = Min( nMaxBytesToRead , m_pFIFO->getSize() ); 223 aData.realloc( nSize ); 224 m_pFIFO->read( aData , nSize ); 225 return nSize; 226 } 227 228 if( m_bOutputStreamClosed ) 229 { 230 // no bytes in buffer anymore 231 return 0; 232 } 233 } 234 235 osl_waitCondition( m_conditionBytesAvail , 0 ); 236 } 237 } 238 239 240 void OPipeImpl::skipBytes(sal_Int32 nBytesToSkip) 241 throw( NotConnectedException, 242 BufferSizeExceededException, 243 RuntimeException ) 244 { 245 MutexGuard guard( m_mutexAccess ); 246 if( m_bInputStreamClosed ) 247 { 248 throw NotConnectedException( 249 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::skipBytes NotConnectedException" ) ), 250 *this ); 251 } 252 253 if( nBytesToSkip < 0 254 || (nBytesToSkip 255 > std::numeric_limits< sal_Int32 >::max() - m_nBytesToSkip) ) 256 { 257 throw BufferSizeExceededException( 258 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::skipBytes BufferSizeExceededException" )), 259 *this ); 260 } 261 m_nBytesToSkip += nBytesToSkip; 262 263 nBytesToSkip = Min( m_pFIFO->getSize() , m_nBytesToSkip ); 264 m_pFIFO->skip( nBytesToSkip ); 265 m_nBytesToSkip -= nBytesToSkip; 266 } 267 268 269 sal_Int32 OPipeImpl::available(void) 270 throw( NotConnectedException, 271 RuntimeException ) 272 { 273 MutexGuard guard( m_mutexAccess ); 274 if( m_bInputStreamClosed ) 275 { 276 throw NotConnectedException( 277 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::available NotConnectedException" ) ), 278 *this ); 279 } 280 checkInvariant(); 281 return m_pFIFO->getSize(); 282 } 283 284 void OPipeImpl::closeInput(void) 285 throw( NotConnectedException, 286 RuntimeException) 287 { 288 MutexGuard guard( m_mutexAccess ); 289 290 m_bInputStreamClosed = sal_True; 291 292 delete m_pFIFO; 293 m_pFIFO = 0; 294 295 // readBytes may throw an exception 296 osl_setCondition( m_conditionBytesAvail ); 297 298 setSuccessor( Reference< XConnectable > () ); 299 return; 300 } 301 302 303 void OPipeImpl::writeBytes(const Sequence< sal_Int8 >& aData) 304 throw( NotConnectedException, 305 BufferSizeExceededException, 306 RuntimeException) 307 { 308 MutexGuard guard( m_mutexAccess ); 309 checkInvariant(); 310 311 if( m_bOutputStreamClosed ) 312 { 313 throw NotConnectedException( 314 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes NotConnectedException (outputstream)" )), 315 *this ); 316 } 317 318 if( m_bInputStreamClosed ) 319 { 320 throw NotConnectedException( 321 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes NotConnectedException (inputstream)" )), 322 *this ); 323 } 324 325 // check skipping 326 sal_Int32 nLen = aData.getLength(); 327 if( m_nBytesToSkip && m_nBytesToSkip >= nLen ) { 328 // all must be skipped - forget whole call 329 m_nBytesToSkip -= nLen; 330 return; 331 } 332 333 // adjust buffersize if necessary 334 335 try 336 { 337 if( m_nBytesToSkip ) 338 { 339 Sequence< sal_Int8 > seqCopy( nLen - m_nBytesToSkip ); 340 memcpy( seqCopy.getArray() , &( aData.getConstArray()[m_nBytesToSkip] ) , nLen-m_nBytesToSkip ); 341 m_pFIFO->write( seqCopy ); 342 } 343 else 344 { 345 m_pFIFO->write( aData ); 346 } 347 m_nBytesToSkip = 0; 348 } 349 catch ( IFIFO_OutOfBoundsException & ) 350 { 351 throw BufferSizeExceededException( 352 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes BufferSizeExceededException" )), 353 *this ); 354 } 355 catch ( IFIFO_OutOfMemoryException & ) 356 { 357 throw BufferSizeExceededException( 358 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes BufferSizeExceededException" )), 359 *this ); 360 } 361 362 // readBytes may check again if enough bytes are available 363 osl_setCondition( m_conditionBytesAvail ); 364 365 checkInvariant(); 366 } 367 368 369 void OPipeImpl::flush(void) 370 throw( NotConnectedException, 371 BufferSizeExceededException, 372 RuntimeException) 373 { 374 // nothing to do for a pipe 375 return; 376 } 377 378 void OPipeImpl::closeOutput(void) 379 throw( NotConnectedException, 380 BufferSizeExceededException, 381 RuntimeException) 382 { 383 MutexGuard guard( m_mutexAccess ); 384 385 m_bOutputStreamClosed = sal_True; 386 osl_setCondition( m_conditionBytesAvail ); 387 setPredecessor( Reference < XConnectable > () ); 388 return; 389 } 390 391 392 void OPipeImpl::setSuccessor( const Reference < XConnectable > &r ) 393 throw( RuntimeException ) 394 { 395 /// if the references match, nothing needs to be done 396 if( m_succ != r ) { 397 /// store the reference for later use 398 m_succ = r; 399 400 if( m_succ.is() ) 401 { 402 m_succ->setPredecessor( 403 Reference< XConnectable > ( SAL_STATIC_CAST( XConnectable * , this ) ) ); 404 } 405 } 406 } 407 408 Reference < XConnectable > OPipeImpl::getSuccessor() throw( RuntimeException ) 409 { 410 return m_succ; 411 } 412 413 414 // XDataSource 415 void OPipeImpl::setPredecessor( const Reference < XConnectable > &r ) 416 throw( RuntimeException ) 417 { 418 if( r != m_pred ) { 419 m_pred = r; 420 if( m_pred.is() ) { 421 m_pred->setSuccessor( 422 Reference < XConnectable > ( SAL_STATIC_CAST( XConnectable * , this ) ) ); 423 } 424 } 425 } 426 427 Reference < XConnectable > OPipeImpl::getPredecessor() throw( RuntimeException ) 428 { 429 return m_pred; 430 } 431 432 433 434 435 // XServiceInfo 436 OUString OPipeImpl::getImplementationName() throw( ) 437 { 438 return OPipeImpl_getImplementationName(); 439 } 440 441 // XServiceInfo 442 sal_Bool OPipeImpl::supportsService(const OUString& ServiceName) throw( ) 443 { 444 Sequence< OUString > aSNL = getSupportedServiceNames(); 445 const OUString * pArray = aSNL.getConstArray(); 446 447 for( sal_Int32 i = 0; i < aSNL.getLength(); i++ ) 448 if( pArray[i] == ServiceName ) 449 return sal_True; 450 451 return sal_False; 452 } 453 454 // XServiceInfo 455 Sequence< OUString > OPipeImpl::getSupportedServiceNames(void) throw( ) 456 { 457 return OPipeImpl_getSupportedServiceNames(); 458 } 459 460 461 462 463 464 /* implementation functions 465 * 466 * 467 */ 468 469 470 Reference < XInterface > SAL_CALL OPipeImpl_CreateInstance( 471 const Reference < XComponentContext > & ) throw(Exception) 472 { 473 OPipeImpl *p = new OPipeImpl; 474 475 return Reference < XInterface > ( SAL_STATIC_CAST( OWeakObject * , p ) ); 476 } 477 478 479 OUString OPipeImpl_getImplementationName() 480 { 481 return OUString( RTL_CONSTASCII_USTRINGPARAM ( IMPLEMENTATION_NAME ) ); 482 } 483 484 Sequence<OUString> OPipeImpl_getSupportedServiceNames(void) 485 { 486 Sequence<OUString> aRet(1); 487 aRet.getArray()[0] = OUString( RTL_CONSTASCII_USTRINGPARAM( SERVICE_NAME )); 488 return aRet; 489 } 490 } 491 492 493