1 /************************************************************************* 2 * 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * Copyright 2000, 2010 Oracle and/or its affiliates. 6 * 7 * OpenOffice.org - a multi-platform office productivity suite 8 * 9 * This file is part of OpenOffice.org. 10 * 11 * OpenOffice.org is free software: you can redistribute it and/or modify 12 * it under the terms of the GNU Lesser General Public License version 3 13 * only, as published by the Free Software Foundation. 14 * 15 * OpenOffice.org is distributed in the hope that it will be useful, 16 * but WITHOUT ANY WARRANTY; without even the implied warranty of 17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 * GNU Lesser General Public License version 3 for more details 19 * (a copy is included in the LICENSE file that accompanied this code). 20 * 21 * You should have received a copy of the GNU Lesser General Public License 22 * version 3 along with OpenOffice.org. If not, see 23 * <http://www.openoffice.org/license.html> 24 * for a copy of the LGPLv3 License. 25 * 26 ************************************************************************/ 27 28 package com.sun.star.lib.uno.environments.remote; 29 30 import com.sun.star.lang.DisposedException; 31 32 /** 33 * The <code>JobQueue</code> implements a queue for jobs. 34 * For every jobs thread id exists a job queue which is registered 35 * at the <code>ThreadPool</code>. 36 * A JobQueue is splitted in a sync job queue and an async job queue. 37 * The sync job queue is the registerd queue, it delegates async jobs 38 * (put by <code>putjob</code>) into the async queue, which is only 39 * known by the sync queue. 40 * <p> 41 * @version $Revision: 1.19 $ $ $Date: 2008-04-11 11:21:18 $ 42 * @author Kay Ramme 43 * @see com.sun.star.lib.uno.environments.remote.ThreadPool 44 * @see com.sun.star.lib.uno.environments.remote.Job 45 * @see com.sun.star.lib.uno.environments.remote.ThreadID 46 * @since UDK1.0 47 */ 48 public class JobQueue { 49 /** 50 * When set to true, enables various debugging output. 51 */ 52 private static final boolean DEBUG = false; 53 54 protected Job _head; // the head of the job list 55 protected Job _tail; // the tail of the job list 56 57 protected ThreadId _threadId; // the thread id of the queue 58 protected int _ref_count = 0; // the stack deepness 59 protected boolean _createThread; // create a worker thread, if needed 60 protected boolean _createThread_now; // create a worker thread, if needed 61 protected Thread _worker_thread; // the thread that does the jobs 62 63 protected Object _disposeId; // the active dispose id 64 protected Object _doDispose = null; 65 protected Throwable _throwable; 66 67 protected JobQueue _async_jobQueue; // chaining job qeueus for asyncs 68 protected JobQueue _sync_jobQueue; // chaining job qeueus for syncs 69 70 protected boolean _active = false; 71 72 protected JavaThreadPoolFactory _javaThreadPoolFactory; 73 74 /** 75 * A thread for dispatching jobs 76 */ 77 class JobDispatcher extends Thread { 78 Object _disposeId; 79 80 JobDispatcher(Object disposeId) { 81 if(DEBUG) System.err.println("JobQueue$JobDispatcher.<init>:" + _threadId); 82 83 _disposeId = disposeId; 84 } 85 86 ThreadId getThreadId() { 87 return _threadId; 88 } 89 90 public void run() { 91 if(DEBUG) System.err.println("ThreadPool$JobDispatcher.run: " + Thread.currentThread()); 92 93 try { 94 enter(2000, _disposeId); 95 } 96 catch(Throwable throwable) { 97 if(_head != null || _active) { // there was a job in progress, so give a stack 98 System.err.println(getClass().getName() + " - exception occurred:" + throwable); 99 throwable.printStackTrace(System.err); 100 } 101 } 102 finally { 103 release(); 104 } 105 106 if(DEBUG) System.err.println("##### " + getClass().getName() + ".run - exit:" + _threadId); 107 108 // try { 109 // Object object = new Object(); 110 // synchronized(object) { 111 // object.wait(); 112 // } 113 // } 114 // catch(InterruptedException interruptedException) { 115 // } 116 } 117 } 118 119 120 /** 121 * Constructs a async job queue with the given thread id 122 * which belongs to the given sync job queue. 123 * <p> 124 * @param threadId the thread id 125 * @param sync_jobQueue the sync queue this async queue belongs to 126 * @see com.sun.star.lib.uno.environments.remote.ThreadID 127 */ 128 JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId) { 129 _javaThreadPoolFactory = javaThreadPoolFactory; 130 _threadId = ThreadId.createFresh(); 131 132 _sync_jobQueue = javaThreadPoolFactory.getJobQueue(threadId); 133 if(_sync_jobQueue == null) { 134 _sync_jobQueue = new JobQueue(javaThreadPoolFactory, threadId, true); 135 _sync_jobQueue.acquire(); 136 } 137 138 _sync_jobQueue._async_jobQueue = this; 139 140 _createThread = true; 141 _createThread_now = true; 142 143 acquire(); 144 145 if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" + _threadId); 146 } 147 148 /** 149 * Constructs a sync job queue with the given thread id and the given thread. 150 * <p> 151 * @param threadId the thread id 152 * @param createThread if true, the queue creates a worker thread if needed 153 * @see com.sun.star.lib.uno.environments.remote.ThreadID 154 */ 155 JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread){ 156 _javaThreadPoolFactory = javaThreadPoolFactory; 157 _threadId = threadId; 158 _createThread = createThread; 159 _createThread_now = createThread; 160 161 if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" + _threadId + " " + createThread); 162 } 163 164 /** 165 * Gives the thread id of this queue 166 * <p> 167 * @return the thread id 168 * @see com.sun.star.lib.uno.environments.remote.ThreadID 169 */ 170 ThreadId getThreadId() { 171 return _threadId; 172 } 173 174 synchronized void acquire() { 175 // add only synchronous queues . 176 if(_ref_count <= 0 && _sync_jobQueue == null ) 177 _javaThreadPoolFactory.addJobQueue(this); 178 179 ++ _ref_count; 180 } 181 182 synchronized void release() { 183 -- _ref_count; 184 185 if(_ref_count <= 0) { 186 // only synchronous queues needs to be removed . 187 if( _sync_jobQueue == null ) 188 _javaThreadPoolFactory.removeJobQueue(this); 189 190 191 if(_sync_jobQueue != null) { 192 _sync_jobQueue._async_jobQueue = null; 193 _sync_jobQueue.release(); 194 } 195 } 196 } 197 198 /** 199 * Removes a job from the queue. 200 * <p> 201 * @return a job or null if timed out 202 * @param waitTime the maximum amount of time to wait for a job 203 */ 204 private Job removeJob(int waitTime) { 205 if(DEBUG) System.err.println("##### " + getClass().getName() + ".removeJob:" + _head + " " + _threadId); 206 207 Job job = null; 208 synchronized (this) { 209 // wait max. waitTime time for a job to enter the queue 210 boolean waited = false; 211 while(_head == null && (waitTime == 0 || !waited)) { 212 if(_doDispose == _disposeId) { 213 _doDispose = null; 214 throw (DisposedException) 215 new DisposedException().initCause(_throwable); 216 } 217 218 // notify sync queues 219 notifyAll(); 220 221 try { 222 // wait for new job 223 wait(waitTime); 224 } 225 catch(InterruptedException interruptedException) { 226 throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException); 227 } 228 229 // signal that we have already waited once 230 waited = true; 231 } 232 233 234 if(_head != null) { 235 Job current = _head; 236 _head = _head._next; 237 238 if(_head == null) 239 _tail = null; 240 241 job = current; 242 _active = true; 243 } 244 } 245 246 // always wait for asynchron jobqueue to be finished ! 247 if(job != null && _async_jobQueue != null) { 248 synchronized(_async_jobQueue) { 249 // wait for async queue to be empty and last job to be done 250 while(_async_jobQueue._active || _async_jobQueue._head != null) { 251 if(DEBUG) System.err.println("waiting for async:" + _async_jobQueue._head + " " + _async_jobQueue._worker_thread); 252 253 if(_doDispose == _disposeId) { 254 _doDispose = null; 255 throw (DisposedException) 256 new DisposedException().initCause(_throwable); 257 } 258 259 try { 260 _async_jobQueue.wait(); 261 } 262 catch(InterruptedException interruptedException) { 263 throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException); 264 } 265 } 266 } 267 } 268 269 return job; 270 } 271 272 /** 273 * Puts a job into the queue. 274 * <p> 275 * @param job the job 276 * @param disposeId a dispose id 277 */ 278 synchronized void putJob(Job job, Object disposeId) { 279 if(DEBUG) System.err.println("##### " + getClass().getName() + ".putJob todoes: " + " job:" + job); 280 281 if(_tail != null) 282 _tail._next = job; 283 else 284 _head = job; 285 286 _tail = job; 287 288 if(_worker_thread == null && _createThread && _createThread_now) { // if there is no thread, which dispatches and if shall create one, create one 289 290 acquire(); 291 292 _createThread_now = false; 293 new JobDispatcher(disposeId).start(); 294 } 295 296 // always notify possible waiters 297 notifyAll(); 298 } 299 300 /** 301 * Enters the job queue. 302 * <p> 303 * @return the result of the final job (reply) 304 * @param disposeId a dispose id 305 */ 306 Object enter(Object disposeId) throws Throwable { 307 return enter(0, disposeId); // wait infinitly 308 } 309 310 /** 311 * Enters the job queue. 312 * <p> 313 * @return the result of the final job (reply) 314 * @param waitTime the maximum amount of time to wait for a job (0 means wait infinitly) 315 * @param disposeId a dispose id 316 */ 317 Object enter(int waitTime, Object disposeId) throws Throwable { 318 if(DEBUG) System.err.println("#####" + getClass().getName() + ".enter: " + _threadId); 319 320 boolean quit = false; 321 322 Object hold_disposeId = _disposeId; 323 _disposeId = disposeId; 324 325 Object result = null; 326 327 Thread hold_worker_thread = _worker_thread; 328 _worker_thread = Thread.currentThread(); 329 330 while(!quit) { 331 Job job = null; 332 333 try { 334 job = removeJob(waitTime); 335 336 if(job != null) { 337 try { 338 result = job.execute(); 339 } 340 finally { 341 _active = false; 342 } 343 344 if (!job.isRequest()) { 345 job.dispose(); 346 347 quit = true; 348 } 349 350 job = null; 351 } 352 else 353 quit = true; 354 355 356 } 357 finally { // ensure that this queue becomes disposed, if necessary 358 if(DEBUG) System.err.println("##### " + getClass().getName() + ".enter leaving: " + _threadId + " " + _worker_thread + " " + hold_worker_thread + " " + result); 359 360 synchronized(this) { 361 if(job != null || (quit && _head == null)) { 362 _worker_thread = hold_worker_thread; 363 364 _createThread_now = true; 365 366 _disposeId = hold_disposeId; 367 368 if(_sync_jobQueue != null) 369 notifyAll(); // notify waiters (e.g. this is an asyncQueue and there is a sync waiting) 370 } 371 else 372 quit = false; 373 374 } 375 } 376 } 377 378 return result; 379 } 380 381 /** 382 * If the given disposeId is registered, 383 * interrups the worker thread. 384 * <p> 385 * @param disposeId the dispose id 386 */ 387 synchronized void dispose(Object disposeId, Throwable throwable) { 388 if(_sync_jobQueue == null) { // dispose only sync queues 389 _doDispose = disposeId; 390 _throwable = throwable; 391 392 // get thread out of wait and let it throw the throwable 393 if(DEBUG) System.err.println(getClass().getName() + ".dispose - notifying thread"); 394 395 notifyAll(); 396 } 397 } 398 } 399 400