xref: /AOO41X/main/cppu/source/threadpool/threadpool.cxx (revision 129fa3d1fc5edc85a690d91de1660cefa4e8a95b)
1 /**************************************************************
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *   http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing,
14  * software distributed under the License is distributed on an
15  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16  * KIND, either express or implied.  See the License for the
17  * specific language governing permissions and limitations
18  * under the License.
19  *
20  *************************************************************/
21 
22 
23 
24 // MARKER(update_precomp.py): autogen include statement, do not remove
25 #include "precompiled_cppu.hxx"
26 #include <hash_set>
27 #include <stdio.h>
28 
29 #include <osl/diagnose.h>
30 #include <osl/mutex.hxx>
31 #include <osl/thread.h>
32 #include <rtl/instance.hxx>
33 
34 #include <uno/threadpool.h>
35 
36 #include "threadpool.hxx"
37 #include "thread.hxx"
38 
39 using namespace ::std;
40 using namespace ::osl;
41 
42 namespace cppu_threadpool
43 {
44     struct theDisposedCallerAdmin :
45         public rtl::StaticWithInit< DisposedCallerAdminHolder, theDisposedCallerAdmin >
46     {
operator ()cppu_threadpool::theDisposedCallerAdmin47         DisposedCallerAdminHolder operator () () {
48             return DisposedCallerAdminHolder(new DisposedCallerAdmin());
49         }
50     };
51 
getInstance()52     DisposedCallerAdminHolder DisposedCallerAdmin::getInstance()
53     {
54         return theDisposedCallerAdmin::get();
55     }
56 
~DisposedCallerAdmin()57     DisposedCallerAdmin::~DisposedCallerAdmin()
58     {
59 #if OSL_DEBUG_LEVEL > 1
60         if( !m_lst.empty() )
61         {
62             printf( "DisposedCallerList : %lu left\n" , static_cast<unsigned long>(m_lst.size( )));
63         }
64 #endif
65     }
66 
dispose(sal_Int64 nDisposeId)67     void DisposedCallerAdmin::dispose( sal_Int64 nDisposeId )
68     {
69         MutexGuard guard( m_mutex );
70         m_lst.push_back( nDisposeId );
71     }
72 
stopDisposing(sal_Int64 nDisposeId)73     void DisposedCallerAdmin::stopDisposing( sal_Int64 nDisposeId )
74     {
75         MutexGuard guard( m_mutex );
76         for( DisposedCallerList::iterator ii = m_lst.begin() ;
77              ii != m_lst.end() ;
78              ++ ii )
79         {
80             if( (*ii) == nDisposeId )
81             {
82                 m_lst.erase( ii );
83                 break;
84             }
85         }
86     }
87 
isDisposed(sal_Int64 nDisposeId)88     sal_Bool DisposedCallerAdmin::isDisposed( sal_Int64 nDisposeId )
89     {
90         MutexGuard guard( m_mutex );
91         for( DisposedCallerList::iterator ii = m_lst.begin() ;
92              ii != m_lst.end() ;
93              ++ ii )
94         {
95             if( (*ii) == nDisposeId )
96             {
97                 return sal_True;
98             }
99         }
100         return sal_False;
101     }
102 
103 
104     //-------------------------------------------------------------------------------
105 
106     struct theThreadPool :
107         public rtl::StaticWithInit< ThreadPoolHolder, theThreadPool >
108     {
operator ()cppu_threadpool::theThreadPool109         ThreadPoolHolder operator () () {
110             ThreadPoolHolder aRet(new ThreadPool());
111             return aRet;
112         }
113     };
114 
ThreadPool()115     ThreadPool::ThreadPool()
116     {
117             m_DisposedCallerAdmin = DisposedCallerAdmin::getInstance();
118     }
119 
~ThreadPool()120     ThreadPool::~ThreadPool()
121     {
122 #if OSL_DEBUG_LEVEL > 1
123         if( m_mapQueue.size() )
124         {
125             printf( "ThreadIdHashMap : %lu left\n" , static_cast<unsigned long>(m_mapQueue.size()) );
126         }
127 #endif
128     }
getInstance()129     ThreadPoolHolder ThreadPool::getInstance()
130     {
131         return theThreadPool::get();
132     }
133 
134 
dispose(sal_Int64 nDisposeId)135     void ThreadPool::dispose( sal_Int64 nDisposeId )
136     {
137         if( nDisposeId )
138         {
139             m_DisposedCallerAdmin->dispose( nDisposeId );
140 
141             MutexGuard guard( m_mutex );
142             for( ThreadIdHashMap::iterator ii = m_mapQueue.begin() ;
143                  ii != m_mapQueue.end();
144                  ++ii)
145             {
146                 if( (*ii).second.first )
147                 {
148                     (*ii).second.first->dispose( nDisposeId );
149                 }
150                 if( (*ii).second.second )
151                 {
152                     (*ii).second.second->dispose( nDisposeId );
153                 }
154             }
155         }
156         else
157         {
158             {
159                 MutexGuard guard( m_mutexWaitingThreadList );
160                 for( WaitingThreadList::iterator ii = m_lstThreads.begin() ;
161                      ii != m_lstThreads.end() ;
162                      ++ ii )
163                 {
164                     // wake the threads up
165                     osl_setCondition( (*ii)->condition );
166                 }
167             }
168             ThreadAdmin::getInstance()->join();
169         }
170     }
171 
stopDisposing(sal_Int64 nDisposeId)172     void ThreadPool::stopDisposing( sal_Int64 nDisposeId )
173     {
174         m_DisposedCallerAdmin->stopDisposing( nDisposeId );
175     }
176 
177     /******************
178      * This methods lets the thread wait a certain amount of time. If within this timespan
179      * a new request comes in, this thread is reused. This is done only to improve performance,
180      * it is not required for threadpool functionality.
181      ******************/
waitInPool(ORequestThread * pThread)182     void ThreadPool::waitInPool( ORequestThread * pThread )
183     {
184         struct WaitingThread waitingThread;
185         waitingThread.condition = osl_createCondition();
186         waitingThread.thread = pThread;
187         {
188             MutexGuard guard( m_mutexWaitingThreadList );
189             m_lstThreads.push_front( &waitingThread );
190         }
191 
192         // let the thread wait 2 seconds
193         TimeValue time = { 2 , 0 };
194         osl_waitCondition( waitingThread.condition , &time );
195 
196         {
197             MutexGuard guard ( m_mutexWaitingThreadList );
198             if( waitingThread.thread )
199             {
200                 // thread wasn't reused, remove it from the list
201                 WaitingThreadList::iterator ii = find(
202                     m_lstThreads.begin(), m_lstThreads.end(), &waitingThread );
203                 OSL_ASSERT( ii != m_lstThreads.end() );
204                 m_lstThreads.erase( ii );
205             }
206         }
207 
208         osl_destroyCondition( waitingThread.condition );
209     }
210 
createThread(JobQueue * pQueue,const ByteSequence & aThreadId,sal_Bool bAsynchron)211     void ThreadPool::createThread( JobQueue *pQueue ,
212                                    const ByteSequence &aThreadId,
213                                    sal_Bool bAsynchron )
214     {
215         sal_Bool bCreate = sal_True;
216         {
217             // Can a thread be reused ?
218             MutexGuard guard( m_mutexWaitingThreadList );
219             if( ! m_lstThreads.empty() )
220             {
221                 // inform the thread and let it go
222                 struct WaitingThread *pWaitingThread = m_lstThreads.back();
223                 pWaitingThread->thread->setTask( pQueue , aThreadId , bAsynchron );
224                 pWaitingThread->thread = 0;
225 
226                 // remove from list
227                 m_lstThreads.pop_back();
228 
229                 // let the thread go
230                 osl_setCondition( pWaitingThread->condition );
231                 bCreate = sal_False;
232             }
233         }
234 
235         if( bCreate )
236         {
237             ORequestThread *pThread =
238                 new ORequestThread( pQueue , aThreadId, bAsynchron);
239             // deletes itself !
240             pThread->create();
241         }
242     }
243 
revokeQueue(const ByteSequence & aThreadId,sal_Bool bAsynchron)244     sal_Bool ThreadPool::revokeQueue( const ByteSequence &aThreadId, sal_Bool bAsynchron )
245     {
246         MutexGuard guard( m_mutex );
247 
248         ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
249         OSL_ASSERT( ii != m_mapQueue.end() );
250 
251         if( bAsynchron )
252         {
253             if( ! (*ii).second.second->isEmpty() )
254             {
255                 // another thread has put something into the queue
256                 return sal_False;
257             }
258 
259             (*ii).second.second = 0;
260             if( (*ii).second.first )
261             {
262                 // all oneway request have been processed, now
263                 // synchronus requests may go on
264                 (*ii).second.first->resume();
265             }
266         }
267         else
268         {
269             if( ! (*ii).second.first->isEmpty() )
270             {
271                 // another thread has put something into the queue
272                 return sal_False;
273             }
274             (*ii).second.first = 0;
275         }
276 
277         if( 0 == (*ii).second.first && 0 == (*ii).second.second )
278         {
279             m_mapQueue.erase( ii );
280         }
281 
282         return sal_True;
283     }
284 
285 
addJob(const ByteSequence & aThreadId,sal_Bool bAsynchron,void * pThreadSpecificData,RequestFun * doRequest)286     void ThreadPool::addJob(
287         const ByteSequence &aThreadId ,
288         sal_Bool bAsynchron,
289         void *pThreadSpecificData,
290         RequestFun * doRequest )
291     {
292         sal_Bool bCreateThread = sal_False;
293         JobQueue *pQueue = 0;
294         {
295             MutexGuard guard( m_mutex );
296 
297             ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
298 
299             if( ii == m_mapQueue.end() )
300             {
301                 m_mapQueue[ aThreadId ] = pair < JobQueue * , JobQueue * > ( 0 , 0 );
302                 ii = m_mapQueue.find( aThreadId );
303                 OSL_ASSERT( ii != m_mapQueue.end() );
304             }
305 
306             if( bAsynchron )
307             {
308                 if( ! (*ii).second.second )
309                 {
310                     (*ii).second.second = new JobQueue();
311                     bCreateThread = sal_True;
312                 }
313                 pQueue = (*ii).second.second;
314             }
315             else
316             {
317                 if( ! (*ii).second.first )
318                 {
319                     (*ii).second.first = new JobQueue();
320                     bCreateThread = sal_True;
321                 }
322                 pQueue = (*ii).second.first;
323 
324                 if( (*ii).second.second && ( (*ii).second.second->isBusy() ) )
325                 {
326                     pQueue->suspend();
327                 }
328             }
329             pQueue->add( pThreadSpecificData , doRequest );
330         }
331 
332         if( bCreateThread )
333         {
334             createThread( pQueue , aThreadId , bAsynchron);
335         }
336     }
337 
prepare(const ByteSequence & aThreadId)338     void ThreadPool::prepare( const ByteSequence &aThreadId )
339     {
340         MutexGuard guard( m_mutex );
341 
342         ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
343 
344         if( ii == m_mapQueue.end() )
345         {
346             JobQueue *p = new JobQueue();
347             m_mapQueue[ aThreadId ] = pair< JobQueue * , JobQueue * > ( p , 0 );
348         }
349         else if( 0 == (*ii).second.first )
350         {
351             (*ii).second.first = new JobQueue();
352         }
353     }
354 
enter(const ByteSequence & aThreadId,sal_Int64 nDisposeId)355     void * ThreadPool::enter( const ByteSequence & aThreadId , sal_Int64 nDisposeId )
356     {
357         JobQueue *pQueue = 0;
358         {
359             MutexGuard guard( m_mutex );
360 
361             ThreadIdHashMap::iterator ii = m_mapQueue.find( aThreadId );
362 
363             OSL_ASSERT( ii != m_mapQueue.end() );
364             pQueue = (*ii).second.first;
365         }
366 
367         OSL_ASSERT( pQueue );
368         void *pReturn = pQueue->enter( nDisposeId );
369 
370         if( pQueue->isCallstackEmpty() )
371         {
372             if( revokeQueue( aThreadId , sal_False) )
373             {
374                 // remove queue
375                 delete pQueue;
376             }
377         }
378         return pReturn;
379     }
380 }
381 
382 
383 using namespace cppu_threadpool;
384 
385 struct uno_ThreadPool_Equal
386 {
operator ()uno_ThreadPool_Equal387     sal_Bool operator () ( const uno_ThreadPool &a , const uno_ThreadPool &b ) const
388         {
389             return a == b;
390         }
391 };
392 
393 struct uno_ThreadPool_Hash
394 {
operator ()uno_ThreadPool_Hash395     sal_Size operator () ( const uno_ThreadPool &a  )  const
396         {
397             return (sal_Size) a;
398         }
399 };
400 
401 
402 
403 typedef ::std::hash_map< uno_ThreadPool, ThreadPoolHolder, uno_ThreadPool_Hash, uno_ThreadPool_Equal > ThreadpoolHashSet;
404 
405 static ThreadpoolHashSet *g_pThreadpoolHashSet;
406 
407 struct _uno_ThreadPool
408 {
409     sal_Int32 dummy;
410 };
411 
412 extern "C" uno_ThreadPool SAL_CALL
uno_threadpool_create()413 uno_threadpool_create() SAL_THROW_EXTERN_C()
414 {
415     MutexGuard guard( Mutex::getGlobalMutex() );
416     if( ! g_pThreadpoolHashSet )
417     {
418         g_pThreadpoolHashSet = new ThreadpoolHashSet();
419     }
420 
421     // Just ensure that the handle is unique in the process (via heap)
422     uno_ThreadPool h = new struct _uno_ThreadPool;
423     g_pThreadpoolHashSet->insert( ThreadpoolHashSet::value_type(h, ThreadPool::getInstance()) );
424     return h;
425 }
426 
427 extern "C" void SAL_CALL
uno_threadpool_attach(uno_ThreadPool)428 uno_threadpool_attach( uno_ThreadPool ) SAL_THROW_EXTERN_C()
429 {
430     sal_Sequence *pThreadId = 0;
431     uno_getIdOfCurrentThread( &pThreadId );
432     ThreadPool::getInstance()->prepare( pThreadId );
433     rtl_byte_sequence_release( pThreadId );
434     uno_releaseIdFromCurrentThread();
435 }
436 
437 extern "C" void SAL_CALL
uno_threadpool_enter(uno_ThreadPool hPool,void ** ppJob)438 uno_threadpool_enter( uno_ThreadPool hPool , void **ppJob )
439     SAL_THROW_EXTERN_C()
440 {
441     sal_Sequence *pThreadId = 0;
442     uno_getIdOfCurrentThread( &pThreadId );
443     *ppJob =
444         ThreadPool::getInstance()->enter(
445             pThreadId,
446             sal::static_int_cast< sal_Int64 >(
447                 reinterpret_cast< sal_IntPtr >(hPool)) );
448     rtl_byte_sequence_release( pThreadId );
449     uno_releaseIdFromCurrentThread();
450 }
451 
452 extern "C" void SAL_CALL
uno_threadpool_detach(uno_ThreadPool)453 uno_threadpool_detach( uno_ThreadPool ) SAL_THROW_EXTERN_C()
454 {
455     // we might do here some tiding up in case a thread called attach but never detach
456 }
457 
458 extern "C" void SAL_CALL
uno_threadpool_putJob(uno_ThreadPool,sal_Sequence * pThreadId,void * pJob,void (SAL_CALL * doRequest)(void * pThreadSpecificData),sal_Bool bIsOneway)459 uno_threadpool_putJob(
460     uno_ThreadPool,
461     sal_Sequence *pThreadId,
462     void *pJob,
463     void ( SAL_CALL * doRequest ) ( void *pThreadSpecificData ),
464     sal_Bool bIsOneway ) SAL_THROW_EXTERN_C()
465 {
466     ThreadPool::getInstance()->addJob( pThreadId, bIsOneway, pJob ,doRequest );
467 }
468 
469 extern "C" void SAL_CALL
uno_threadpool_dispose(uno_ThreadPool hPool)470 uno_threadpool_dispose( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
471 {
472     ThreadPool::getInstance()->dispose(
473         sal::static_int_cast< sal_Int64 >(
474             reinterpret_cast< sal_IntPtr >(hPool)) );
475 }
476 
477 extern "C" void SAL_CALL
uno_threadpool_destroy(uno_ThreadPool hPool)478 uno_threadpool_destroy( uno_ThreadPool hPool ) SAL_THROW_EXTERN_C()
479 {
480     ThreadPool::getInstance()->stopDisposing(
481         sal::static_int_cast< sal_Int64 >(
482             reinterpret_cast< sal_IntPtr >(hPool)) );
483 
484     if( hPool )
485     {
486         // special treatment for 0 !
487         OSL_ASSERT( g_pThreadpoolHashSet );
488 
489         MutexGuard guard( Mutex::getGlobalMutex() );
490 
491         ThreadpoolHashSet::iterator ii = g_pThreadpoolHashSet->find( hPool );
492         OSL_ASSERT( ii != g_pThreadpoolHashSet->end() );
493         g_pThreadpoolHashSet->erase( ii );
494         delete hPool;
495 
496         if( g_pThreadpoolHashSet->empty() )
497         {
498             delete g_pThreadpoolHashSet;
499             g_pThreadpoolHashSet = 0;
500         }
501     }
502 }
503