1 /************************************************************** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, 14 * software distributed under the License is distributed on an 15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 16 * KIND, either express or implied. See the License for the 17 * specific language governing permissions and limitations 18 * under the License. 19 * 20 *************************************************************/ 21 22 23 24 // MARKER(update_precomp.py): autogen include statement, do not remove 25 #include "precompiled_cppu.hxx" 26 #include <hash_set> 27 #include <stdio.h> 28 29 #include <osl/diagnose.h> 30 #include <osl/mutex.hxx> 31 #include <osl/thread.h> 32 #include <rtl/instance.hxx> 33 34 #include <uno/threadpool.h> 35 36 #include "threadpool.hxx" 37 #include "thread.hxx" 38 39 using namespace ::std; 40 using namespace ::osl; 41 42 namespace cppu_threadpool 43 { 44 struct theDisposedCallerAdmin : 45 public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin > 46 { 47 DisposedCallerAdminHolder operator () () { 48 return DisposedCallerAdminHolder(new DisposedCallerAdmin()); 49 } 50 }; 51 52 DisposedCallerAdminHolder DisposedCallerAdmin::getInstance() 53 { 54 return theDisposedCallerAdmin::get(); 55 } 56 57 DisposedCallerAdmin::~DisposedCallerAdmin() 58 { 59 #if OSL_DEBUG_LEVEL > 1 60 if( !m_lst.empty() ) 61 { 62 printf( "DisposedCallerList : %lu left\n" , static_cast<unsigned long>(m_lst.size( ))); 63 } 64 #endif 65 } 66 67 void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId ) 68 { 69 MutexGuard guard( m_mutex ); 70 m_lst.push_back( nDisposeId ); 71 } 72 73 void DisposedCallerAdmin::stopDisposing( sal_Int64 nDisposeId ) 74 { 75 MutexGuard guard( m_mutex ); 76 for( DisposedCallerList::iterator ii = m_lst.begin() ; 77 ii != m_lst.end() ; 78 ++ ii ) 79 { 80 if( (*ii) == nDisposeId ) 81 { 82 m_lst.erase( ii ); 83 break; 84 } 85 } 86 } 87 88 sal_Bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId ) 89 { 90 MutexGuard guard( m_mutex ); 91 for( DisposedCallerList::iterator ii = m_lst.begin() ; 92 ii != m_lst.end() ; 93 ++ ii ) 94 { 95 if( (*ii) == nDisposeId ) 96 { 97 return sal_True; 98 } 99 } 100 return sal_False; 101 } 102 103 104 //------------------------------------------------------------------------------- 105 106 struct theThreadPool : 107 public rtl::StaticWithInit< ThreadPoolHolder, theThreadPool > 108 { 109 ThreadPoolHolder operator () () { 110 ThreadPoolHolder aRet(new ThreadPool()); 111 return aRet; 112 } 113 }; 114 115 ThreadPool::ThreadPool() 116 { 117 m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance(); 118 } 119 120 ThreadPool::~ThreadPool() 121 { 122 #if OSL_DEBUG_LEVEL > 1 123 if( m_mapQueue.size() ) 124 { 125 printf( "ThreadIdHashMap : %lu left\n" , static_cast<unsigned long>(m_mapQueue.size()) ); 126 } 127 #endif 128 } 129 ThreadPoolHolder ThreadPool::getInstance() 130 { 131 return theThreadPool::get(); 132 } 133 134 135 void ThreadPool::dispose( sal_Int64 nDisposeId ) 136 { 137 if( nDisposeId ) 138 { 139 m_DisposedCallerAdmin->dispose( nDisposeId ); 140 141 MutexGuard guard( m_mutex ); 142 for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ; 143 ii != m_mapQueue.end(); 144 ++ii) 145 { 146 if( (*ii).second.first ) 147 { 148 (*ii).second.first->dispose( nDisposeId ); 149 } 150 if( (*ii).second.second ) 151 { 152 (*ii).second.second->dispose( nDisposeId ); 153 } 154 } 155 } 156 else 157 { 158 { 159 MutexGuard guard( m_mutexWaitingThreadList ); 160 for( WaitingThreadList::iterator ii = m_lstThreads.begin() ; 161 ii != m_lstThreads.end() ; 162 ++ ii ) 163 { 164 // wake the threads up 165 osl_setCondition( (*ii)->condition ); 166 } 167 } 168 ThreadAdmin::getInstance()->join(); 169 } 170 } 171 172 void ThreadPool::stopDisposing( sal_Int64 nDisposeId ) 173 { 174 m_DisposedCallerAdmin->stopDisposing( nDisposeId ); 175 } 176 177 /****************** 178 * This methods lets the thread wait a certain amount of time. If within this timespan 179 * a new request comes in, this thread is reused. This is done only to improve performance, 180 * it is not required for threadpool functionality. 181 ******************/ 182 void ThreadPool::waitInPool( ORequestThread * pThread ) 183 { 184 struct WaitingThread waitingThread; 185 waitingThread.condition = osl_createCondition(); 186 waitingThread.thread = pThread; 187 { 188 MutexGuard guard( m_mutexWaitingThreadList ); 189 m_lstThreads.push_front( &waitingThread ); 190 } 191 192 // let the thread wait 2 seconds 193 TimeValue time = { 2 , 0 }; 194 osl_waitCondition( waitingThread.condition , &time ); 195 196 { 197 MutexGuard guard ( m_mutexWaitingThreadList ); 198 if( waitingThread.thread ) 199 { 200 // thread wasn't reused, remove it from the list 201 WaitingThreadList::iterator ii = find( 202 m_lstThreads.begin(), m_lstThreads.end(), &waitingThread ); 203 OSL_ASSERT( ii != m_lstThreads.end() ); 204 m_lstThreads.erase( ii ); 205 } 206 } 207 208 osl_destroyCondition( waitingThread.condition ); 209 } 210 211 void ThreadPool::createThread( JobQueue *pQueue , 212 const ByteSequence &aThreadId, 213 sal_Bool bAsynchron ) 214 { 215 sal_Bool bCreate = sal_True; 216 { 217 // Can a thread be reused ? 218 MutexGuard guard( m_mutexWaitingThreadList ); 219 if( ! m_lstThreads.empty() ) 220 { 221 // inform the thread and let it go 222 struct WaitingThread *pWaitingThread = m_lstThreads.back(); 223 pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron ); 224 pWaitingThread->thread = 0; 225 226 // remove from list 227 m_lstThreads.pop_back(); 228 229 // let the thread go 230 osl_setCondition( pWaitingThread->condition ); 231 bCreate = sal_False; 232 } 233 } 234 235 if( bCreate ) 236 { 237 ORequestThread *pThread = 238 new ORequestThread( pQueue , aThreadId, bAsynchron); 239 // deletes itself ! 240 pThread->create(); 241 } 242 } 243 244 sal_Bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, sal_Bool bAsynchron ) 245 { 246 MutexGuard guard( m_mutex ); 247 248 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); 249 OSL_ASSERT( ii != m_mapQueue.end() ); 250 251 if( bAsynchron ) 252 { 253 if( ! (*ii).second.second->isEmpty() ) 254 { 255 // another thread has put something into the queue 256 return sal_False; 257 } 258 259 (*ii).second.second = 0; 260 if( (*ii).second.first ) 261 { 262 // all oneway request have been processed, now 263 // synchronus requests may go on 264 (*ii).second.first->resume(); 265 } 266 } 267 else 268 { 269 if( ! (*ii).second.first->isEmpty() ) 270 { 271 // another thread has put something into the queue 272 return sal_False; 273 } 274 (*ii).second.first = 0; 275 } 276 277 if( 0 == (*ii).second.first && 0 == (*ii).second.second ) 278 { 279 m_mapQueue.erase( ii ); 280 } 281 282 return sal_True; 283 } 284 285 286 void ThreadPool::addJob( 287 const ByteSequence &aThreadId , 288 sal_Bool bAsynchron, 289 void *pThreadSpecificData, 290 RequestFun * doRequest ) 291 { 292 sal_Bool bCreateThread = sal_False; 293 JobQueue *pQueue = 0; 294 { 295 MutexGuard guard( m_mutex ); 296 297 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); 298 299 if( ii == m_mapQueue.end() ) 300 { 301 m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( 0 , 0 ); 302 ii = m_mapQueue.find( aThreadId ); 303 OSL_ASSERT( ii != m_mapQueue.end() ); 304 } 305 306 if( bAsynchron ) 307 { 308 if( ! (*ii).second.second ) 309 { 310 (*ii).second.second = new JobQueue(); 311 bCreateThread = sal_True; 312 } 313 pQueue = (*ii).second.second; 314 } 315 else 316 { 317 if( ! (*ii).second.first ) 318 { 319 (*ii).second.first = new JobQueue(); 320 bCreateThread = sal_True; 321 } 322 pQueue = (*ii).second.first; 323 324 if( (*ii).second.second && ( (*ii).second.second->isBusy() ) ) 325 { 326 pQueue->suspend(); 327 } 328 } 329 pQueue->add( pThreadSpecificData , doRequest ); 330 } 331 332 if( bCreateThread ) 333 { 334 createThread( pQueue , aThreadId , bAsynchron); 335 } 336 } 337 338 void ThreadPool::prepare( const ByteSequence &aThreadId ) 339 { 340 MutexGuard guard( m_mutex ); 341 342 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); 343 344 if( ii == m_mapQueue.end() ) 345 { 346 JobQueue *p = new JobQueue(); 347 m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , 0 ); 348 } 349 else if( 0 == (*ii).second.first ) 350 { 351 (*ii).second.first = new JobQueue(); 352 } 353 } 354 355 void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId ) 356 { 357 JobQueue *pQueue = 0; 358 { 359 MutexGuard guard( m_mutex ); 360 361 ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId ); 362 363 OSL_ASSERT( ii != m_mapQueue.end() ); 364 pQueue = (*ii).second.first; 365 } 366 367 OSL_ASSERT( pQueue ); 368 void *pReturn = pQueue->enter( nDisposeId ); 369 370 if( pQueue->isCallstackEmpty() ) 371 { 372 if( revokeQueue( aThreadId , sal_False) ) 373 { 374 // remove queue 375 delete pQueue; 376 } 377 } 378 return pReturn; 379 } 380 } 381 382 383 using namespace cppu_threadpool; 384 385 struct uno_ThreadPool_Equal 386 { 387 sal_Bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const 388 { 389 return a == b; 390 } 391 }; 392 393 struct uno_ThreadPool_Hash 394 { 395 sal_Size operator () ( const uno_ThreadPool &a ) const 396 { 397 return (sal_Size) a; 398 } 399 }; 400 401 402 403 typedef ::std::hash_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet; 404 405 static ThreadpoolHashSet *g_pThreadpoolHashSet; 406 407 struct _uno_ThreadPool 408 { 409 sal_Int32 dummy; 410 }; 411 412 extern "C" uno_ThreadPool SAL_CALL 413 uno_threadpool_create() SAL_THROW_EXTERN_C() 414 { 415 MutexGuard guard( Mutex::getGlobalMutex() ); 416 if( ! g_pThreadpoolHashSet ) 417 { 418 g_pThreadpoolHashSet = new ThreadpoolHashSet(); 419 } 420 421 // Just ensure that the handle is unique in the process (via heap) 422 uno_ThreadPool h = new struct _uno_ThreadPool; 423 g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, ThreadPool::getInstance()) ); 424 return h; 425 } 426 427 extern "C" void SAL_CALL 428 uno_threadpool_attach( uno_ThreadPool ) SAL_THROW_EXTERN_C() 429 { 430 sal_Sequence *pThreadId = 0; 431 uno_getIdOfCurrentThread( &pThreadId ); 432 ThreadPool::getInstance()->prepare( pThreadId ); 433 rtl_byte_sequence_release( pThreadId ); 434 uno_releaseIdFromCurrentThread(); 435 } 436 437 extern "C" void SAL_CALL 438 uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob ) 439 SAL_THROW_EXTERN_C() 440 { 441 sal_Sequence *pThreadId = 0; 442 uno_getIdOfCurrentThread( &pThreadId ); 443 *ppJob = 444 ThreadPool::getInstance()->enter( 445 pThreadId, 446 sal::static_int_cast< sal_Int64 >( 447 reinterpret_cast< sal_IntPtr >(hPool)) ); 448 rtl_byte_sequence_release( pThreadId ); 449 uno_releaseIdFromCurrentThread(); 450 } 451 452 extern "C" void SAL_CALL 453 uno_threadpool_detach( uno_ThreadPool ) SAL_THROW_EXTERN_C() 454 { 455 // we might do here some tiding up in case a thread called attach but never detach 456 } 457 458 extern "C" void SAL_CALL 459 uno_threadpool_putJob( 460 uno_ThreadPool, 461 sal_Sequence *pThreadId, 462 void *pJob, 463 void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ), 464 sal_Bool bIsOneway ) SAL_THROW_EXTERN_C() 465 { 466 ThreadPool::getInstance()->addJob( pThreadId, bIsOneway, pJob ,doRequest ); 467 } 468 469 extern "C" void SAL_CALL 470 uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() 471 { 472 ThreadPool::getInstance()->dispose( 473 sal::static_int_cast< sal_Int64 >( 474 reinterpret_cast< sal_IntPtr >(hPool)) ); 475 } 476 477 extern "C" void SAL_CALL 478 uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C() 479 { 480 ThreadPool::getInstance()->stopDisposing( 481 sal::static_int_cast< sal_Int64 >( 482 reinterpret_cast< sal_IntPtr >(hPool)) ); 483 484 if( hPool ) 485 { 486 // special treatment for 0 ! 487 OSL_ASSERT( g_pThreadpoolHashSet ); 488 489 MutexGuard guard( Mutex::getGlobalMutex() ); 490 491 ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool ); 492 OSL_ASSERT( ii != g_pThreadpoolHashSet->end() ); 493 g_pThreadpoolHashSet->erase( ii ); 494 delete hPool; 495 496 if( g_pThreadpoolHashSet->empty() ) 497 { 498 delete g_pThreadpoolHashSet; 499 g_pThreadpoolHashSet = 0; 500 } 501 } 502 } 503