bug fixes and rocket optimization
[debian/openrocket] / src / net / sf / openrocket / optimization / general / ParallelExecutorCache.java
1 package net.sf.openrocket.optimization.general;
2
3 import java.util.ArrayList;
4 import java.util.Collection;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.Map;
8 import java.util.concurrent.Callable;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.LinkedBlockingQueue;
13 import java.util.concurrent.ThreadFactory;
14 import java.util.concurrent.ThreadPoolExecutor;
15 import java.util.concurrent.TimeUnit;
16
17 import net.sf.openrocket.util.BugException;
18
19 /**
20  * An implementation of a ParallelFunctionCache that evaluates function values
21  * in parallel and caches them.  This allows pre-calculating possibly required
22  * function values beforehand.  If values are not required after all, the
23  * computation can be aborted assuming the function evaluation supports it.
24  * <p>
25  * Note that while this class handles threads and abstracts background execution,
26  * the public methods themselves are NOT thread-safe and should be called from
27  * only one thread at a time.
28  * 
29  * @author Sampo Niskanen <sampo.niskanen@iki.fi>
30  */
31 public class ParallelExecutorCache implements ParallelFunctionCache {
32         
33         private final Map<Point, Double> functionCache = new HashMap<Point, Double>();
34         private final Map<Point, Future<Double>> futureMap = new HashMap<Point, Future<Double>>();
35         
36         private ExecutorService executor;
37         
38         private Function function;
39         
40         
41         /**
42          * Construct a cache that uses the same number of computational threads as there are
43          * processors available.
44          */
45         public ParallelExecutorCache() {
46                 this(Runtime.getRuntime().availableProcessors());
47         }
48         
49         /**
50          * Construct a cache that uses the specified number of computational threads for background
51          * computation.  The threads that are created are marked as daemon threads.
52          * 
53          * @param threadCount   the number of threads to use in the executor.
54          */
55         public ParallelExecutorCache(int threadCount) {
56                 this(new ThreadPoolExecutor(threadCount, threadCount, 60, TimeUnit.SECONDS,
57                                 new LinkedBlockingQueue<Runnable>(),
58                                 new ThreadFactory() {
59                                         @Override
60                                         public Thread newThread(Runnable r) {
61                                                 Thread t = new Thread(r);
62                                                 t.setDaemon(true);
63                                                 return t;
64                                         }
65                                 }));
66         }
67         
68         /**
69          * Construct a cache that uses the specified ExecutorService for managing
70          * computational threads.
71          * 
72          * @param executor      the executor to use for function evaluations.
73          */
74         public ParallelExecutorCache(ExecutorService executor) {
75                 this.executor = executor;
76         }
77         
78         
79
80         @Override
81         public void compute(Collection<Point> points) {
82                 for (Point p : points) {
83                         compute(p);
84                 }
85         }
86         
87         
88         @Override
89         public void compute(Point point) {
90                 
91                 if (isOutsideRange(point)) {
92                         // Point is outside of range
93                         return;
94                 }
95                 
96                 if (functionCache.containsKey(point)) {
97                         // Function has already been evaluated at the point
98                         return;
99                 }
100                 
101                 if (futureMap.containsKey(point)) {
102                         // Function is being evaluated at the point
103                         return;
104                 }
105                 
106                 // Submit point for evaluation
107                 FunctionCallable callable = new FunctionCallable(function, point);
108                 Future<Double> future = executor.submit(callable);
109                 futureMap.put(point, future);
110         }
111         
112         
113         @Override
114         public void waitFor(Collection<Point> points) throws InterruptedException, OptimizationException {
115                 for (Point p : points) {
116                         waitFor(p);
117                 }
118         }
119         
120         
121         @Override
122         public void waitFor(Point point) throws InterruptedException, OptimizationException {
123                 if (isOutsideRange(point)) {
124                         return;
125                 }
126                 
127                 if (functionCache.containsKey(point)) {
128                         return;
129                 }
130                 
131                 Future<Double> future = futureMap.get(point);
132                 if (future == null) {
133                         throw new IllegalStateException("waitFor called for " + point + " but it is not being computed");
134                 }
135                 
136                 try {
137                         double value = future.get();
138                         functionCache.put(point, value);
139                 } catch (ExecutionException e) {
140                         Throwable cause = e.getCause();
141                         if (cause instanceof InterruptedException) {
142                                 throw (InterruptedException) cause;
143                         }
144                         if (cause instanceof OptimizationException) {
145                                 throw (OptimizationException) cause;
146                         }
147                         if (cause instanceof RuntimeException) {
148                                 throw (RuntimeException) cause;
149                         }
150                         
151                         throw new BugException("Function threw unknown exception while processing", e);
152                 }
153         }
154         
155         
156
157         @Override
158         public List<Point> abort(Collection<Point> points) {
159                 List<Point> computed = new ArrayList<Point>(Math.min(points.size(), 10));
160                 
161                 for (Point p : points) {
162                         if (abort(p)) {
163                                 computed.add(p);
164                         }
165                 }
166                 
167                 return computed;
168         }
169         
170         
171
172         @Override
173         public boolean abort(Point point) {
174                 if (isOutsideRange(point)) {
175                         return false;
176                 }
177                 
178                 if (functionCache.containsKey(point)) {
179                         return true;
180                 }
181                 
182                 Future<Double> future = futureMap.remove(point);
183                 if (future == null) {
184                         throw new IllegalStateException("abort called for " + point + " but it is not being computed");
185                 }
186                 
187                 if (future.isDone()) {
188                         // Evaluation has been completed, store value in cache
189                         try {
190                                 double value = future.get();
191                                 functionCache.put(point, value);
192                                 return true;
193                         } catch (Exception e) {
194                                 return false;
195                         }
196                 } else {
197                         // Cancel the evaluation
198                         future.cancel(true);
199                         return false;
200                 }
201         }
202         
203         
204         @Override
205         public double getValue(Point point) {
206                 if (isOutsideRange(point)) {
207                         return Double.MAX_VALUE;
208                 }
209                 
210                 Double d = functionCache.get(point);
211                 if (d == null) {
212                         throw new IllegalStateException(point + " is not in function cache.  " +
213                                         "functionCache=" + functionCache + "  futureMap=" + futureMap);
214                 }
215                 return d;
216         }
217         
218         
219         @Override
220         public Function getFunction() {
221                 return function;
222         }
223         
224         @Override
225         public void setFunction(Function function) {
226                 this.function = function;
227                 clearCache();
228         }
229         
230         @Override
231         public void clearCache() {
232                 List<Point> list = new ArrayList<Point>(futureMap.keySet());
233                 abort(list);
234                 functionCache.clear();
235         }
236         
237         
238         public ExecutorService getExecutor() {
239                 return executor;
240         }
241         
242         
243         /**
244          * Check whether a point is outside of the valid optimization range.
245          */
246         private boolean isOutsideRange(Point p) {
247                 int n = p.dim();
248                 for (int i = 0; i < n; i++) {
249                         double d = p.get(i);
250                         // Include NaN in disallowed range
251                         if (!(d >= 0.0 && d <= 1.0)) {
252                                 return true;
253                         }
254                 }
255                 return false;
256         }
257         
258         
259         /**
260          * A Callable that evaluates a function at a specific point and returns the result.
261          */
262         private class FunctionCallable implements Callable<Double> {
263                 private final Function calledFunction;
264                 private final Point point;
265                 
266                 public FunctionCallable(Function function, Point point) {
267                         this.calledFunction = function;
268                         this.point = point;
269                 }
270                 
271                 @Override
272                 public Double call() throws InterruptedException, OptimizationException {
273                         return calledFunction.evaluate(point);
274                 }
275         }
276         
277 }