1 package net.sf.openrocket.optimization.general;
3 import java.util.ArrayList;
4 import java.util.Collection;
5 import java.util.HashMap;
8 import java.util.concurrent.Callable;
9 import java.util.concurrent.ExecutionException;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Future;
12 import java.util.concurrent.LinkedBlockingQueue;
13 import java.util.concurrent.ThreadFactory;
14 import java.util.concurrent.ThreadPoolExecutor;
15 import java.util.concurrent.TimeUnit;
17 import net.sf.openrocket.util.BugException;
20 * An implementation of a ParallelFunctionCache that evaluates function values
21 * in parallel and caches them. This allows pre-calculating possibly required
22 * function values beforehand. If values are not required after all, the
23 * computation can be aborted assuming the function evaluation supports it.
25 * Note that while this class handles threads and abstracts background execution,
26 * the public methods themselves are NOT thread-safe and should be called from
27 * only one thread at a time.
29 * @author Sampo Niskanen <sampo.niskanen@iki.fi>
31 public class ParallelExecutorCache implements ParallelFunctionCache {
33 private final Map<Point, Double> functionCache = new HashMap<Point, Double>();
34 private final Map<Point, Future<Double>> futureMap = new HashMap<Point, Future<Double>>();
36 private ExecutorService executor;
38 private Function function;
42 * Construct a cache that uses the same number of computational threads as there are
43 * processors available.
45 public ParallelExecutorCache() {
46 this(Runtime.getRuntime().availableProcessors());
50 * Construct a cache that uses the specified number of computational threads for background
51 * computation. The threads that are created are marked as daemon threads.
53 * @param threadCount the number of threads to use in the executor.
55 public ParallelExecutorCache(int threadCount) {
56 this(new ThreadPoolExecutor(threadCount, threadCount, 60, TimeUnit.SECONDS,
57 new LinkedBlockingQueue<Runnable>(),
60 public Thread newThread(Runnable r) {
61 Thread t = new Thread(r);
69 * Construct a cache that uses the specified ExecutorService for managing
70 * computational threads.
72 * @param executor the executor to use for function evaluations.
74 public ParallelExecutorCache(ExecutorService executor) {
75 this.executor = executor;
81 public void compute(Collection<Point> points) {
82 for (Point p : points) {
89 public void compute(Point point) {
90 if (functionCache.containsKey(point)) {
91 // Function has already been evaluated at the point
95 if (futureMap.containsKey(point)) {
96 // Function is being evaluated at the point
100 // Submit point for evaluation
101 FunctionCallable callable = new FunctionCallable(function, point);
102 Future<Double> future = executor.submit(callable);
103 futureMap.put(point, future);
108 public void waitFor(Collection<Point> points) throws InterruptedException, OptimizationException {
109 for (Point p : points) {
116 public void waitFor(Point point) throws InterruptedException, OptimizationException {
117 if (functionCache.containsKey(point)) {
121 Future<Double> future = futureMap.get(point);
122 if (future == null) {
123 throw new IllegalStateException("waitFor called for " + point + " but it is not being computed");
127 double value = future.get();
128 functionCache.put(point, value);
129 } catch (ExecutionException e) {
130 Throwable cause = e.getCause();
131 if (cause instanceof InterruptedException) {
132 throw (InterruptedException) cause;
134 if (cause instanceof OptimizationException) {
135 throw (OptimizationException) cause;
137 if (cause instanceof RuntimeException) {
138 throw (RuntimeException) cause;
141 throw new BugException("Function threw unknown exception while processing", e);
148 public List<Point> abort(Collection<Point> points) {
149 List<Point> computed = new ArrayList<Point>(Math.min(points.size(), 10));
151 for (Point p : points) {
163 public boolean abort(Point point) {
164 if (functionCache.containsKey(point)) {
168 Future<Double> future = futureMap.remove(point);
169 if (future == null) {
170 throw new IllegalStateException("abort called for " + point + " but it is not being computed");
173 if (future.isDone()) {
174 // Evaluation has been completed, store value in cache
176 double value = future.get();
177 functionCache.put(point, value);
179 } catch (Exception e) {
183 // Cancel the evaluation
191 public double getValue(Point point) {
192 Double d = functionCache.get(point);
194 throw new IllegalStateException(point + " is not in function cache. " +
195 "functionCache=" + functionCache + " futureMap=" + futureMap);
202 public Function getFunction() {
207 public void setFunction(Function function) {
208 this.function = function;
213 public void clearCache() {
214 List<Point> list = new ArrayList<Point>(futureMap.keySet());
216 functionCache.clear();
220 public ExecutorService getExecutor() {
227 * A Callable that evaluates a function at a specific point and returns the result.
229 private class FunctionCallable implements Callable<Double> {
230 private final Function calledFunction;
231 private final Point point;
233 public FunctionCallable(Function function, Point point) {
234 this.calledFunction = function;
239 public Double call() throws InterruptedException, OptimizationException {
240 return calledFunction.evaluate(point);