1 /************************************************************** 2 * 3 * Licensed to the Apache Software Foundation (ASF) under one 4 * or more contributor license agreements. See the NOTICE file 5 * distributed with this work for additional information 6 * regarding copyright ownership. The ASF licenses this file 7 * to you under the Apache License, Version 2.0 (the 8 * "License"); you may not use this file except in compliance 9 * with the License. You may obtain a copy of the License at 10 * 11 * http://www.apache.org/licenses/LICENSE-2.0 12 * 13 * Unless required by applicable law or agreed to in writing, 14 * software distributed under the License is distributed on an 15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 16 * KIND, either express or implied. See the License for the 17 * specific language governing permissions and limitations 18 * under the License. 19 * 20 *************************************************************/ 21 22 23 24 // MARKER(update_precomp.py): autogen include statement, do not remove 25 #include "precompiled_cppu.hxx" 26 #include <stdio.h> 27 #include <osl/diagnose.h> 28 #include <uno/threadpool.h> 29 30 #include <rtl/instance.hxx> 31 32 #include "thread.hxx" 33 #include "jobqueue.hxx" 34 #include "threadpool.hxx" 35 36 37 using namespace osl; 38 extern "C" { 39 40 void SAL_CALL cppu_requestThreadWorker( void *pVoid ) 41 { 42 ::cppu_threadpool::ORequestThread *pThread = ( ::cppu_threadpool::ORequestThread * ) pVoid; 43 44 pThread->run(); 45 pThread->onTerminated(); 46 } 47 48 } 49 50 namespace cppu_threadpool { 51 52 // ---------------------------------------------------------------------------------- 53 ThreadAdmin::~ThreadAdmin() 54 { 55 #if OSL_DEBUG_LEVEL > 1 56 if( m_lst.size() ) 57 { 58 fprintf( stderr, "%lu Threads left\n" , static_cast<unsigned long>(m_lst.size()) ); 59 } 60 #endif 61 } 62 63 void ThreadAdmin::add( ORequestThread *p ) 64 { 65 MutexGuard aGuard( m_mutex ); 66 m_lst.push_back( p ); 67 } 68 69 void ThreadAdmin::remove( ORequestThread * p ) 70 { 71 MutexGuard aGuard( m_mutex ); 72 ::std::list< ORequestThread * >::iterator ii = ::std::find( m_lst.begin(), m_lst.end(), p ); 73 OSL_ASSERT( ii != m_lst.end() ); 74 m_lst.erase( ii ); 75 } 76 77 void ThreadAdmin::join() 78 { 79 ORequestThread *pCurrent; 80 do 81 { 82 pCurrent = 0; 83 { 84 MutexGuard aGuard( m_mutex ); 85 if( ! m_lst.empty() ) 86 { 87 pCurrent = m_lst.front(); 88 pCurrent->setDeleteSelf( sal_False ); 89 } 90 } 91 if ( pCurrent ) 92 { 93 pCurrent->join(); 94 delete pCurrent; 95 } 96 } while( pCurrent ); 97 } 98 99 struct theThreadAdmin : public rtl::StaticWithInit< ThreadAdminHolder, theThreadAdmin > 100 { 101 ThreadAdminHolder operator () () { 102 ThreadAdminHolder aRet(new ThreadAdmin()); 103 return aRet; 104 } 105 }; 106 107 ThreadAdminHolder& ThreadAdmin::getInstance() 108 { 109 return theThreadAdmin::get(); 110 } 111 112 // ---------------------------------------------------------------------------------- 113 ORequestThread::ORequestThread( JobQueue *pQueue, 114 const ByteSequence &aThreadId, 115 sal_Bool bAsynchron ) 116 : m_thread( 0 ) 117 , m_aThreadAdmin( ThreadAdmin::getInstance() ) 118 , m_pQueue( pQueue ) 119 , m_aThreadId( aThreadId ) 120 , m_bAsynchron( bAsynchron ) 121 , m_bDeleteSelf( sal_True ) 122 { 123 m_aThreadAdmin->add( this ); 124 } 125 126 127 ORequestThread::~ORequestThread() 128 { 129 if (m_thread != 0) 130 { 131 osl_destroyThread(m_thread); 132 } 133 } 134 135 136 void ORequestThread::setTask( JobQueue *pQueue, 137 const ByteSequence &aThreadId, 138 sal_Bool bAsynchron ) 139 { 140 m_pQueue = pQueue; 141 m_aThreadId = aThreadId; 142 m_bAsynchron = bAsynchron; 143 } 144 145 sal_Bool ORequestThread::create() 146 { 147 OSL_ASSERT(m_thread == 0); // only one running thread per instance 148 149 m_thread = osl_createSuspendedThread( cppu_requestThreadWorker, (void*)this); 150 if ( m_thread ) 151 { 152 osl_resumeThread( m_thread ); 153 } 154 155 return m_thread != 0; 156 } 157 158 void ORequestThread::join() 159 { 160 osl_joinWithThread( m_thread ); 161 } 162 163 void ORequestThread::onTerminated() 164 { 165 m_aThreadAdmin->remove( this ); 166 if( m_bDeleteSelf ) 167 { 168 delete this; 169 } 170 } 171 172 void ORequestThread::run() 173 { 174 ThreadPoolHolder theThreadPool = cppu_threadpool::ThreadPool::getInstance(); 175 176 while ( m_pQueue ) 177 { 178 if( ! m_bAsynchron ) 179 { 180 if ( !uno_bindIdToCurrentThread( m_aThreadId.getHandle() ) ) 181 { 182 OSL_ASSERT( false ); 183 } 184 } 185 186 while( ! m_pQueue->isEmpty() ) 187 { 188 // Note : Oneways should not get a disposable disposeid, 189 // It does not make sense to dispose a call in this state. 190 // That's way we put it an disposeid, that can't be used otherwise. 191 m_pQueue->enter( 192 sal::static_int_cast< sal_Int64 >( 193 reinterpret_cast< sal_IntPtr >(this)), 194 sal_True ); 195 196 if( m_pQueue->isEmpty() ) 197 { 198 theThreadPool->revokeQueue( m_aThreadId , m_bAsynchron ); 199 // Note : revokeQueue might have failed because m_pQueue.isEmpty() 200 // may be false (race). 201 } 202 } 203 204 delete m_pQueue; 205 m_pQueue = 0; 206 207 if( ! m_bAsynchron ) 208 { 209 uno_releaseIdFromCurrentThread(); 210 } 211 212 theThreadPool->waitInPool( this ); 213 } 214 } 215 } 216