xref: /AOO41X/main/jurt/com/sun/star/lib/uno/environments/remote/JobQueue.java (revision 1ecadb572e7010ff3b3382ad9bf179dbc6efadbb)
1 /*************************************************************************
2  *
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * Copyright 2000, 2010 Oracle and/or its affiliates.
6  *
7  * OpenOffice.org - a multi-platform office productivity suite
8  *
9  * This file is part of OpenOffice.org.
10  *
11  * OpenOffice.org is free software: you can redistribute it and/or modify
12  * it under the terms of the GNU Lesser General Public License version 3
13  * only, as published by the Free Software Foundation.
14  *
15  * OpenOffice.org is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  * GNU Lesser General Public License version 3 for more details
19  * (a copy is included in the LICENSE file that accompanied this code).
20  *
21  * You should have received a copy of the GNU Lesser General Public License
22  * version 3 along with OpenOffice.org.  If not, see
23  * <http://www.openoffice.org/license.html>
24  * for a copy of the LGPLv3 License.
25  *
26  ************************************************************************/
27 
28 package com.sun.star.lib.uno.environments.remote;
29 
30 import com.sun.star.lang.DisposedException;
31 
32 /**
33  * The <code>JobQueue</code> implements a queue for jobs.
34  * For every job's thread id there exists a job queue which is registered
35  * at the <code>ThreadPool</code>.
36  * A JobQueue is split into a sync job queue and an async job queue.
37  * The sync job queue is the registered queue; it delegates async jobs
38  * (put by <code>putJob</code>) into the async queue, which is only
39  * known by the sync queue.
40  * <p>
41  * @version 	$Revision: 1.19 $ $ $Date: 2008-04-11 11:21:18 $
42  * @author 	    Kay Ramme
43  * @see         com.sun.star.lib.uno.environments.remote.ThreadPool
44  * @see         com.sun.star.lib.uno.environments.remote.Job
45  * @see         com.sun.star.lib.uno.environments.remote.ThreadId
46  * @since       UDK1.0
47  */
48 public class JobQueue {
49 	/**
50 	 * When set to true, enables various debugging output.
51 	 */
52 	private static final boolean DEBUG = false;
53 
54 	protected Job _head;                 // the head of the job list
55 	protected Job _tail;                 // the tail of the job list
56 
57 	protected ThreadId  _threadId;       // the thread id of the queue
58 	protected int       _ref_count = 0;  // the stack deepness
59 	protected boolean   _createThread;   // create a worker thread, if needed
60 	protected boolean   _createThread_now;   // create a worker thread, if needed
61 	protected Thread    _worker_thread;  // the thread that does the jobs
62 
63 	protected Object    _disposeId; // the active dispose id
64 	protected Object    _doDispose = null;
65 	protected Throwable _throwable;
66 
67 	protected JobQueue  _async_jobQueue; // chaining job qeueus for asyncs
68 	protected JobQueue  _sync_jobQueue;  // chaining job qeueus for syncs
69 
70 	protected boolean _active = false;
71 
72 	protected JavaThreadPoolFactory _javaThreadPoolFactory;
73 
74 	/**
75 	 * A thread for dispatching jobs
76 	 */
77 	class JobDispatcher extends Thread {
78 		Object _disposeId;
79 
80 		JobDispatcher(Object disposeId) {
81 			if(DEBUG) System.err.println("JobQueue$JobDispatcher.<init>:" + _threadId);
82 
83 			_disposeId = disposeId;
84 		}
85 
86 		ThreadId getThreadId() {
87 			return _threadId;
88 		}
89 
90 		public void run() {
91 			if(DEBUG) System.err.println("ThreadPool$JobDispatcher.run: " + Thread.currentThread());
92 
93 			try {
94   				enter(2000, _disposeId);
95 			}
96 			catch(Throwable throwable) {
97 				if(_head != null || _active) { // there was a job in progress, so give a stack
98 					System.err.println(getClass().getName() + " - exception occurred:" + throwable);
99 					throwable.printStackTrace(System.err);
100 				}
101 			}
102 			finally {
103 				release();
104 			}
105 
106 			if(DEBUG) System.err.println("##### " + getClass().getName() + ".run - exit:" + _threadId);
107 
108 //  			try {
109 //  				Object object = new Object();
110 //  				synchronized(object) {
111 //  					object.wait();
112 //  				}
113 //  			}
114 //  			catch(InterruptedException interruptedException) {
115 //  			}
116 		}
117 	}
118 
119 
120 	/**
121 	 * Constructs a async job queue with the given thread id
122 	 * which belongs to the given sync job queue.
123 	 * <p>
124 	 * @param threadId         the thread id
125 	 * @param sync_jobQueue    the sync queue this async queue belongs to
126 	 * @see                    com.sun.star.lib.uno.environments.remote.ThreadID
127 	 */
128 	JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId) {
129 		_javaThreadPoolFactory = javaThreadPoolFactory;
130         _threadId = ThreadId.createFresh();
131 
132 		_sync_jobQueue    = javaThreadPoolFactory.getJobQueue(threadId);
133 		if(_sync_jobQueue == null) {
134 			_sync_jobQueue = new JobQueue(javaThreadPoolFactory, threadId, true);
135 			_sync_jobQueue.acquire();
136 		}
137 
138 		_sync_jobQueue._async_jobQueue = this;
139 
140 		_createThread     = true;
141 		_createThread_now = true;
142 
143 		acquire();
144 
145 		if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" +  _threadId);
146 	}
147 
148 	/**
149 	 * Constructs a sync job queue with the given thread id and the given thread.
150 	 * <p>
151 	 * @param threadId        the thread id
152 	 * @param createThread    if true, the queue creates a worker thread if needed
153 	 * @see             com.sun.star.lib.uno.environments.remote.ThreadID
154 	 */
155 	JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread){
156 		_javaThreadPoolFactory   = javaThreadPoolFactory;
157 		_threadId         = threadId;
158 		_createThread     = createThread;
159 		_createThread_now = createThread;
160 
161 		if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" +  _threadId + " " + createThread);
162 	}
163 
164 	/**
165 	 * Gives the thread id of this queue
166 	 * <p>
167 	 * @return  the thread id
168 	 * @see     com.sun.star.lib.uno.environments.remote.ThreadID
169 	 */
170 	ThreadId getThreadId() {
171 		return _threadId;
172 	}
173 
174 	synchronized void acquire() {
175         // add only synchronous queues .
176 		if(_ref_count <= 0 && _sync_jobQueue == null )
177 			_javaThreadPoolFactory.addJobQueue(this);
178 
179 		++ _ref_count;
180 	}
181 
182 	synchronized void release() {
183 		-- _ref_count;
184 
185 		if(_ref_count <= 0) {
186             // only synchronous queues needs to be removed .
187             if( _sync_jobQueue == null )
188                 _javaThreadPoolFactory.removeJobQueue(this);
189 
190 
191 			if(_sync_jobQueue != null) {
192 				_sync_jobQueue._async_jobQueue = null;
193 				_sync_jobQueue.release();
194 			}
195 		}
196 	}
197 
198 	/**
199 	 * Removes a job from the queue.
200 	 * <p>
201 	 * @return a job or null if timed out
202 	 * @param  waitTime        the maximum amount of time to wait for a job
203 	 */
204 	private Job removeJob(int waitTime) {
205 		if(DEBUG) System.err.println("##### " + getClass().getName() + ".removeJob:" + _head + " " + _threadId);
206 
207 		Job job = null;
208 		synchronized (this) {
209 			// wait max. waitTime time for a job to enter the queue
210 			boolean waited = false;
211 			while(_head == null && (waitTime == 0 || !waited)) {
212 				if(_doDispose == _disposeId) {
213 					_doDispose = null;
214 					throw (DisposedException)
215                         new DisposedException().initCause(_throwable);
216 				}
217 
218 				// notify sync queues
219 				notifyAll();
220 
221 				try {
222 					// wait for new job
223 					wait(waitTime);
224 				}
225 				catch(InterruptedException interruptedException) {
226   					throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException);
227 				}
228 
229 				// signal that we have already waited once
230 				waited = true;
231 			}
232 
233 
234 			if(_head != null) {
235 				Job current = _head;
236 				_head    = _head._next;
237 
238 				if(_head == null)
239 					_tail = null;
240 
241 				job = current;
242 				_active = true;
243 			}
244 		}
245 
246 		// always wait for asynchron jobqueue to be finished !
247 		if(job != null && _async_jobQueue != null) {
248 			synchronized(_async_jobQueue) {
249 				// wait for async queue to be empty and last job to be done
250 				while(_async_jobQueue._active || _async_jobQueue._head != null) {
251 					if(DEBUG) System.err.println("waiting for async:" + _async_jobQueue._head + " " +  _async_jobQueue._worker_thread);
252 
253 					if(_doDispose == _disposeId) {
254 						_doDispose = null;
255 						throw (DisposedException)
256                             new DisposedException().initCause(_throwable);
257 					}
258 
259 					try {
260 						_async_jobQueue.wait();
261 					}
262 					catch(InterruptedException interruptedException) {
263 						throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException);
264 					}
265 				}
266 			}
267 		}
268 
269 		return job;
270 	}
271 
272 	/**
273 	 * Puts a job into the queue.
274 	 * <p>
275 	 * @param  job        the job
276 	 * @param  disposeId  a dispose id
277 	 */
278 	synchronized void putJob(Job job, Object disposeId) {
279 		if(DEBUG) System.err.println("##### " + getClass().getName() + ".putJob todoes: " + " job:" + job);
280 
281 		if(_tail != null)
282 			_tail._next = job;
283 		else
284 			_head = job;
285 
286 		_tail = job;
287 
288 		if(_worker_thread == null && _createThread && _createThread_now) { // if there is no thread, which dispatches and if shall create one, create one
289 
290 			acquire();
291 
292 			_createThread_now = false;
293 			new JobDispatcher(disposeId).start();
294 		}
295 
296 		// always notify possible waiters
297 		notifyAll();
298 	}
299 
300 	/**
301 	 * Enters the job queue.
302 	 * <p>
303 	 * @return the result of the final job (reply)
304 	 * @param  disposeId  a dispose id
305 	 */
306 	Object enter(Object disposeId) throws Throwable {
307 		return enter(0, disposeId); // wait infinitly
308 	}
309 
310 	/**
311 	 * Enters the job queue.
312 	 * <p>
313 	 * @return the result of the final job (reply)
314 	 * @param  waitTime   the maximum amount of time to wait for a job (0 means wait infinitly)
315 	 * @param  disposeId  a dispose id
316 	 */
317 	Object enter(int waitTime, Object disposeId) throws Throwable {
318 		if(DEBUG) System.err.println("#####" + getClass().getName() + ".enter: " + _threadId);
319 
320 		boolean quit = false;
321 
322 		Object hold_disposeId = _disposeId;
323 		_disposeId = disposeId;
324 
325 		Object result = null;
326 
327 		Thread hold_worker_thread = _worker_thread;
328 		_worker_thread = Thread.currentThread();
329 
330 		while(!quit) {
331 			Job job = null;
332 
333 			try {
334 				job = removeJob(waitTime);
335 
336 				if(job != null) {
337 					try {
338 						result = job.execute();
339 					}
340 					finally {
341 						_active = false;
342 					}
343 
344 					if (!job.isRequest()) {
345 						job.dispose();
346 
347 						quit = true;
348 					}
349 
350 					job = null;
351 				}
352 				else
353 					quit = true;
354 
355 
356 			}
357 			finally { // ensure that this queue becomes disposed, if necessary
358 				if(DEBUG) System.err.println("##### " + getClass().getName() + ".enter leaving: " + _threadId + " " + _worker_thread + " " + hold_worker_thread + " " + result);
359 
360 				synchronized(this) {
361 					if(job != null || (quit && _head == null)) {
362 						_worker_thread = hold_worker_thread;
363 
364 						_createThread_now = true;
365 
366 						_disposeId = hold_disposeId;
367 
368 						if(_sync_jobQueue != null)
369 							notifyAll(); // notify waiters (e.g. this is an asyncQueue and there is a sync waiting)
370 					}
371 					else
372 						quit = false;
373 
374 				}
375 			}
376 		}
377 
378 		return result;
379 	}
380 
381 	/**
382 	 * If the given disposeId is registered,
383 	 * interrups the worker thread.
384 	 * <p>
385 	 * @param disposeId    the dispose id
386 	 */
387 	synchronized void dispose(Object disposeId, Throwable throwable) {
388 		if(_sync_jobQueue == null) { // dispose only sync queues
389 			_doDispose = disposeId;
390 			_throwable = throwable;
391 
392 			// get thread out of wait and let it throw the throwable
393 			if(DEBUG) System.err.println(getClass().getName() + ".dispose - notifying thread");
394 
395 			notifyAll();
396 		}
397 	}
398 }
399 
400