1 /************************************************************** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, 14 * software distributed under the License is distributed on an 15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 16 * KIND, either express or implied. See the License for the 17 * specific language governing permissions and limitations 18 * under the License. 19 * 20 *************************************************************/ 21 22 23 24 // MARKER(update_precomp.py): autogen include statement, do not remove 25 #include "precompiled_io.hxx" 26 27 #include <stdio.h> 28 29 #include <osl/diagnose.h> 30 31 #include <com/sun/star/io/XActiveDataSource.hpp> 32 #include <com/sun/star/io/XActiveDataSink.hpp> 33 #include <com/sun/star/io/XActiveDataControl.hpp> 34 #include <com/sun/star/io/XConnectable.hpp> 35 #include <com/sun/star/lang/XSingleServiceFactory.hpp> 36 #include <com/sun/star/lang/XMultiServiceFactory.hpp> 37 #include <com/sun/star/lang/XServiceInfo.hpp> 38 #include <com/sun/star/registry/XRegistryKey.hpp> 39 40 #include <uno/dispatcher.h> 41 #include <uno/mapping.hxx> 42 #include <cppuhelper/implbase5.hxx> 43 #include <cppuhelper/factory.hxx> 44 #include <cppuhelper/interfacecontainer.hxx> 45 #include <osl/mutex.hxx> 46 #include <osl/thread.h> 47 48 49 using namespace osl; 50 using namespace std; 51 using namespace rtl; 52 using namespace cppu; 53 using namespace com::sun::star::uno; 54 using namespace com::sun::star::lang; 55 using namespace com::sun::star::registry; 56 using namespace com::sun::star::io; 57 58 #include "factreg.hxx" 59 60 namespace io_stm { 61 62 class Pump : public WeakImplHelper5< 63 XActiveDataSource, XActiveDataSink, XActiveDataControl, XConnectable, XServiceInfo > 64 { 65 Mutex m_aMutex; 66 oslThread m_aThread; 67 68 Reference< XConnectable > m_xPred; 69 Reference< XConnectable > m_xSucc; 70 Reference< XInputStream > m_xInput; 71 Reference< XOutputStream > m_xOutput; 72 OInterfaceContainerHelper m_cnt; 73 sal_Bool m_closeFired; 74 75 void run(); 76 static void static_run( void* pObject ); 77 78 void close(); 79 void fireClose(); 80 void fireStarted(); 81 void fireTerminated(); 82 void fireError( const Any &a ); 83 84 public: 85 Pump(); 86 virtual ~Pump(); 87 88 // XActiveDataSource 89 virtual void SAL_CALL setOutputStream( const Reference< ::com::sun::star::io::XOutputStream >& xOutput ) throw(); 90 virtual Reference< ::com::sun::star::io::XOutputStream > SAL_CALL getOutputStream() throw(); 91 92 // XActiveDataSink 93 virtual void SAL_CALL setInputStream( const Reference< ::com::sun::star::io::XInputStream >& xStream ) throw(); 94 virtual Reference< ::com::sun::star::io::XInputStream > SAL_CALL getInputStream() throw(); 95 96 // XActiveDataControl 97 virtual void SAL_CALL addListener( const Reference< ::com::sun::star::io::XStreamListener >& xListener ) throw(); 98 virtual void SAL_CALL removeListener( const Reference< ::com::sun::star::io::XStreamListener >& xListener ) throw(); 99 virtual void SAL_CALL start() throw( RuntimeException ); 100 virtual void SAL_CALL terminate() throw(); 101 102 // XConnectable 103 virtual void SAL_CALL setPredecessor( const Reference< ::com::sun::star::io::XConnectable >& xPred ) throw(); 104 virtual Reference< ::com::sun::star::io::XConnectable > SAL_CALL getPredecessor() throw(); 105 virtual void SAL_CALL setSuccessor( const Reference< ::com::sun::star::io::XConnectable >& xSucc ) throw(); 106 virtual Reference< ::com::sun::star::io::XConnectable > SAL_CALL getSuccessor() throw(); 107 108 public: // XServiceInfo 109 virtual OUString SAL_CALL getImplementationName() throw( ); 110 virtual Sequence< OUString > SAL_CALL getSupportedServiceNames(void) throw( ); 111 virtual sal_Bool SAL_CALL supportsService(const OUString& ServiceName) throw( ); 112 }; 113 114 Pump::Pump() : m_aThread( 0 ), 115 m_cnt( m_aMutex ), 116 m_closeFired( sal_False ) 117 { 118 g_moduleCount.modCnt.acquire( &g_moduleCount.modCnt ); 119 } 120 121 Pump::~Pump() 122 { 123 // exit gracefully 124 if( m_aThread ) 125 { 126 osl_joinWithThread( m_aThread ); 127 osl_destroyThread( m_aThread ); 128 } 129 g_moduleCount.modCnt.release( &g_moduleCount.modCnt ); 130 } 131 132 void Pump::fireError( const Any & exception ) 133 { 134 OInterfaceIteratorHelper iter( m_cnt ); 135 while( iter.hasMoreElements() ) 136 { 137 try 138 { 139 static_cast< XStreamListener * > ( iter.next() )->error( exception ); 140 } 141 catch ( RuntimeException &e ) 142 { 143 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US ); 144 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() ); 145 } 146 } 147 } 148 149 void Pump::fireClose() 150 { 151 sal_Bool bFire = sal_False; 152 { 153 MutexGuard guard( m_aMutex ); 154 if( ! m_closeFired ) 155 { 156 m_closeFired = sal_True; 157 bFire = sal_True; 158 } 159 } 160 161 if( bFire ) 162 { 163 OInterfaceIteratorHelper iter( m_cnt ); 164 while( iter.hasMoreElements() ) 165 { 166 try 167 { 168 static_cast< XStreamListener * > ( iter.next() )->closed( ); 169 } 170 catch ( RuntimeException &e ) 171 { 172 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US ); 173 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() ); 174 } 175 } 176 } 177 } 178 179 void Pump::fireStarted() 180 { 181 OInterfaceIteratorHelper iter( m_cnt ); 182 while( iter.hasMoreElements() ) 183 { 184 try 185 { 186 static_cast< XStreamListener * > ( iter.next() )->started( ); 187 } 188 catch ( RuntimeException &e ) 189 { 190 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US ); 191 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() ); 192 } 193 } 194 } 195 196 void Pump::fireTerminated() 197 { 198 OInterfaceIteratorHelper iter( m_cnt ); 199 while( iter.hasMoreElements() ) 200 { 201 try 202 { 203 static_cast< XStreamListener * > ( iter.next() )->terminated(); 204 } 205 catch ( RuntimeException &e ) 206 { 207 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US ); 208 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception during calling listeners", sMessage.getStr() ); 209 } 210 } 211 } 212 213 214 215 void Pump::close() 216 { 217 // close streams and release references 218 Reference< XInputStream > rInput; 219 Reference< XOutputStream > rOutput; 220 { 221 MutexGuard guard( m_aMutex ); 222 rInput = m_xInput; 223 m_xInput.clear(); 224 225 rOutput = m_xOutput; 226 m_xOutput.clear(); 227 m_xSucc.clear(); 228 m_xPred.clear(); 229 } 230 if( rInput.is() ) 231 { 232 try 233 { 234 rInput->closeInput(); 235 } 236 catch( Exception & ) 237 { 238 // go down calm 239 } 240 } 241 if( rOutput.is() ) 242 { 243 try 244 { 245 rOutput->closeOutput(); 246 } 247 catch( Exception & ) 248 { 249 // go down calm 250 } 251 } 252 } 253 254 void Pump::static_run( void* pObject ) 255 { 256 ((Pump*)pObject)->run(); 257 ((Pump*)pObject)->release(); 258 } 259 260 void Pump::run() 261 { 262 try 263 { 264 fireStarted(); 265 try 266 { 267 Reference< XInputStream > rInput; 268 Reference< XOutputStream > rOutput; 269 { 270 Guard< Mutex > aGuard( m_aMutex ); 271 rInput = m_xInput; 272 rOutput = m_xOutput; 273 } 274 275 if( ! rInput.is() ) 276 { 277 NotConnectedException exception( 278 OUString::createFromAscii( "no input stream set" ) , Reference<XInterface>((OWeakObject*)this) ); 279 throw exception; 280 } 281 Sequence< sal_Int8 > aData; 282 while( rInput->readSomeBytes( aData, 65536 ) ) 283 { 284 if( ! rOutput.is() ) 285 { 286 NotConnectedException exception( 287 OUString::createFromAscii( "no output stream set" ) , Reference<XInterface>( (OWeakObject*)this) ); 288 throw exception; 289 } 290 rOutput->writeBytes( aData ); 291 osl_yieldThread(); 292 } 293 } 294 catch ( IOException & e ) 295 { 296 fireError( makeAny( e ) ); 297 } 298 catch ( RuntimeException & e ) 299 { 300 fireError( makeAny( e ) ); 301 } 302 catch ( Exception & e ) 303 { 304 fireError( makeAny( e ) ); 305 } 306 307 close(); 308 fireClose(); 309 } 310 catch ( com::sun::star::uno::Exception &e ) 311 { 312 // we are the last on the stack. 313 // this is to avoid crashing the program, when e.g. a bridge crashes 314 OString sMessage = OUStringToOString( e.Message , RTL_TEXTENCODING_ASCII_US ); 315 OSL_ENSURE( !"com.sun.star.comp.stoc.Pump: unexpected exception", sMessage.getStr() ); 316 } 317 } 318 319 // ------------------------------------------------------------ 320 321 /* 322 * XConnectable 323 */ 324 325 void Pump::setPredecessor( const Reference< XConnectable >& xPred ) throw() 326 { 327 Guard< Mutex > aGuard( m_aMutex ); 328 m_xPred = xPred; 329 } 330 331 // ------------------------------------------------------------ 332 333 Reference< XConnectable > Pump::getPredecessor() throw() 334 { 335 Guard< Mutex > aGuard( m_aMutex ); 336 return m_xPred; 337 } 338 339 // ------------------------------------------------------------ 340 341 void Pump::setSuccessor( const Reference< XConnectable >& xSucc ) throw() 342 { 343 Guard< Mutex > aGuard( m_aMutex ); 344 m_xSucc = xSucc; 345 } 346 347 // ------------------------------------------------------------ 348 349 Reference< XConnectable > Pump::getSuccessor() throw() 350 { 351 Guard< Mutex > aGuard( m_aMutex ); 352 return m_xSucc; 353 } 354 355 // ----------------------------------------------------------------- 356 357 /* 358 * XActiveDataControl 359 */ 360 361 void Pump::addListener( const Reference< XStreamListener >& xListener ) throw() 362 { 363 m_cnt.addInterface( xListener ); 364 } 365 366 // ------------------------------------------------------------ 367 368 void Pump::removeListener( const Reference< XStreamListener >& xListener ) throw() 369 { 370 m_cnt.removeInterface( xListener ); 371 } 372 373 // ------------------------------------------------------------ 374 375 void Pump::start() throw( RuntimeException ) 376 { 377 Guard< Mutex > aGuard( m_aMutex ); 378 m_aThread = osl_createSuspendedThread((oslWorkerFunction)Pump::static_run,this); 379 if( m_aThread ) 380 { 381 // will be released by OPump::static_run 382 acquire(); 383 osl_resumeThread( m_aThread ); 384 } 385 else 386 { 387 throw RuntimeException( 388 OUString( RTL_CONSTASCII_USTRINGPARAM( "Pump::start Couldn't create worker thread" )), 389 *this); 390 } 391 } 392 393 // ------------------------------------------------------------ 394 395 void Pump::terminate() throw() 396 { 397 close(); 398 399 // wait for the worker to die 400 if( m_aThread ) 401 osl_joinWithThread( m_aThread ); 402 403 fireTerminated(); 404 fireClose(); 405 } 406 407 // ------------------------------------------------------------ 408 409 /* 410 * XActiveDataSink 411 */ 412 413 void Pump::setInputStream( const Reference< XInputStream >& xStream ) throw() 414 { 415 Guard< Mutex > aGuard( m_aMutex ); 416 m_xInput = xStream; 417 Reference< XConnectable > xConnect( xStream, UNO_QUERY ); 418 if( xConnect.is() ) 419 xConnect->setSuccessor( this ); 420 // data transfer starts in XActiveDataControl::start 421 } 422 423 // ------------------------------------------------------------ 424 425 Reference< XInputStream > Pump::getInputStream() throw() 426 { 427 Guard< Mutex > aGuard( m_aMutex ); 428 return m_xInput; 429 } 430 431 // ------------------------------------------------------------ 432 433 /* 434 * XActiveDataSource 435 */ 436 437 void Pump::setOutputStream( const Reference< XOutputStream >& xOut ) throw() 438 { 439 Guard< Mutex > aGuard( m_aMutex ); 440 m_xOutput = xOut; 441 Reference< XConnectable > xConnect( xOut, UNO_QUERY ); 442 if( xConnect.is() ) 443 xConnect->setPredecessor( this ); 444 // data transfer starts in XActiveDataControl::start 445 } 446 447 // ------------------------------------------------------------ 448 449 Reference< XOutputStream > Pump::getOutputStream() throw() 450 { 451 Guard< Mutex > aGuard( m_aMutex ); 452 return m_xOutput; 453 } 454 455 456 // XServiceInfo 457 OUString Pump::getImplementationName() throw( ) 458 { 459 return OPumpImpl_getImplementationName(); 460 } 461 462 // XServiceInfo 463 sal_Bool Pump::supportsService(const OUString& ServiceName) throw( ) 464 { 465 Sequence< OUString > aSNL = getSupportedServiceNames(); 466 const OUString * pArray = aSNL.getConstArray(); 467 468 for( sal_Int32 i = 0; i < aSNL.getLength(); i++ ) 469 if( pArray[i] == ServiceName ) 470 return sal_True; 471 472 return sal_False; 473 } 474 475 // XServiceInfo 476 Sequence< OUString > Pump::getSupportedServiceNames(void) throw( ) 477 { 478 return OPumpImpl_getSupportedServiceNames(); 479 } 480 481 482 Reference< XInterface > SAL_CALL OPumpImpl_CreateInstance( const Reference< XComponentContext > & ) throw (Exception) 483 { 484 return Reference< XInterface >( *new Pump ); 485 } 486 487 OUString OPumpImpl_getImplementationName() 488 { 489 return OUString( RTL_CONSTASCII_USTRINGPARAM( "com.sun.star.comp.io.Pump") ); 490 } 491 492 Sequence<OUString> OPumpImpl_getSupportedServiceNames(void) 493 { 494 OUString s( RTL_CONSTASCII_USTRINGPARAM( "com.sun.star.io.Pump" ) ); 495 Sequence< OUString > seq( &s , 1 ); 496 return seq; 497 } 498 499 } 500 501