1 /************************************************************** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, 14 * software distributed under the License is distributed on an 15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 16 * KIND, either express or implied. See the License for the 17 * specific language governing permissions and limitations 18 * under the License. 19 * 20 *************************************************************/ 21 22 23 24 // MARKER(update_precomp.py): autogen include statement, do not remove 25 #include "precompiled_io.hxx" 26 27 // streams 28 #include <com/sun/star/io/XInputStream.hpp> 29 #include <com/sun/star/io/XOutputStream.hpp> 30 #include <com/sun/star/io/XConnectable.hpp> 31 32 #include <com/sun/star/lang/XServiceInfo.hpp> 33 34 #include <cppuhelper/factory.hxx> 35 36 #include <cppuhelper/implbase4.hxx> // OWeakObject 37 38 #include <osl/conditn.hxx> 39 #include <osl/mutex.hxx> 40 41 #include <limits> 42 #include <string.h> 43 44 using namespace ::rtl; 45 using namespace ::osl; 46 using namespace ::cppu; 47 using namespace ::com::sun::star::uno; 48 using namespace ::com::sun::star::io; 49 using namespace ::com::sun::star::lang; 50 51 #include "factreg.hxx" 52 #include "streamhelper.hxx" 53 54 // Implementation and service names 55 #define IMPLEMENTATION_NAME "com.sun.star.comp.io.stm.Pipe" 56 #define SERVICE_NAME "com.sun.star.io.Pipe" 57 58 namespace io_stm{ 59 60 class OPipeImpl : 61 public WeakImplHelper4< XInputStream , XOutputStream , XConnectable , XServiceInfo > 62 { 63 public: 64 OPipeImpl( ); 65 ~OPipeImpl(); 66 67 public: // XInputStream 68 virtual sal_Int32 SAL_CALL readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead) 69 throw( NotConnectedException, 70 BufferSizeExceededException, 71 RuntimeException ); 72 virtual sal_Int32 SAL_CALL readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead) 73 throw( NotConnectedException, 74 BufferSizeExceededException, 75 RuntimeException ); 76 virtual void SAL_CALL skipBytes(sal_Int32 nBytesToSkip) 77 throw( NotConnectedException, 78 BufferSizeExceededException, 79 RuntimeException ); 80 virtual sal_Int32 SAL_CALL available(void) 81 throw( NotConnectedException, 82 RuntimeException ); 83 virtual void SAL_CALL closeInput(void) 84 throw( NotConnectedException, 85 RuntimeException ); 86 87 public: // XOutputStream 88 89 virtual void SAL_CALL writeBytes(const Sequence< sal_Int8 >& aData) 90 throw( NotConnectedException, 91 BufferSizeExceededException, 92 RuntimeException ); 93 virtual void SAL_CALL flush(void) 94 throw( NotConnectedException, 95 BufferSizeExceededException, 96 RuntimeException ); 97 virtual void SAL_CALL closeOutput(void) 98 throw( NotConnectedException, 99 BufferSizeExceededException, 100 RuntimeException ); 101 102 public: // XConnectable 103 virtual void SAL_CALL setPredecessor(const Reference< XConnectable >& aPredecessor) 104 throw( RuntimeException ); 105 virtual Reference< XConnectable > SAL_CALL getPredecessor(void) throw( RuntimeException ); 106 virtual void SAL_CALL setSuccessor(const Reference < XConnectable > & aSuccessor) 107 throw( RuntimeException ); 108 virtual Reference < XConnectable > SAL_CALL getSuccessor(void) throw( RuntimeException ) ; 109 110 111 public: // XServiceInfo 112 OUString SAL_CALL getImplementationName() throw( ); 113 Sequence< OUString > SAL_CALL getSupportedServiceNames(void) throw( ); 114 sal_Bool SAL_CALL supportsService(const OUString& ServiceName) throw( ); 115 116 private: 117 118 // DEBUG 119 inline void checkInvariant(); 120 121 Reference < XConnectable > m_succ; 122 Reference < XConnectable > m_pred; 123 124 sal_Int32 m_nBytesToSkip; 125 126 sal_Int8 *m_p; 127 128 sal_Bool m_bOutputStreamClosed; 129 sal_Bool m_bInputStreamClosed; 130 131 oslCondition m_conditionBytesAvail; 132 Mutex m_mutexAccess; 133 IFIFO *m_pFIFO; 134 }; 135 136 137 138 OPipeImpl::OPipeImpl() 139 { 140 g_moduleCount.modCnt.acquire( &g_moduleCount.modCnt ); 141 m_nBytesToSkip = 0; 142 143 m_bOutputStreamClosed = sal_False; 144 m_bInputStreamClosed = sal_False; 145 146 m_pFIFO = new MemFIFO; 147 m_conditionBytesAvail = osl_createCondition(); 148 } 149 150 OPipeImpl::~OPipeImpl() 151 { 152 osl_destroyCondition( m_conditionBytesAvail ); 153 delete m_pFIFO; 154 g_moduleCount.modCnt.release( &g_moduleCount.modCnt ); 155 } 156 157 158 // These invariants must hold when entering a guarded method or leaving a guarded method. 159 void OPipeImpl::checkInvariant() 160 { 161 162 } 163 164 sal_Int32 OPipeImpl::readBytes(Sequence< sal_Int8 >& aData, sal_Int32 nBytesToRead) 165 throw( NotConnectedException, BufferSizeExceededException,RuntimeException ) 166 { 167 while( sal_True ) 168 { 169 { // start guarded section 170 MutexGuard guard( m_mutexAccess ); 171 if( m_bInputStreamClosed ) 172 { 173 throw NotConnectedException( 174 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::readBytes NotConnectedException" )), 175 *this ); 176 } 177 sal_Int32 nOccupiedBufferLen = m_pFIFO->getSize(); 178 179 if( m_bOutputStreamClosed && nBytesToRead > nOccupiedBufferLen ) 180 { 181 nBytesToRead = nOccupiedBufferLen; 182 } 183 184 if( nOccupiedBufferLen < nBytesToRead ) 185 { 186 // wait outside guarded section 187 osl_resetCondition( m_conditionBytesAvail ); 188 } 189 else { 190 // necessary bytes are available 191 m_pFIFO->read( aData , nBytesToRead ); 192 return nBytesToRead; 193 } 194 } // end guarded section 195 196 // wait for new data outside guarded section! 197 osl_waitCondition( m_conditionBytesAvail , 0 ); 198 } 199 } 200 201 202 sal_Int32 OPipeImpl::readSomeBytes(Sequence< sal_Int8 >& aData, sal_Int32 nMaxBytesToRead) 203 throw( NotConnectedException, 204 BufferSizeExceededException, 205 RuntimeException ) 206 { 207 while( sal_True ) { 208 { 209 MutexGuard guard( m_mutexAccess ); 210 if( m_bInputStreamClosed ) 211 { 212 throw NotConnectedException( 213 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::readSomeBytes NotConnectedException" )), 214 *this ); 215 } 216 if( m_pFIFO->getSize() ) 217 { 218 sal_Int32 nSize = Min( nMaxBytesToRead , m_pFIFO->getSize() ); 219 aData.realloc( nSize ); 220 m_pFIFO->read( aData , nSize ); 221 return nSize; 222 } 223 224 if( m_bOutputStreamClosed ) 225 { 226 // no bytes in buffer anymore 227 return 0; 228 } 229 } 230 231 osl_waitCondition( m_conditionBytesAvail , 0 ); 232 } 233 } 234 235 236 void OPipeImpl::skipBytes(sal_Int32 nBytesToSkip) 237 throw( NotConnectedException, 238 BufferSizeExceededException, 239 RuntimeException ) 240 { 241 MutexGuard guard( m_mutexAccess ); 242 if( m_bInputStreamClosed ) 243 { 244 throw NotConnectedException( 245 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::skipBytes NotConnectedException" ) ), 246 *this ); 247 } 248 249 if( nBytesToSkip < 0 250 || (nBytesToSkip 251 > std::numeric_limits< sal_Int32 >::max() - m_nBytesToSkip) ) 252 { 253 throw BufferSizeExceededException( 254 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::skipBytes BufferSizeExceededException" )), 255 *this ); 256 } 257 m_nBytesToSkip += nBytesToSkip; 258 259 nBytesToSkip = Min( m_pFIFO->getSize() , m_nBytesToSkip ); 260 m_pFIFO->skip( nBytesToSkip ); 261 m_nBytesToSkip -= nBytesToSkip; 262 } 263 264 265 sal_Int32 OPipeImpl::available(void) 266 throw( NotConnectedException, 267 RuntimeException ) 268 { 269 MutexGuard guard( m_mutexAccess ); 270 if( m_bInputStreamClosed ) 271 { 272 throw NotConnectedException( 273 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::available NotConnectedException" ) ), 274 *this ); 275 } 276 checkInvariant(); 277 return m_pFIFO->getSize(); 278 } 279 280 void OPipeImpl::closeInput(void) 281 throw( NotConnectedException, 282 RuntimeException) 283 { 284 MutexGuard guard( m_mutexAccess ); 285 286 m_bInputStreamClosed = sal_True; 287 288 delete m_pFIFO; 289 m_pFIFO = 0; 290 291 // readBytes may throw an exception 292 osl_setCondition( m_conditionBytesAvail ); 293 294 setSuccessor( Reference< XConnectable > () ); 295 return; 296 } 297 298 299 void OPipeImpl::writeBytes(const Sequence< sal_Int8 >& aData) 300 throw( NotConnectedException, 301 BufferSizeExceededException, 302 RuntimeException) 303 { 304 MutexGuard guard( m_mutexAccess ); 305 checkInvariant(); 306 307 if( m_bOutputStreamClosed ) 308 { 309 throw NotConnectedException( 310 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes NotConnectedException (outputstream)" )), 311 *this ); 312 } 313 314 if( m_bInputStreamClosed ) 315 { 316 throw NotConnectedException( 317 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes NotConnectedException (inputstream)" )), 318 *this ); 319 } 320 321 // check skipping 322 sal_Int32 nLen = aData.getLength(); 323 if( m_nBytesToSkip && m_nBytesToSkip >= nLen ) { 324 // all must be skipped - forget whole call 325 m_nBytesToSkip -= nLen; 326 return; 327 } 328 329 // adjust buffersize if necessary 330 331 try 332 { 333 if( m_nBytesToSkip ) 334 { 335 Sequence< sal_Int8 > seqCopy( nLen - m_nBytesToSkip ); 336 memcpy( seqCopy.getArray() , &( aData.getConstArray()[m_nBytesToSkip] ) , nLen-m_nBytesToSkip ); 337 m_pFIFO->write( seqCopy ); 338 } 339 else 340 { 341 m_pFIFO->write( aData ); 342 } 343 m_nBytesToSkip = 0; 344 } 345 catch ( IFIFO_OutOfBoundsException & ) 346 { 347 throw BufferSizeExceededException( 348 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes BufferSizeExceededException" )), 349 *this ); 350 } 351 catch ( IFIFO_OutOfMemoryException & ) 352 { 353 throw BufferSizeExceededException( 354 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pipe::writeBytes BufferSizeExceededException" )), 355 *this ); 356 } 357 358 // readBytes may check again if enough bytes are available 359 osl_setCondition( m_conditionBytesAvail ); 360 361 checkInvariant(); 362 } 363 364 365 void OPipeImpl::flush(void) 366 throw( NotConnectedException, 367 BufferSizeExceededException, 368 RuntimeException) 369 { 370 // nothing to do for a pipe 371 return; 372 } 373 374 void OPipeImpl::closeOutput(void) 375 throw( NotConnectedException, 376 BufferSizeExceededException, 377 RuntimeException) 378 { 379 MutexGuard guard( m_mutexAccess ); 380 381 m_bOutputStreamClosed = sal_True; 382 osl_setCondition( m_conditionBytesAvail ); 383 setPredecessor( Reference < XConnectable > () ); 384 return; 385 } 386 387 388 void OPipeImpl::setSuccessor( const Reference < XConnectable > &r ) 389 throw( RuntimeException ) 390 { 391 /// if the references match, nothing needs to be done 392 if( m_succ != r ) { 393 /// store the reference for later use 394 m_succ = r; 395 396 if( m_succ.is() ) 397 { 398 m_succ->setPredecessor( 399 Reference< XConnectable > ( SAL_STATIC_CAST( XConnectable * , this ) ) ); 400 } 401 } 402 } 403 404 Reference < XConnectable > OPipeImpl::getSuccessor() throw( RuntimeException ) 405 { 406 return m_succ; 407 } 408 409 410 // XDataSource 411 void OPipeImpl::setPredecessor( const Reference < XConnectable > &r ) 412 throw( RuntimeException ) 413 { 414 if( r != m_pred ) { 415 m_pred = r; 416 if( m_pred.is() ) { 417 m_pred->setSuccessor( 418 Reference < XConnectable > ( SAL_STATIC_CAST( XConnectable * , this ) ) ); 419 } 420 } 421 } 422 423 Reference < XConnectable > OPipeImpl::getPredecessor() throw( RuntimeException ) 424 { 425 return m_pred; 426 } 427 428 429 430 431 // XServiceInfo 432 OUString OPipeImpl::getImplementationName() throw( ) 433 { 434 return OPipeImpl_getImplementationName(); 435 } 436 437 // XServiceInfo 438 sal_Bool OPipeImpl::supportsService(const OUString& ServiceName) throw( ) 439 { 440 Sequence< OUString > aSNL = getSupportedServiceNames(); 441 const OUString * pArray = aSNL.getConstArray(); 442 443 for( sal_Int32 i = 0; i < aSNL.getLength(); i++ ) 444 if( pArray[i] == ServiceName ) 445 return sal_True; 446 447 return sal_False; 448 } 449 450 // XServiceInfo 451 Sequence< OUString > OPipeImpl::getSupportedServiceNames(void) throw( ) 452 { 453 return OPipeImpl_getSupportedServiceNames(); 454 } 455 456 457 458 459 460 /* implementation functions 461 * 462 * 463 */ 464 465 466 Reference < XInterface > SAL_CALL OPipeImpl_CreateInstance( 467 const Reference < XComponentContext > & ) throw(Exception) 468 { 469 OPipeImpl *p = new OPipeImpl; 470 471 return Reference < XInterface > ( SAL_STATIC_CAST( OWeakObject * , p ) ); 472 } 473 474 475 OUString OPipeImpl_getImplementationName() 476 { 477 return OUString( RTL_CONSTASCII_USTRINGPARAM ( IMPLEMENTATION_NAME ) ); 478 } 479 480 Sequence<OUString> OPipeImpl_getSupportedServiceNames(void) 481 { 482 Sequence<OUString> aRet(1); 483 aRet.getArray()[0] = OUString( RTL_CONSTASCII_USTRINGPARAM( SERVICE_NAME )); 484 return aRet; 485 } 486 } 487 488 489