Explicitly create the writerThread to be an executor with an unbounded linked blocking queue.
[debian/openrocket] / core / src / net / sf / openrocket / startup / ConcurrentLoadingThrustCurveMotorSetDatabase.java
1 package net.sf.openrocket.startup;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.util.List;
7 import java.util.concurrent.ExecutorService;
8 import java.util.concurrent.LinkedBlockingQueue;
9 import java.util.concurrent.ThreadFactory;
10 import java.util.concurrent.ThreadPoolExecutor;
11 import java.util.concurrent.TimeUnit;
12
13 import net.sf.openrocket.database.ThrustCurveMotorSet;
14 import net.sf.openrocket.database.ThrustCurveMotorSetDatabase;
15 import net.sf.openrocket.file.iterator.DirectoryIterator;
16 import net.sf.openrocket.file.iterator.FileIterator;
17 import net.sf.openrocket.file.motor.MotorLoaderHelper;
18 import net.sf.openrocket.gui.util.SimpleFileFilter;
19 import net.sf.openrocket.gui.util.SwingPreferences;
20 import net.sf.openrocket.logging.LogHelper;
21 import net.sf.openrocket.motor.Motor;
22 import net.sf.openrocket.motor.ThrustCurveMotor;
23 import net.sf.openrocket.util.BugException;
24 import net.sf.openrocket.util.Pair;
25
26 /**
27  * Load motors in parallel using a three stage pipeline.
28  * 
29  * Stage 1: single thread managed by the ThrustCurveMotorSetDatabase.  This thread generates
30  *          one object for each thrust curve motor file and puts it in the second stage.
31  *          
32  * Stage 2: multiple threads which process individual files.  Each process takes
33  *          a single motor file and parses out the list of motors it contains.
34  *          The list of motors is queued up for the third stage to process.
35  *          
36  * Stage 3: single thread which processes the list of motors generated in stage 2.
37  *          This thread puts all the motors from the list in the motor set database.
38  *          
39  * It is important that stage 3 be done with a single thread because ThrustCurveMotorSetDatabase
40  * is not thread safe.  Even if synchronization were to be done, it is unlikely that parallelizing
41  * this process would improve anything.
42  * 
43  *
44  */
45 public class ConcurrentLoadingThrustCurveMotorSetDatabase extends ThrustCurveMotorSetDatabase {
46
47         private static final LogHelper log = Application.getLogger();
48         private final String thrustCurveDirectory;
49
50
51         public ConcurrentLoadingThrustCurveMotorSetDatabase(String thrustCurveDirectory) {
52                 // configure ThrustCurveMotorSetDatabase as true so we get our own thread in
53                 // loadMotors.
54                 super(true);
55                 this.thrustCurveDirectory = thrustCurveDirectory;
56         }
57
58         @Override
59         protected void loadMotors() {
60
61                 BookKeeping keeper = new BookKeeping();
62                 keeper.start();
63
64                 try {
65                         keeper.waitForFinish();
66                 }
67                 catch ( InterruptedException iex ) {
68                         throw new BugException(iex);
69                 }
70
71                 keeper = null;
72
73         }
74
75         private void addAll( List<Motor> motors ) {
76                 for (Motor m : motors) {
77                         addMotor( (ThrustCurveMotor) m);
78                 }
79         }
80
81         /**
82          * A class which holds all the threading data.
83          * Implemented as an inner class so we can easily jettison the references when
84          * the processing is terminated.
85          *
86          */
87         private class BookKeeping {
88
89                 /*
90                  * Executor for Stage 3.
91                  */
92                 private final ExecutorService writerThread;
93
94                 /*
95                  * Executor for Stage 2.
96                  */
97                 private final ExecutorService loaderPool;
98
99                 /*
100                  * Runnable used for Stage 1.
101                  */
102                 private final WorkGenerator workGenerator;
103
104                 private long startTime;
105
106                 /*
107                  * Number of thrust curves loaded
108                  */
109                 private int thrustCurveCount = 0;
110
111                 /*
112                  * Number of files processed.
113                  */
114                 private int fileCount = 0;
115
116                 /*
117                  * We have to hold on to the zip file iterator which is used to load
118                  * the system motor files until all processing is done.  This is because
119                  * closing the iterator prematurely causes all the InputStreams opened
120                  * with it to close. 
121                  */
122                 private FileIterator iterator;
123
124                 private BookKeeping() {
125
126                         writerThread = new ThreadPoolExecutor(1,1,200, TimeUnit.SECONDS,
127                                         new LinkedBlockingQueue<Runnable>(),
128                                         new ThreadFactory() {
129                                 @Override
130                                 public Thread newThread(Runnable r) {
131                                         Thread t = new Thread(r,"MotorWriterThread");
132                                         return t;
133                                 }
134                         });
135
136                         loaderPool = new ThreadPoolExecutor(25,25, 2, TimeUnit.SECONDS,
137                                         new LinkedBlockingQueue<Runnable>(),
138                                         new ThreadFactory() {
139                                 int threadCount = 0;
140                                 @Override
141                                 public Thread newThread(Runnable r) {
142                                         Thread t = new Thread(r,"MotorLoaderPool-" + threadCount++);
143                                         return t;
144                                 }
145                         });
146
147                         workGenerator = new WorkGenerator();
148
149                 }
150
151                 private void start() {
152
153                         startTime = System.currentTimeMillis();
154
155                         log.info("Starting motor loading from " + thrustCurveDirectory + " in background thread.");
156
157                         // Run the work generator - in this thread.
158                         workGenerator.run();
159
160                 }
161
162                 private void waitForFinish() throws InterruptedException {
163                         try {
164                                 loaderPool.shutdown();
165                                 loaderPool.awaitTermination(10, TimeUnit.SECONDS);
166                                 writerThread.shutdown();
167                                 writerThread.awaitTermination(10, TimeUnit.SECONDS);
168                         }
169                         finally {
170                                 iterator.close();
171                         }
172
173                         long endTime = System.currentTimeMillis();
174
175                         int distinctMotorCount = 0;
176                         int distinctThrustCurveCount = 0;
177                         distinctMotorCount = motorSets.size();
178                         for (ThrustCurveMotorSet set : motorSets) {
179                                 distinctThrustCurveCount += set.getMotorCount();
180                         }
181
182                         log.info("Motor loading done, took " + (endTime - startTime) + " ms to load " 
183                                         + fileCount + " files/directories containing " 
184                                         + thrustCurveCount + " thrust curves which contained "
185                                         + distinctMotorCount + " distinct motors with "
186                                         + distinctThrustCurveCount + " distinct thrust curves.");
187
188                 }
189
190
191                 private class WorkGenerator implements Runnable {
192
193                         @Override
194                         public void run() {
195                                 // Start loading
196                                 log.info("Loading motors from " + thrustCurveDirectory);
197
198                                 iterator = DirectoryIterator.findDirectory(thrustCurveDirectory,
199                                                 new SimpleFileFilter("", false, "eng", "rse"));
200
201                                 // Load the packaged thrust curves
202                                 if (iterator == null) {
203                                         throw new IllegalStateException("Thrust curve directory " + thrustCurveDirectory +
204                                                         "not found, distribution built wrong");
205                                 }
206
207                                 while( iterator.hasNext() ) {
208                                         Pair<String,InputStream> f = iterator.next();
209                                         MotorLoader loader = new MotorLoader( f.getV(), f.getU() );
210                                         loaderPool.execute(loader);
211                                         fileCount ++;
212                                 }
213
214                                 // Load the user-defined thrust curves
215                                 for (File file : ((SwingPreferences) Application.getPreferences()).getUserThrustCurveFiles()) {
216                                         log.info("Loading motors from " + file);
217                                         MotorLoader loader = new MotorLoader( file );
218                                         loaderPool.execute(loader);
219                                         fileCount++;
220                                 }
221                         }
222                 }
223
224                 private class MotorLoader implements Runnable {
225
226                         private final InputStream is;
227                         private final String fileName;
228
229                         private final File file;
230
231                         public MotorLoader( File file ) {
232                                 super();
233                                 this.file = file;
234                                 this.is = null;
235                                 this.fileName = null;
236                         }
237
238                         public MotorLoader(InputStream is, String fileName) {
239                                 super();
240                                 this.file = null;
241                                 this.is = is;
242                                 this.fileName = fileName;
243                         }
244
245                         @Override
246                         public void run() {
247                                 log.debug("Loading motor from " + fileName);
248
249                                 try {
250                                         List<Motor> motors;
251                                         if ( file == null ) {
252                                                 motors = MotorLoaderHelper.load(is, fileName);
253                                         } else {
254                                                 motors = MotorLoaderHelper.load(file);
255                                         }
256                                         writerThread.submit( new MotorInserter(motors));
257                                 }
258                                 finally {
259                                         if ( is != null ) {
260                                                 try {
261                                                         is.close();
262                                                 } catch ( IOException iex ) {
263                                                 }
264                                         }
265                                 }
266                         }
267                 }
268
269                 private class MotorInserter implements Runnable {
270
271                         private final List<Motor> motors;
272
273                         MotorInserter( List<Motor> motors ) {
274                                 this.motors = motors;
275                         }
276
277                         @Override
278                         public void run() {
279                                 thrustCurveCount += motors.size();
280                                 ConcurrentLoadingThrustCurveMotorSetDatabase.this.addAll(motors);
281                         }
282
283                 }
284         }
285
286 }