Updated Sunset/Sunrise functions to latest PyEphem API
[debian/gnuradio] / gcell / src / lib / runtime / gc_job_manager_impl.h
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2007,2008 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License along
18  * with this program; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20  */
21
22 #ifndef INCLUDED_GC_JOB_MANAGER_IMPL_H
23 #define INCLUDED_GC_JOB_MANAGER_IMPL_H
24
25 #include "gc_job_manager.h"
26 #include "gc_client_thread_info.h"
27 #include "gc_jd_stack.h"
28 #include "gc_jd_queue.h"
29 #include "gc_spu_args.h"
30 #include <libspe2.h>
31 #include <vector>
32 #include <boost/scoped_array.hpp>
33
34 typedef boost::shared_ptr<spe_gang_context> spe_gang_context_sptr;
35 typedef boost::shared_ptr<spe_program_handle_t> spe_program_handle_sptr;
36 typedef boost::scoped_array<gc_client_thread_info> gc_client_thread_info_sa;
37
38
39 enum worker_state {
40   WS_FREE,      // not in use
41   WS_INIT,      // allocated and being initialized
42   WS_RUNNING,   // the thread is running
43   WS_DEAD,      // the thread is dead
44 };
45
46 struct worker_ctx {
47   volatile worker_state state;
48   unsigned int          spe_idx;        // [0, nspes-1]
49   spe_context_ptr_t     spe_ctx;
50   pthread_t             thread;
51   gc_spu_args_t         *spu_args;      // pointer to 16-byte aligned struct
52
53   worker_ctx()
54     : state(WS_FREE), spe_idx(0), spe_ctx(0),
55       thread(0), spu_args(0) {}
56   ~worker_ctx();
57 };
58
59 enum evt_handler_state {
60   EHS_INIT,             // being initialized
61   EHS_RUNNING,          // thread is running
62   EHS_SHUTTING_DOWN,    // in process of shutting down everything
63   EHS_WAITING_FOR_WORKERS_TO_DIE,
64   EHS_DEAD,             // thread is dead
65 };
66
67 struct spe_event_handler {
68   spe_event_handler_ptr_t       ptr;
69
70   spe_event_handler() : ptr(0) {}
71   ~spe_event_handler(){
72     if (ptr){
73       if (spe_event_handler_destroy(ptr) != 0){
74         perror("spe_event_handler_destroy");
75       }
76     }
77   }
78 };
79
80
81 /*!
82  * \brief Concrete class that manages SPE jobs.
83  *
84  * This class contains all the implementation details.
85  */
86 class gc_job_manager_impl : public gc_job_manager
87 {
88   enum { MAX_SPES =  16 };
89
90   int                     d_debug;
91   gc_jm_options           d_options;
92   spe_program_handle_sptr d_spe_image;
93   spe_gang_context_sptr   d_gang;               // boost::shared_ptr
94
95   worker_ctx             d_worker[MAX_SPES];    // SPE ctx, thread, etc
96   gc_spu_args_t         *d_spu_args;            // 16-byte aligned structs
97   boost::shared_ptr<void> _d_spu_args_boost;    // hack for automatic storage mgmt
98
99   gc_comp_info_t        *d_comp_info;           // 128-byte aligned structs
100   boost::shared_ptr<void> _d_comp_info_boost;   // hack for automatic storage mgmt
101
102   // used to coordinate communication w/ the event handling thread
103   omni_mutex             d_eh_mutex;
104   omni_condition         d_eh_cond;
105   pthread_t              d_eh_thread;           // the event handler thread
106   volatile evt_handler_state    d_eh_state;
107   volatile bool                 d_shutdown_requested;
108   spe_event_handler      d_spe_event_handler;
109   
110
111   // All of the job descriptors are hung off of here.
112   // We allocate them all in a single cache aligned chunk.
113   gc_job_desc_t         *d_jd;                  // [options.max_jobs]
114   boost::shared_ptr<void> _d_jd_boost;          // hack for automatic storage mgmt
115
116   gc_client_thread_info_sa d_client_thread;     // [options.max_client_threads]
117
118   // We use bitvectors to represent the completing state of a job.  Each
119   // bitvector is d_bvlen longs in length.
120   int                    d_bvlen;               // bit vector length in longs
121
122   // This contains the storage for all the bitvectors used by the job
123   // manager.  There's 1 for each client thread, in the d_jobs_done
124   // field.  We allocate them all in a single cache aligned chunk.
125   boost::shared_ptr<void> _d_all_bitvectors;    // hack for automatic storage mgmt
126
127   // Lock free stack where we keep track of the free job descriptors.
128   gc_jd_stack_t         *d_free_list;           // stack of free job descriptors
129   boost::shared_ptr<void> _d_free_list_boost;   // hack for automatic storage mgmt
130
131   // The PPE inserts jobs here; SPEs pull jobs from here.
132   gc_jd_queue_t         *d_queue;               // job queue
133   boost::shared_ptr<void> _d_queue_boost;       // hack for automatic storage mgmt
134
135   int                    d_ea_args_maxsize;
136
137   struct gc_proc_def    *d_proc_def;            // the SPE procedure table
138   uint32_t               d_proc_def_ls_addr;    // the LS address of the table
139   int                    d_nproc_defs;          // number of proc_defs in table
140
141   gc_client_thread_info *alloc_cti();
142   void free_cti(gc_client_thread_info *cti);
143
144   void create_event_handler();
145   void set_eh_state(evt_handler_state s);
146   void set_ea_args_maxsize(int maxsize);
147
148   void notify_clients_jobs_are_done(unsigned int spe_num,
149                                     unsigned int completion_info_idx);
150
151 public:
152   void event_handler_loop();    // really private
153
154 private:
155   bool send_all_spes(uint32_t msg);
156   bool send_spe(unsigned int spe, uint32_t msg);
157   void print_event(spe_event_unit_t *evt);
158   void handle_event(spe_event_unit_t *evt);
159
160   // bitvector ops
161   void bv_zero(unsigned long *bv);
162   void bv_clr(unsigned long *bv, unsigned int bitno);
163   void bv_set(unsigned long *bv, unsigned int bitno);
164   bool bv_isset(unsigned long *bv, unsigned int bitno);
165   bool bv_isclr(unsigned long *bv, unsigned int bitno);
166
167   void setup_logfiles();
168   void sync_logfiles();
169   void unmap_logfiles();
170
171   friend gc_job_manager_sptr gc_make_job_manager(const gc_jm_options *options);
172   
173   gc_job_manager_impl(const gc_jm_options *options = 0);
174
175 public:
176   virtual ~gc_job_manager_impl();
177
178   /*!
179    * Stop accepting new jobs.  Wait for existing jobs to complete.
180    * Return all managed SPE's to the system.
181    */
182   virtual bool shutdown();
183
184   /*!
185    * \brief Return number of SPE's currently allocated to job manager.
186    */
187   virtual int nspes() const;
188
189   /*!
190    * \brief Return a pointer to a properly aligned job descriptor,
191    * or zero if none are available.
192    */
193   virtual gc_job_desc *alloc_job_desc();
194
195   /*
196    *! Return a job descriptor previously allocated with alloc_job_desc()
197    *
198    * \param[in] jd pointer to job descriptor to free.
199    */
200   virtual void free_job_desc(gc_job_desc *jd);
201
202   /*!
203    * \brief Submit a job for asynchronous processing on an SPE.
204    *
205    * \param[in] jd pointer to job description
206    *
207    * The caller must not read or write the job description
208    * or any of the memory associated with any indirect arguments
209    * until after calling wait_job.
210    *
211    * \returns true iff the job was successfully enqueued.
212    * If submit_job returns false, check jd->status for additional info.
213    */
214   virtual bool submit_job(gc_job_desc *jd);
215
216   /*!
217    * \brief Wait for job to complete.
218    *
219    * A thread may only wait for jobs which it submitted.
220    *
221    * \returns true if sucessful, else false.
222    */
223   virtual bool 
224   wait_job(gc_job_desc *jd);
225
226   /*!
227    * \brief wait for 1 or more jobs to complete.
228    *
229    * \param[input] njobs is the length of arrays \p jd and \p done.
230    * \param[input] jd are the jobs that are to be waited for.
231    * \param[output] done indicates whether the corresponding job is complete.
232    * \param[input] mode indicates whether to wait for ALL or ANY of the jobs
233    *   in \p jd to complete.
234    *
235    * A thread may only wait for jobs which it submitted.
236    *
237    * \returns number of jobs completed, or -1 if error.
238    */
239   virtual int
240   wait_jobs(unsigned int njobs,
241             gc_job_desc *jd[], bool done[], gc_wait_mode mode);
242
243   virtual int ea_args_maxsize();
244
245   virtual gc_proc_id_t lookup_proc(const std::string &name);
246   virtual std::vector<std::string> proc_names();
247
248   virtual void set_debug(int debug);
249   virtual int debug();
250 };
251
252 #endif /* INCLUDED_GC_JOB_MANAGER_IMPL_H */