12be43276SAndrew Rist /************************************************************** 2cdf0e10cSrcweir * 32be43276SAndrew Rist * Licensed to the Apache Software Foundation (ASF) under one 42be43276SAndrew Rist * or more contributor license agreements. See the NOTICE file 52be43276SAndrew Rist * distributed with this work for additional information 62be43276SAndrew Rist * regarding copyright ownership. The ASF licenses this file 72be43276SAndrew Rist * to you under the Apache License, Version 2.0 (the 82be43276SAndrew Rist * "License"); you may not use this file except in compliance 92be43276SAndrew Rist * with the License. You may obtain a copy of the License at 10cdf0e10cSrcweir * 112be43276SAndrew Rist * http://www.apache.org/licenses/LICENSE-2.0 12cdf0e10cSrcweir * 132be43276SAndrew Rist * Unless required by applicable law or agreed to in writing, 142be43276SAndrew Rist * software distributed under the License is distributed on an 152be43276SAndrew Rist * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 162be43276SAndrew Rist * KIND, either express or implied. See the License for the 172be43276SAndrew Rist * specific language governing permissions and limitations 182be43276SAndrew Rist * under the License. 19cdf0e10cSrcweir * 202be43276SAndrew Rist *************************************************************/ 212be43276SAndrew Rist 222be43276SAndrew Rist 23cdf0e10cSrcweir 24cdf0e10cSrcweir package com.sun.star.lib.uno.environments.remote; 25cdf0e10cSrcweir 26cdf0e10cSrcweir import com.sun.star.lang.DisposedException; 27cdf0e10cSrcweir 28cdf0e10cSrcweir /** 29cdf0e10cSrcweir * The <code>JobQueue</code> implements a queue for jobs. 30cdf0e10cSrcweir * For every jobs thread id exists a job queue which is registered 31cdf0e10cSrcweir * at the <code>ThreadPool</code>. 32cdf0e10cSrcweir * A JobQueue is splitted in a sync job queue and an async job queue. 33cdf0e10cSrcweir * The sync job queue is the registerd queue, it delegates async jobs 34cdf0e10cSrcweir * (put by <code>putjob</code>) into the async queue, which is only 35cdf0e10cSrcweir * known by the sync queue. 36cdf0e10cSrcweir * <p> 37cdf0e10cSrcweir * @version $Revision: 1.19 $ $ $Date: 2008-04-11 11:21:18 $ 38cdf0e10cSrcweir * @author Kay Ramme 39cdf0e10cSrcweir * @see com.sun.star.lib.uno.environments.remote.ThreadPool 40cdf0e10cSrcweir * @see com.sun.star.lib.uno.environments.remote.Job 41*e6b649b5SPedro Giffuni * @see com.sun.star.lib.uno.environments.remote.ThreadId 42cdf0e10cSrcweir * @since UDK1.0 43cdf0e10cSrcweir */ 44cdf0e10cSrcweir public class JobQueue { 45cdf0e10cSrcweir /** 46cdf0e10cSrcweir * When set to true, enables various debugging output. 47cdf0e10cSrcweir */ 48cdf0e10cSrcweir private static final boolean DEBUG = false; 49cdf0e10cSrcweir 50cdf0e10cSrcweir protected Job _head; // the head of the job list 51cdf0e10cSrcweir protected Job _tail; // the tail of the job list 52cdf0e10cSrcweir 53cdf0e10cSrcweir protected ThreadId _threadId; // the thread id of the queue 54cdf0e10cSrcweir protected int _ref_count = 0; // the stack deepness 55cdf0e10cSrcweir protected boolean _createThread; // create a worker thread, if needed 56cdf0e10cSrcweir protected boolean _createThread_now; // create a worker thread, if needed 57cdf0e10cSrcweir protected Thread _worker_thread; // the thread that does the jobs 58cdf0e10cSrcweir 59cdf0e10cSrcweir protected Object _disposeId; // the active dispose id 60cdf0e10cSrcweir protected Object _doDispose = null; 61cdf0e10cSrcweir protected Throwable _throwable; 62cdf0e10cSrcweir 63cdf0e10cSrcweir protected JobQueue _async_jobQueue; // chaining job qeueus for asyncs 64cdf0e10cSrcweir protected JobQueue _sync_jobQueue; // chaining job qeueus for syncs 65cdf0e10cSrcweir 66cdf0e10cSrcweir protected boolean _active = false; 67cdf0e10cSrcweir 68cdf0e10cSrcweir protected JavaThreadPoolFactory _javaThreadPoolFactory; 69cdf0e10cSrcweir 70cdf0e10cSrcweir /** 71cdf0e10cSrcweir * A thread for dispatching jobs 72cdf0e10cSrcweir */ 73cdf0e10cSrcweir class JobDispatcher extends Thread { 74cdf0e10cSrcweir Object _disposeId; 75cdf0e10cSrcweir JobDispatcher(Object disposeId)76cdf0e10cSrcweir JobDispatcher(Object disposeId) { 77cdf0e10cSrcweir if(DEBUG) System.err.println("JobQueue$JobDispatcher.<init>:" + _threadId); 78cdf0e10cSrcweir 79cdf0e10cSrcweir _disposeId = disposeId; 80cdf0e10cSrcweir } 81cdf0e10cSrcweir getThreadId()82cdf0e10cSrcweir ThreadId getThreadId() { 83cdf0e10cSrcweir return _threadId; 84cdf0e10cSrcweir } 85cdf0e10cSrcweir run()86cdf0e10cSrcweir public void run() { 87cdf0e10cSrcweir if(DEBUG) System.err.println("ThreadPool$JobDispatcher.run: " + Thread.currentThread()); 88cdf0e10cSrcweir 89cdf0e10cSrcweir try { 90cdf0e10cSrcweir enter(2000, _disposeId); 91cdf0e10cSrcweir } 92cdf0e10cSrcweir catch(Throwable throwable) { 93cdf0e10cSrcweir if(_head != null || _active) { // there was a job in progress, so give a stack 94cdf0e10cSrcweir System.err.println(getClass().getName() + " - exception occurred:" + throwable); 95cdf0e10cSrcweir throwable.printStackTrace(System.err); 96cdf0e10cSrcweir } 97cdf0e10cSrcweir } 98cdf0e10cSrcweir finally { 99cdf0e10cSrcweir release(); 100cdf0e10cSrcweir } 101cdf0e10cSrcweir 102cdf0e10cSrcweir if(DEBUG) System.err.println("##### " + getClass().getName() + ".run - exit:" + _threadId); 103cdf0e10cSrcweir 104cdf0e10cSrcweir // try { 105cdf0e10cSrcweir // Object object = new Object(); 106cdf0e10cSrcweir // synchronized(object) { 107cdf0e10cSrcweir // object.wait(); 108cdf0e10cSrcweir // } 109cdf0e10cSrcweir // } 110cdf0e10cSrcweir // catch(InterruptedException interruptedException) { 111cdf0e10cSrcweir // } 112cdf0e10cSrcweir } 113cdf0e10cSrcweir } 114cdf0e10cSrcweir 115cdf0e10cSrcweir 116cdf0e10cSrcweir /** 117cdf0e10cSrcweir * Constructs a async job queue with the given thread id 118cdf0e10cSrcweir * which belongs to the given sync job queue. 119cdf0e10cSrcweir * <p> 120cdf0e10cSrcweir * @param threadId the thread id 121cdf0e10cSrcweir * @param sync_jobQueue the sync queue this async queue belongs to 122cdf0e10cSrcweir * @see com.sun.star.lib.uno.environments.remote.ThreadID 123cdf0e10cSrcweir */ JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId)124cdf0e10cSrcweir JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId) { 125cdf0e10cSrcweir _javaThreadPoolFactory = javaThreadPoolFactory; 126cdf0e10cSrcweir _threadId = ThreadId.createFresh(); 127cdf0e10cSrcweir 128cdf0e10cSrcweir _sync_jobQueue = javaThreadPoolFactory.getJobQueue(threadId); 129cdf0e10cSrcweir if(_sync_jobQueue == null) { 130cdf0e10cSrcweir _sync_jobQueue = new JobQueue(javaThreadPoolFactory, threadId, true); 131cdf0e10cSrcweir _sync_jobQueue.acquire(); 132cdf0e10cSrcweir } 133cdf0e10cSrcweir 134cdf0e10cSrcweir _sync_jobQueue._async_jobQueue = this; 135cdf0e10cSrcweir 136cdf0e10cSrcweir _createThread = true; 137cdf0e10cSrcweir _createThread_now = true; 138cdf0e10cSrcweir 139cdf0e10cSrcweir acquire(); 140cdf0e10cSrcweir 141cdf0e10cSrcweir if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" + _threadId); 142cdf0e10cSrcweir } 143cdf0e10cSrcweir 144cdf0e10cSrcweir /** 145cdf0e10cSrcweir * Constructs a sync job queue with the given thread id and the given thread. 146cdf0e10cSrcweir * <p> 147cdf0e10cSrcweir * @param threadId the thread id 148cdf0e10cSrcweir * @param createThread if true, the queue creates a worker thread if needed 149cdf0e10cSrcweir * @see com.sun.star.lib.uno.environments.remote.ThreadID 150cdf0e10cSrcweir */ JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread)151cdf0e10cSrcweir JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread){ 152cdf0e10cSrcweir _javaThreadPoolFactory = javaThreadPoolFactory; 153cdf0e10cSrcweir _threadId = threadId; 154cdf0e10cSrcweir _createThread = createThread; 155cdf0e10cSrcweir _createThread_now = createThread; 156cdf0e10cSrcweir 157cdf0e10cSrcweir if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" + _threadId + " " + createThread); 158cdf0e10cSrcweir } 159cdf0e10cSrcweir 160cdf0e10cSrcweir /** 161cdf0e10cSrcweir * Gives the thread id of this queue 162cdf0e10cSrcweir * <p> 163cdf0e10cSrcweir * @return the thread id 164cdf0e10cSrcweir * @see com.sun.star.lib.uno.environments.remote.ThreadID 165cdf0e10cSrcweir */ getThreadId()166cdf0e10cSrcweir ThreadId getThreadId() { 167cdf0e10cSrcweir return _threadId; 168cdf0e10cSrcweir } 169cdf0e10cSrcweir acquire()170cdf0e10cSrcweir synchronized void acquire() { 171cdf0e10cSrcweir // add only synchronous queues . 172cdf0e10cSrcweir if(_ref_count <= 0 && _sync_jobQueue == null ) 173cdf0e10cSrcweir _javaThreadPoolFactory.addJobQueue(this); 174cdf0e10cSrcweir 175cdf0e10cSrcweir ++ _ref_count; 176cdf0e10cSrcweir } 177cdf0e10cSrcweir release()178cdf0e10cSrcweir synchronized void release() { 179cdf0e10cSrcweir -- _ref_count; 180cdf0e10cSrcweir 181cdf0e10cSrcweir if(_ref_count <= 0) { 182cdf0e10cSrcweir // only synchronous queues needs to be removed . 183cdf0e10cSrcweir if( _sync_jobQueue == null ) 184cdf0e10cSrcweir _javaThreadPoolFactory.removeJobQueue(this); 185cdf0e10cSrcweir 186cdf0e10cSrcweir 187cdf0e10cSrcweir if(_sync_jobQueue != null) { 188cdf0e10cSrcweir _sync_jobQueue._async_jobQueue = null; 189cdf0e10cSrcweir _sync_jobQueue.release(); 190cdf0e10cSrcweir } 191cdf0e10cSrcweir } 192cdf0e10cSrcweir } 193cdf0e10cSrcweir 194cdf0e10cSrcweir /** 195cdf0e10cSrcweir * Removes a job from the queue. 196cdf0e10cSrcweir * <p> 197cdf0e10cSrcweir * @return a job or null if timed out 198cdf0e10cSrcweir * @param waitTime the maximum amount of time to wait for a job 199cdf0e10cSrcweir */ removeJob(int waitTime)200cdf0e10cSrcweir private Job removeJob(int waitTime) { 201cdf0e10cSrcweir if(DEBUG) System.err.println("##### " + getClass().getName() + ".removeJob:" + _head + " " + _threadId); 202cdf0e10cSrcweir 203cdf0e10cSrcweir Job job = null; 204cdf0e10cSrcweir synchronized (this) { 205cdf0e10cSrcweir // wait max. waitTime time for a job to enter the queue 206cdf0e10cSrcweir boolean waited = false; 207cdf0e10cSrcweir while(_head == null && (waitTime == 0 || !waited)) { 208cdf0e10cSrcweir if(_doDispose == _disposeId) { 209cdf0e10cSrcweir _doDispose = null; 210cdf0e10cSrcweir throw (DisposedException) 211cdf0e10cSrcweir new DisposedException().initCause(_throwable); 212cdf0e10cSrcweir } 213cdf0e10cSrcweir 214cdf0e10cSrcweir // notify sync queues 215cdf0e10cSrcweir notifyAll(); 216cdf0e10cSrcweir 217cdf0e10cSrcweir try { 218cdf0e10cSrcweir // wait for new job 219cdf0e10cSrcweir wait(waitTime); 220cdf0e10cSrcweir } 221cdf0e10cSrcweir catch(InterruptedException interruptedException) { 222cdf0e10cSrcweir throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException); 223cdf0e10cSrcweir } 224cdf0e10cSrcweir 225cdf0e10cSrcweir // signal that we have already waited once 226cdf0e10cSrcweir waited = true; 227cdf0e10cSrcweir } 228cdf0e10cSrcweir 229cdf0e10cSrcweir 230cdf0e10cSrcweir if(_head != null) { 231cdf0e10cSrcweir Job current = _head; 232cdf0e10cSrcweir _head = _head._next; 233cdf0e10cSrcweir 234cdf0e10cSrcweir if(_head == null) 235cdf0e10cSrcweir _tail = null; 236cdf0e10cSrcweir 237cdf0e10cSrcweir job = current; 238cdf0e10cSrcweir _active = true; 239cdf0e10cSrcweir } 240cdf0e10cSrcweir } 241cdf0e10cSrcweir 242cdf0e10cSrcweir // always wait for asynchron jobqueue to be finished ! 243cdf0e10cSrcweir if(job != null && _async_jobQueue != null) { 244cdf0e10cSrcweir synchronized(_async_jobQueue) { 245cdf0e10cSrcweir // wait for async queue to be empty and last job to be done 246cdf0e10cSrcweir while(_async_jobQueue._active || _async_jobQueue._head != null) { 247cdf0e10cSrcweir if(DEBUG) System.err.println("waiting for async:" + _async_jobQueue._head + " " + _async_jobQueue._worker_thread); 248cdf0e10cSrcweir 249cdf0e10cSrcweir if(_doDispose == _disposeId) { 250cdf0e10cSrcweir _doDispose = null; 251cdf0e10cSrcweir throw (DisposedException) 252cdf0e10cSrcweir new DisposedException().initCause(_throwable); 253cdf0e10cSrcweir } 254cdf0e10cSrcweir 255cdf0e10cSrcweir try { 256cdf0e10cSrcweir _async_jobQueue.wait(); 257cdf0e10cSrcweir } 258cdf0e10cSrcweir catch(InterruptedException interruptedException) { 259cdf0e10cSrcweir throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException); 260cdf0e10cSrcweir } 261cdf0e10cSrcweir } 262cdf0e10cSrcweir } 263cdf0e10cSrcweir } 264cdf0e10cSrcweir 265cdf0e10cSrcweir return job; 266cdf0e10cSrcweir } 267cdf0e10cSrcweir 268cdf0e10cSrcweir /** 269cdf0e10cSrcweir * Puts a job into the queue. 270cdf0e10cSrcweir * <p> 271cdf0e10cSrcweir * @param job the job 272cdf0e10cSrcweir * @param disposeId a dispose id 273cdf0e10cSrcweir */ putJob(Job job, Object disposeId)274cdf0e10cSrcweir synchronized void putJob(Job job, Object disposeId) { 275cdf0e10cSrcweir if(DEBUG) System.err.println("##### " + getClass().getName() + ".putJob todoes: " + " job:" + job); 276cdf0e10cSrcweir 277cdf0e10cSrcweir if(_tail != null) 278cdf0e10cSrcweir _tail._next = job; 279cdf0e10cSrcweir else 280cdf0e10cSrcweir _head = job; 281cdf0e10cSrcweir 282cdf0e10cSrcweir _tail = job; 283cdf0e10cSrcweir 284cdf0e10cSrcweir if(_worker_thread == null && _createThread && _createThread_now) { // if there is no thread, which dispatches and if shall create one, create one 285cdf0e10cSrcweir 286cdf0e10cSrcweir acquire(); 287cdf0e10cSrcweir 288cdf0e10cSrcweir _createThread_now = false; 289cdf0e10cSrcweir new JobDispatcher(disposeId).start(); 290cdf0e10cSrcweir } 291cdf0e10cSrcweir 292cdf0e10cSrcweir // always notify possible waiters 293cdf0e10cSrcweir notifyAll(); 294cdf0e10cSrcweir } 295cdf0e10cSrcweir 296cdf0e10cSrcweir /** 297cdf0e10cSrcweir * Enters the job queue. 298cdf0e10cSrcweir * <p> 299cdf0e10cSrcweir * @return the result of the final job (reply) 300cdf0e10cSrcweir * @param disposeId a dispose id 301cdf0e10cSrcweir */ enter(Object disposeId)302cdf0e10cSrcweir Object enter(Object disposeId) throws Throwable { 303cdf0e10cSrcweir return enter(0, disposeId); // wait infinitly 304cdf0e10cSrcweir } 305cdf0e10cSrcweir 306cdf0e10cSrcweir /** 307cdf0e10cSrcweir * Enters the job queue. 308cdf0e10cSrcweir * <p> 309cdf0e10cSrcweir * @return the result of the final job (reply) 310cdf0e10cSrcweir * @param waitTime the maximum amount of time to wait for a job (0 means wait infinitly) 311cdf0e10cSrcweir * @param disposeId a dispose id 312cdf0e10cSrcweir */ enter(int waitTime, Object disposeId)313cdf0e10cSrcweir Object enter(int waitTime, Object disposeId) throws Throwable { 314cdf0e10cSrcweir if(DEBUG) System.err.println("#####" + getClass().getName() + ".enter: " + _threadId); 315cdf0e10cSrcweir 316cdf0e10cSrcweir boolean quit = false; 317cdf0e10cSrcweir 318cdf0e10cSrcweir Object hold_disposeId = _disposeId; 319cdf0e10cSrcweir _disposeId = disposeId; 320cdf0e10cSrcweir 321cdf0e10cSrcweir Object result = null; 322cdf0e10cSrcweir 323cdf0e10cSrcweir Thread hold_worker_thread = _worker_thread; 324cdf0e10cSrcweir _worker_thread = Thread.currentThread(); 325cdf0e10cSrcweir 326cdf0e10cSrcweir while(!quit) { 327cdf0e10cSrcweir Job job = null; 328cdf0e10cSrcweir 329cdf0e10cSrcweir try { 330cdf0e10cSrcweir job = removeJob(waitTime); 331cdf0e10cSrcweir 332cdf0e10cSrcweir if(job != null) { 333cdf0e10cSrcweir try { 334cdf0e10cSrcweir result = job.execute(); 335cdf0e10cSrcweir } 336cdf0e10cSrcweir finally { 337cdf0e10cSrcweir _active = false; 338cdf0e10cSrcweir } 339cdf0e10cSrcweir 340cdf0e10cSrcweir if (!job.isRequest()) { 341cdf0e10cSrcweir job.dispose(); 342cdf0e10cSrcweir 343cdf0e10cSrcweir quit = true; 344cdf0e10cSrcweir } 345cdf0e10cSrcweir 346cdf0e10cSrcweir job = null; 347cdf0e10cSrcweir } 348cdf0e10cSrcweir else 349cdf0e10cSrcweir quit = true; 350cdf0e10cSrcweir 351cdf0e10cSrcweir 352cdf0e10cSrcweir } 353cdf0e10cSrcweir finally { // ensure that this queue becomes disposed, if necessary 354cdf0e10cSrcweir if(DEBUG) System.err.println("##### " + getClass().getName() + ".enter leaving: " + _threadId + " " + _worker_thread + " " + hold_worker_thread + " " + result); 355cdf0e10cSrcweir 356cdf0e10cSrcweir synchronized(this) { 357cdf0e10cSrcweir if(job != null || (quit && _head == null)) { 358cdf0e10cSrcweir _worker_thread = hold_worker_thread; 359cdf0e10cSrcweir 360cdf0e10cSrcweir _createThread_now = true; 361cdf0e10cSrcweir 362cdf0e10cSrcweir _disposeId = hold_disposeId; 363cdf0e10cSrcweir 364cdf0e10cSrcweir if(_sync_jobQueue != null) 365cdf0e10cSrcweir notifyAll(); // notify waiters (e.g. this is an asyncQueue and there is a sync waiting) 366cdf0e10cSrcweir } 367cdf0e10cSrcweir else 368cdf0e10cSrcweir quit = false; 369cdf0e10cSrcweir 370cdf0e10cSrcweir } 371cdf0e10cSrcweir } 372cdf0e10cSrcweir } 373cdf0e10cSrcweir 374cdf0e10cSrcweir return result; 375cdf0e10cSrcweir } 376cdf0e10cSrcweir 377cdf0e10cSrcweir /** 378cdf0e10cSrcweir * If the given disposeId is registered, 379cdf0e10cSrcweir * interrups the worker thread. 380cdf0e10cSrcweir * <p> 381cdf0e10cSrcweir * @param disposeId the dispose id 382cdf0e10cSrcweir */ dispose(Object disposeId, Throwable throwable)383cdf0e10cSrcweir synchronized void dispose(Object disposeId, Throwable throwable) { 384cdf0e10cSrcweir if(_sync_jobQueue == null) { // dispose only sync queues 385cdf0e10cSrcweir _doDispose = disposeId; 386cdf0e10cSrcweir _throwable = throwable; 387cdf0e10cSrcweir 388cdf0e10cSrcweir // get thread out of wait and let it throw the throwable 389cdf0e10cSrcweir if(DEBUG) System.err.println(getClass().getName() + ".dispose - notifying thread"); 390cdf0e10cSrcweir 391cdf0e10cSrcweir notifyAll(); 392cdf0e10cSrcweir } 393cdf0e10cSrcweir } 394cdf0e10cSrcweir } 395cdf0e10cSrcweir 396