xref: /AOO41X/main/jurt/com/sun/star/lib/uno/environments/remote/JobQueue.java (revision 47148b3bc50811ceb41802e4cc50a5db21535900)
1 /**************************************************************
2  *
3  * Licensed to the Apache Software Foundation (ASF) under one
4  * or more contributor license agreements.  See the NOTICE file
5  * distributed with this work for additional information
6  * regarding copyright ownership.  The ASF licenses this file
7  * to you under the Apache License, Version 2.0 (the
8  * "License"); you may not use this file except in compliance
9  * with the License.  You may obtain a copy of the License at
10  *
11  *   http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing,
14  * software distributed under the License is distributed on an
15  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16  * KIND, either express or implied.  See the License for the
17  * specific language governing permissions and limitations
18  * under the License.
19  *
20  *************************************************************/
21 
22 
23 
24 package com.sun.star.lib.uno.environments.remote;
25 
26 import com.sun.star.lang.DisposedException;
27 
28 /**
29  * The <code>JobQueue</code> implements a queue for jobs.
30  * For every jobs thread id exists a job queue which is registered
31  * at the <code>ThreadPool</code>.
32  * A JobQueue is splitted in a sync job queue and an async job queue.
33  * The sync job queue is the registerd queue, it delegates async jobs
34  * (put by <code>putjob</code>) into the async queue, which is only
35  * known by the sync queue.
36  * <p>
37  * @version     $Revision: 1.19 $ $ $Date: 2008-04-11 11:21:18 $
38  * @author      Kay Ramme
39  * @see         com.sun.star.lib.uno.environments.remote.ThreadPool
40  * @see         com.sun.star.lib.uno.environments.remote.Job
41  * @see         com.sun.star.lib.uno.environments.remote.ThreadId
42  * @since       UDK1.0
43  */
44 public class JobQueue {
45     /**
46      * When set to true, enables various debugging output.
47      */
48     private static final boolean DEBUG = false;
49 
50     protected Job _head;                 // the head of the job list
51     protected Job _tail;                 // the tail of the job list
52 
53     protected ThreadId  _threadId;       // the thread id of the queue
54     protected int       _ref_count = 0;  // the stack deepness
55     protected boolean   _createThread;   // create a worker thread, if needed
56     protected boolean   _createThread_now;   // create a worker thread, if needed
57     protected Thread    _worker_thread;  // the thread that does the jobs
58 
59     protected Object    _disposeId; // the active dispose id
60     protected Object    _doDispose = null;
61     protected Throwable _throwable;
62 
63     protected JobQueue  _async_jobQueue; // chaining job qeueus for asyncs
64     protected JobQueue  _sync_jobQueue;  // chaining job qeueus for syncs
65 
66     protected boolean _active = false;
67 
68     protected JavaThreadPoolFactory _javaThreadPoolFactory;
69 
70     /**
71      * A thread for dispatching jobs
72      */
73     class JobDispatcher extends Thread {
74         Object _disposeId;
75 
JobDispatcher(Object disposeId)76         JobDispatcher(Object disposeId) {
77             if(DEBUG) System.err.println("JobQueue$JobDispatcher.<init>:" + _threadId);
78 
79             _disposeId = disposeId;
80         }
81 
getThreadId()82         ThreadId getThreadId() {
83             return _threadId;
84         }
85 
run()86         public void run() {
87             if(DEBUG) System.err.println("ThreadPool$JobDispatcher.run: " + Thread.currentThread());
88 
89             try {
90                 enter(2000, _disposeId);
91             }
92             catch(Throwable throwable) {
93                 if(_head != null || _active) { // there was a job in progress, so give a stack
94                     System.err.println(getClass().getName() + " - exception occurred:" + throwable);
95                     throwable.printStackTrace(System.err);
96                 }
97             }
98             finally {
99                 release();
100             }
101 
102             if(DEBUG) System.err.println("##### " + getClass().getName() + ".run - exit:" + _threadId);
103 
104 //              try {
105 //                  Object object = new Object();
106 //                  synchronized(object) {
107 //                      object.wait();
108 //                  }
109 //              }
110 //              catch(InterruptedException interruptedException) {
111 //              }
112         }
113     }
114 
115 
116     /**
117      * Constructs a async job queue with the given thread id
118      * which belongs to the given sync job queue.
119      * <p>
120      * @param threadId         the thread id
121      * @param sync_jobQueue    the sync queue this async queue belongs to
122      * @see                    com.sun.star.lib.uno.environments.remote.ThreadID
123      */
JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId)124     JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId) {
125         _javaThreadPoolFactory = javaThreadPoolFactory;
126         _threadId = ThreadId.createFresh();
127 
128         _sync_jobQueue    = javaThreadPoolFactory.getJobQueue(threadId);
129         if(_sync_jobQueue == null) {
130             _sync_jobQueue = new JobQueue(javaThreadPoolFactory, threadId, true);
131             _sync_jobQueue.acquire();
132         }
133 
134         _sync_jobQueue._async_jobQueue = this;
135 
136         _createThread     = true;
137         _createThread_now = true;
138 
139         acquire();
140 
141         if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" +  _threadId);
142     }
143 
144     /**
145      * Constructs a sync job queue with the given thread id and the given thread.
146      * <p>
147      * @param threadId        the thread id
148      * @param createThread    if true, the queue creates a worker thread if needed
149      * @see             com.sun.star.lib.uno.environments.remote.ThreadID
150      */
JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread)151     JobQueue(JavaThreadPoolFactory javaThreadPoolFactory, ThreadId threadId, boolean createThread){
152         _javaThreadPoolFactory   = javaThreadPoolFactory;
153         _threadId         = threadId;
154         _createThread     = createThread;
155         _createThread_now = createThread;
156 
157         if(DEBUG) System.err.println("##### " + getClass().getName() + " - init:" +  _threadId + " " + createThread);
158     }
159 
160     /**
161      * Gives the thread id of this queue
162      * <p>
163      * @return  the thread id
164      * @see     com.sun.star.lib.uno.environments.remote.ThreadID
165      */
getThreadId()166     ThreadId getThreadId() {
167         return _threadId;
168     }
169 
acquire()170     synchronized void acquire() {
171         // add only synchronous queues .
172         if(_ref_count <= 0 && _sync_jobQueue == null )
173             _javaThreadPoolFactory.addJobQueue(this);
174 
175         ++ _ref_count;
176     }
177 
release()178     synchronized void release() {
179         -- _ref_count;
180 
181         if(_ref_count <= 0) {
182             // only synchronous queues needs to be removed .
183             if( _sync_jobQueue == null )
184                 _javaThreadPoolFactory.removeJobQueue(this);
185 
186 
187             if(_sync_jobQueue != null) {
188                 _sync_jobQueue._async_jobQueue = null;
189                 _sync_jobQueue.release();
190             }
191         }
192     }
193 
194     /**
195      * Removes a job from the queue.
196      * <p>
197      * @return a job or null if timed out
198      * @param  waitTime        the maximum amount of time to wait for a job
199      */
removeJob(int waitTime)200     private Job removeJob(int waitTime) {
201         if(DEBUG) System.err.println("##### " + getClass().getName() + ".removeJob:" + _head + " " + _threadId);
202 
203         Job job = null;
204         synchronized (this) {
205             // wait max. waitTime time for a job to enter the queue
206             boolean waited = false;
207             while(_head == null && (waitTime == 0 || !waited)) {
208                 if(_doDispose == _disposeId) {
209                     _doDispose = null;
210                     throw (DisposedException)
211                         new DisposedException().initCause(_throwable);
212                 }
213 
214                 // notify sync queues
215                 notifyAll();
216 
217                 try {
218                     // wait for new job
219                     wait(waitTime);
220                 }
221                 catch(InterruptedException interruptedException) {
222                     throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException);
223                 }
224 
225                 // signal that we have already waited once
226                 waited = true;
227             }
228 
229 
230             if(_head != null) {
231                 Job current = _head;
232                 _head    = _head._next;
233 
234                 if(_head == null)
235                     _tail = null;
236 
237                 job = current;
238                 _active = true;
239             }
240         }
241 
242         // always wait for asynchron jobqueue to be finished !
243         if(job != null && _async_jobQueue != null) {
244             synchronized(_async_jobQueue) {
245                 // wait for async queue to be empty and last job to be done
246                 while(_async_jobQueue._active || _async_jobQueue._head != null) {
247                     if(DEBUG) System.err.println("waiting for async:" + _async_jobQueue._head + " " +  _async_jobQueue._worker_thread);
248 
249                     if(_doDispose == _disposeId) {
250                         _doDispose = null;
251                         throw (DisposedException)
252                             new DisposedException().initCause(_throwable);
253                     }
254 
255                     try {
256                         _async_jobQueue.wait();
257                     }
258                     catch(InterruptedException interruptedException) {
259                         throw new com.sun.star.uno.RuntimeException(getClass().getName() + ".removeJob - unexpected:" + interruptedException);
260                     }
261                 }
262             }
263         }
264 
265         return job;
266     }
267 
268     /**
269      * Puts a job into the queue.
270      * <p>
271      * @param  job        the job
272      * @param  disposeId  a dispose id
273      */
putJob(Job job, Object disposeId)274     synchronized void putJob(Job job, Object disposeId) {
275         if(DEBUG) System.err.println("##### " + getClass().getName() + ".putJob todoes: " + " job:" + job);
276 
277         if(_tail != null)
278             _tail._next = job;
279         else
280             _head = job;
281 
282         _tail = job;
283 
284         if(_worker_thread == null && _createThread && _createThread_now) { // if there is no thread, which dispatches and if shall create one, create one
285 
286             acquire();
287 
288             _createThread_now = false;
289             new JobDispatcher(disposeId).start();
290         }
291 
292         // always notify possible waiters
293         notifyAll();
294     }
295 
296     /**
297      * Enters the job queue.
298      * <p>
299      * @return the result of the final job (reply)
300      * @param  disposeId  a dispose id
301      */
enter(Object disposeId)302     Object enter(Object disposeId) throws Throwable {
303         return enter(0, disposeId); // wait infinitly
304     }
305 
306     /**
307      * Enters the job queue.
308      * <p>
309      * @return the result of the final job (reply)
310      * @param  waitTime   the maximum amount of time to wait for a job (0 means wait infinitly)
311      * @param  disposeId  a dispose id
312      */
enter(int waitTime, Object disposeId)313     Object enter(int waitTime, Object disposeId) throws Throwable {
314         if(DEBUG) System.err.println("#####" + getClass().getName() + ".enter: " + _threadId);
315 
316         boolean quit = false;
317 
318         Object hold_disposeId = _disposeId;
319         _disposeId = disposeId;
320 
321         Object result = null;
322 
323         Thread hold_worker_thread = _worker_thread;
324         _worker_thread = Thread.currentThread();
325 
326         while(!quit) {
327             Job job = null;
328 
329             try {
330                 job = removeJob(waitTime);
331 
332                 if(job != null) {
333                     try {
334                         result = job.execute();
335                     }
336                     finally {
337                         _active = false;
338                     }
339 
340                     if (!job.isRequest()) {
341                         job.dispose();
342 
343                         quit = true;
344                     }
345 
346                     job = null;
347                 }
348                 else
349                     quit = true;
350 
351 
352             }
353             finally { // ensure that this queue becomes disposed, if necessary
354                 if(DEBUG) System.err.println("##### " + getClass().getName() + ".enter leaving: " + _threadId + " " + _worker_thread + " " + hold_worker_thread + " " + result);
355 
356                 synchronized(this) {
357                     if(job != null || (quit && _head == null)) {
358                         _worker_thread = hold_worker_thread;
359 
360                         _createThread_now = true;
361 
362                         _disposeId = hold_disposeId;
363 
364                         if(_sync_jobQueue != null)
365                             notifyAll(); // notify waiters (e.g. this is an asyncQueue and there is a sync waiting)
366                     }
367                     else
368                         quit = false;
369 
370                 }
371             }
372         }
373 
374         return result;
375     }
376 
377     /**
378      * If the given disposeId is registered,
379      * interrups the worker thread.
380      * <p>
381      * @param disposeId    the dispose id
382      */
dispose(Object disposeId, Throwable throwable)383     synchronized void dispose(Object disposeId, Throwable throwable) {
384         if(_sync_jobQueue == null) { // dispose only sync queues
385             _doDispose = disposeId;
386             _throwable = throwable;
387 
388             // get thread out of wait and let it throw the throwable
389             if(DEBUG) System.err.println(getClass().getName() + ".dispose - notifying thread");
390 
391             notifyAll();
392         }
393     }
394 }
395 
396