780786007319224b8af7338fd2fb552771d5c37a
[debian/openrocket] / src / net / sf / openrocket / optimization / general / ParallelExecutorCache.java
1 package net.sf.openrocket.optimization.general;
2
3 import java.util.ArrayList;
4 import java.util.Collection;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.Map;
8 import java.util.concurrent.Callable;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.LinkedBlockingQueue;
13 import java.util.concurrent.ThreadFactory;
14 import java.util.concurrent.ThreadPoolExecutor;
15 import java.util.concurrent.TimeUnit;
16
17 import net.sf.openrocket.util.BugException;
18
19 /**
20  * An implementation of a ParallelFunctionCache that evaluates function values
21  * in parallel and caches them.  This allows pre-calculating possibly required
22  * function values beforehand.  If values are not required after all, the
23  * computation can be aborted assuming the function evaluation supports it.
24  * <p>
25  * Note that while this class handles threads and abstracts background execution,
26  * the public methods themselves are NOT thread-safe and should be called from
27  * only one thread at a time.
28  * 
29  * @author Sampo Niskanen <sampo.niskanen@iki.fi>
30  */
31 public class ParallelExecutorCache implements ParallelFunctionCache {
32         
33         private final Map<Point, Double> functionCache = new HashMap<Point, Double>();
34         private final Map<Point, Future<Double>> futureMap = new HashMap<Point, Future<Double>>();
35         
36         private ExecutorService executor;
37         
38         private Function function;
39         
40         
41         /**
42          * Construct a cache that uses the same number of computational threads as there are
43          * processors available.
44          */
45         public ParallelExecutorCache() {
46                 this(Runtime.getRuntime().availableProcessors());
47         }
48         
49         /**
50          * Construct a cache that uses the specified number of computational threads for background
51          * computation.  The threads that are created are marked as daemon threads.
52          * 
53          * @param threadCount   the number of threads to use in the executor.
54          */
55         public ParallelExecutorCache(int threadCount) {
56                 this(new ThreadPoolExecutor(threadCount, threadCount, 60, TimeUnit.SECONDS,
57                                 new LinkedBlockingQueue<Runnable>(),
58                                 new ThreadFactory() {
59                                         @Override
60                                         public Thread newThread(Runnable r) {
61                                                 Thread t = new Thread(r);
62                                                 t.setDaemon(true);
63                                                 return t;
64                                         }
65                                 }));
66         }
67         
68         /**
69          * Construct a cache that uses the specified ExecutorService for managing
70          * computational threads.
71          * 
72          * @param executor      the executor to use for function evaluations.
73          */
74         public ParallelExecutorCache(ExecutorService executor) {
75                 this.executor = executor;
76         }
77         
78         
79
80         @Override
81         public void compute(Collection<Point> points) {
82                 for (Point p : points) {
83                         compute(p);
84                 }
85         }
86         
87         
88         @Override
89         public void compute(Point point) {
90                 if (functionCache.containsKey(point)) {
91                         // Function has already been evaluated at the point
92                         return;
93                 }
94                 
95                 if (futureMap.containsKey(point)) {
96                         // Function is being evaluated at the point
97                         return;
98                 }
99                 
100                 // Submit point for evaluation
101                 FunctionCallable callable = new FunctionCallable(function, point);
102                 Future<Double> future = executor.submit(callable);
103                 futureMap.put(point, future);
104         }
105         
106         
107         @Override
108         public void waitFor(Collection<Point> points) throws InterruptedException, OptimizationException {
109                 for (Point p : points) {
110                         waitFor(p);
111                 }
112         }
113         
114         
115         @Override
116         public void waitFor(Point point) throws InterruptedException, OptimizationException {
117                 if (functionCache.containsKey(point)) {
118                         return;
119                 }
120                 
121                 Future<Double> future = futureMap.get(point);
122                 if (future == null) {
123                         throw new IllegalStateException("waitFor called for " + point + " but it is not being computed");
124                 }
125                 
126                 try {
127                         double value = future.get();
128                         functionCache.put(point, value);
129                 } catch (ExecutionException e) {
130                         Throwable cause = e.getCause();
131                         if (cause instanceof InterruptedException) {
132                                 throw (InterruptedException) cause;
133                         }
134                         if (cause instanceof OptimizationException) {
135                                 throw (OptimizationException) cause;
136                         }
137                         if (cause instanceof RuntimeException) {
138                                 throw (RuntimeException) cause;
139                         }
140                         
141                         throw new BugException("Function threw unknown exception while processing", e);
142                 }
143         }
144         
145         
146
147         @Override
148         public List<Point> abort(Collection<Point> points) {
149                 List<Point> computed = new ArrayList<Point>(Math.min(points.size(), 10));
150                 
151                 for (Point p : points) {
152                         if (abort(p)) {
153                                 computed.add(p);
154                         }
155                 }
156                 
157                 return computed;
158         }
159         
160         
161
162         @Override
163         public boolean abort(Point point) {
164                 if (functionCache.containsKey(point)) {
165                         return true;
166                 }
167                 
168                 Future<Double> future = futureMap.remove(point);
169                 if (future == null) {
170                         throw new IllegalStateException("abort called for " + point + " but it is not being computed");
171                 }
172                 
173                 if (future.isDone()) {
174                         // Evaluation has been completed, store value in cache
175                         try {
176                                 double value = future.get();
177                                 functionCache.put(point, value);
178                                 return true;
179                         } catch (Exception e) {
180                                 return false;
181                         }
182                 } else {
183                         // Cancel the evaluation
184                         future.cancel(true);
185                         return false;
186                 }
187         }
188         
189         
190         @Override
191         public double getValue(Point point) {
192                 Double d = functionCache.get(point);
193                 if (d == null) {
194                         throw new IllegalStateException(point + " is not in function cache.  " +
195                                         "functionCache=" + functionCache + "  futureMap=" + futureMap);
196                 }
197                 return d;
198         }
199         
200         
201         @Override
202         public Function getFunction() {
203                 return function;
204         }
205         
206         @Override
207         public void setFunction(Function function) {
208                 this.function = function;
209                 clearCache();
210         }
211         
212         @Override
213         public void clearCache() {
214                 List<Point> list = new ArrayList<Point>(futureMap.keySet());
215                 abort(list);
216                 functionCache.clear();
217         }
218         
219         
220         public ExecutorService getExecutor() {
221                 return executor;
222         }
223         
224         
225
226         /**
227          * A Callable that evaluates a function at a specific point and returns the result.
228          */
229         private class FunctionCallable implements Callable<Double> {
230                 private final Function calledFunction;
231                 private final Point point;
232                 
233                 public FunctionCallable(Function function, Point point) {
234                         this.calledFunction = function;
235                         this.point = point;
236                 }
237                 
238                 @Override
239                 public Double call() throws InterruptedException, OptimizationException {
240                         return calledFunction.evaluate(point);
241                 }
242         }
243         
244 }