xref: /AOO41X/main/cppu/source/threadpool/thread.cxx (revision 129fa3d1fc5edc85a690d91de1660cefa4e8a95b)
1 /**************************************************************
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *   http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing,
14  * software distributed under the License is distributed on an
15  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16  * KIND, either express or implied.  See the License for the
17  * specific language governing permissions and limitations
18  * under the License.
19  *
20  *************************************************************/
21 
22 
23 
24 // MARKER(update_precomp.py): autogen include statement, do not remove
25 #include "precompiled_cppu.hxx"
26 #include <stdio.h>
27 #include <osl/diagnose.h>
28 #include <uno/threadpool.h>
29 
30 #include <rtl/instance.hxx>
31 
32 #include "thread.hxx"
33 #include "jobqueue.hxx"
34 #include "threadpool.hxx"
35 
36 
37 using namespace osl;
38 extern "C" {
39 
cppu_requestThreadWorker(void * pVoid)40 void SAL_CALL cppu_requestThreadWorker( void *pVoid )
41 {
42     ::cppu_threadpool::ORequestThread *pThread = ( ::cppu_threadpool::ORequestThread * ) pVoid;
43 
44     pThread->run();
45     pThread->onTerminated();
46 }
47 
48 }
49 
50 namespace cppu_threadpool {
51 
52 // ----------------------------------------------------------------------------------
~ThreadAdmin()53     ThreadAdmin::~ThreadAdmin()
54     {
55 #if OSL_DEBUG_LEVEL > 1
56         if( m_lst.size() )
57         {
58             fprintf( stderr, "%lu Threads left\n" , static_cast<unsigned long>(m_lst.size()) );
59         }
60 #endif
61     }
62 
add(ORequestThread * p)63     void ThreadAdmin::add( ORequestThread *p )
64     {
65         MutexGuard aGuard( m_mutex );
66         m_lst.push_back( p );
67     }
68 
remove(ORequestThread * p)69     void ThreadAdmin::remove( ORequestThread * p )
70     {
71         MutexGuard aGuard( m_mutex );
72         ::std::list< ORequestThread * >::iterator ii = ::std::find( m_lst.begin(), m_lst.end(), p );
73         OSL_ASSERT( ii != m_lst.end() );
74         m_lst.erase( ii );
75     }
76 
join()77     void ThreadAdmin::join()
78     {
79         ORequestThread *pCurrent;
80         do
81         {
82             pCurrent = 0;
83             {
84                 MutexGuard aGuard( m_mutex );
85                 if( ! m_lst.empty() )
86                 {
87                     pCurrent = m_lst.front();
88                     pCurrent->setDeleteSelf( sal_False );
89                 }
90             }
91             if ( pCurrent )
92             {
93                 pCurrent->join();
94                 delete pCurrent;
95             }
96         } while( pCurrent );
97     }
98 
99     struct theThreadAdmin : public rtl::StaticWithInit< ThreadAdminHolder, theThreadAdmin >
100     {
operator ()cppu_threadpool::theThreadAdmin101         ThreadAdminHolder operator () () {
102             ThreadAdminHolder aRet(new ThreadAdmin());
103             return aRet;
104         }
105     };
106 
getInstance()107     ThreadAdminHolder& ThreadAdmin::getInstance()
108     {
109         return theThreadAdmin::get();
110     }
111 
112 // ----------------------------------------------------------------------------------
ORequestThread(JobQueue * pQueue,const ByteSequence & aThreadId,sal_Bool bAsynchron)113     ORequestThread::ORequestThread( JobQueue *pQueue,
114                                     const ByteSequence &aThreadId,
115                                     sal_Bool bAsynchron )
116         : m_thread( 0 )
117         , m_aThreadAdmin( ThreadAdmin::getInstance() )
118         , m_pQueue( pQueue )
119         , m_aThreadId( aThreadId )
120         , m_bAsynchron( bAsynchron )
121         , m_bDeleteSelf( sal_True )
122     {
123         m_aThreadAdmin->add( this );
124     }
125 
126 
~ORequestThread()127     ORequestThread::~ORequestThread()
128     {
129         if (m_thread != 0)
130         {
131             osl_destroyThread(m_thread);
132         }
133     }
134 
135 
setTask(JobQueue * pQueue,const ByteSequence & aThreadId,sal_Bool bAsynchron)136     void ORequestThread::setTask( JobQueue *pQueue,
137                                   const ByteSequence &aThreadId,
138                                   sal_Bool bAsynchron )
139     {
140         m_pQueue = pQueue;
141         m_aThreadId = aThreadId;
142         m_bAsynchron = bAsynchron;
143     }
144 
create()145     sal_Bool ORequestThread::create()
146     {
147         OSL_ASSERT(m_thread == 0);  // only one running thread per instance
148 
149         m_thread = osl_createSuspendedThread( cppu_requestThreadWorker, (void*)this);
150         if ( m_thread )
151         {
152             osl_resumeThread( m_thread );
153         }
154 
155         return m_thread != 0;
156     }
157 
join()158     void ORequestThread::join()
159     {
160         osl_joinWithThread( m_thread );
161     }
162 
onTerminated()163     void ORequestThread::onTerminated()
164     {
165         m_aThreadAdmin->remove( this );
166         if( m_bDeleteSelf )
167         {
168             delete this;
169         }
170     }
171 
run()172     void ORequestThread::run()
173     {
174         ThreadPoolHolder theThreadPool = cppu_threadpool::ThreadPool::getInstance();
175 
176         while ( m_pQueue )
177         {
178             if( ! m_bAsynchron )
179             {
180                 if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) )
181                 {
182                     OSL_ASSERT( false );
183                 }
184             }
185 
186             while( ! m_pQueue->isEmpty() )
187             {
188                 // Note : Oneways should not get a disposable disposeid,
189                 //        It does not make sense to dispose a call in this state.
190                 //        That's way we put it an disposeid, that can't be used otherwise.
191                 m_pQueue->enter(
192                     sal::static_int_cast< sal_Int64 >(
193                         reinterpret_cast< sal_IntPtr >(this)),
194                     sal_True );
195 
196                 if( m_pQueue->isEmpty() )
197                 {
198                     theThreadPool->revokeQueue( m_aThreadId , m_bAsynchron );
199                     // Note : revokeQueue might have failed because m_pQueue.isEmpty()
200                     //        may be false (race).
201                 }
202             }
203 
204             delete m_pQueue;
205             m_pQueue = 0;
206 
207             if( ! m_bAsynchron )
208             {
209                 uno_releaseIdFromCurrentThread();
210             }
211 
212             theThreadPool->waitInPool( this );
213         }
214     }
215 }
216