Imported Upstream version 3.2.2
[debian/gnuradio] / gcell / lib / runtime / gc_job_manager_impl.h
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2007,2008,2009 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License along
18  * with this program; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20  */
21
22 #ifndef INCLUDED_GC_JOB_MANAGER_IMPL_H
23 #define INCLUDED_GC_JOB_MANAGER_IMPL_H
24
25 #include <gcell/gc_job_manager.h>
26 #include <gcell/gc_jd_stack.h>
27 #include <gcell/gc_jd_queue.h>
28 #include <gcell/gc_spu_args.h>
29 #include "gc_client_thread_info.h"
30 #include <libspe2.h>
31 #include <vector>
32 #include <boost/scoped_array.hpp>
33
34 typedef boost::shared_ptr<spe_gang_context> spe_gang_context_sptr;
35 typedef boost::shared_ptr<spe_program_handle_t> spe_program_handle_sptr;
36 typedef boost::scoped_array<gc_client_thread_info> gc_client_thread_info_sa;
37
38
39 enum worker_state {
40   WS_FREE,      // not in use
41   WS_INIT,      // allocated and being initialized
42   WS_RUNNING,   // the thread is running
43   WS_DEAD,      // the thread is dead
44 };
45
46 struct worker_ctx {
47   volatile worker_state   state;
48   unsigned int            spe_idx;      // [0, nspes-1]
49   spe_context_ptr_t       spe_ctx;
50   spe_spu_control_area_t *spe_ctrl;
51   pthread_t               thread;
52   gc_spu_args_t          *spu_args;     // pointer to 16-byte aligned struct
53
54   worker_ctx()
55     : state(WS_FREE), spe_idx(0), spe_ctx(0), spe_ctrl(0),
56       thread(0), spu_args(0) {}
57   ~worker_ctx();
58 };
59
60 enum evt_handler_state {
61   EHS_INIT,             // being initialized
62   EHS_RUNNING,          // thread is running
63   EHS_SHUTTING_DOWN,    // in process of shutting down everything
64   EHS_WAITING_FOR_WORKERS_TO_DIE,
65   EHS_DEAD,             // thread is dead
66 };
67
68 enum job_completer_state {
69   JCS_INIT,             // being initialized
70   JCS_RUNNING,          // thread is running
71   JCS_DEAD,             // thread is dead
72 };
73
74 struct spe_event_handler {
75   spe_event_handler_ptr_t       ptr;
76
77   spe_event_handler() : ptr(0) {}
78   ~spe_event_handler(){
79     if (ptr){
80       if (spe_event_handler_destroy(ptr) != 0){
81         perror("spe_event_handler_destroy");
82       }
83     }
84   }
85 };
86
87
88 /*!
89  * \brief Concrete class that manages SPE jobs.
90  *
91  * This class contains all the implementation details.
92  */
93 class gc_job_manager_impl : public gc_job_manager
94 {
95   enum { MAX_SPES =  16 };
96
97   int                     d_debug;
98   gc_jm_options           d_options;
99   spe_program_handle_sptr d_spe_image;
100   spe_gang_context_sptr   d_gang;               // boost::shared_ptr
101
102   worker_ctx             d_worker[MAX_SPES];    // SPE ctx, thread, etc
103   gc_spu_args_t         *d_spu_args;            // 16-byte aligned structs
104   boost::shared_ptr<void> _d_spu_args_boost;    // hack for automatic storage mgmt
105
106   gc_comp_info_t        *d_comp_info;           // 128-byte aligned structs
107   boost::shared_ptr<void> _d_comp_info_boost;   // hack for automatic storage mgmt
108
109   // used to coordinate communication w/ the event handling thread
110   omni_mutex             d_eh_mutex;
111   omni_condition         d_eh_cond;
112   pthread_t              d_eh_thread;           // the event handler thread
113   volatile evt_handler_state    d_eh_state;
114   volatile bool                 d_shutdown_requested;
115   spe_event_handler      d_spe_event_handler;
116   
117   // used to coordinate communication w/ the job completer thread
118   omni_mutex             d_jc_mutex;
119   omni_condition         d_jc_cond;
120   pthread_t              d_jc_thread;           // the job completion thread
121   volatile job_completer_state  d_jc_state;
122   int                    d_jc_njobs_active;     // # of jobs submitted but not yet reaped
123
124   // round robin notification of spes
125   int                    d_ntell;               // # of spes to tell
126   unsigned int           d_tell_start;          // which one to start with
127
128   // All of the job descriptors are hung off of here.
129   // We allocate them all in a single cache aligned chunk.
130   gc_job_desc_t         *d_jd;                  // [options.max_jobs]
131   boost::shared_ptr<void> _d_jd_boost;          // hack for automatic storage mgmt
132
133   gc_client_thread_info_sa d_client_thread;     // [options.max_client_threads]
134
135   // We use bitvectors to represent the completing state of a job.  Each
136   // bitvector is d_bvlen longs in length.
137   int                    d_bvlen;               // bit vector length in longs
138
139   // This contains the storage for all the bitvectors used by the job
140   // manager.  There's 1 for each client thread, in the d_jobs_done
141   // field.  We allocate them all in a single cache aligned chunk.
142   boost::shared_ptr<void> _d_all_bitvectors;    // hack for automatic storage mgmt
143
144   // Lock free stack where we keep track of the free job descriptors.
145   gc_jd_stack_t         *d_free_list;           // stack of free job descriptors
146   boost::shared_ptr<void> _d_free_list_boost;   // hack for automatic storage mgmt
147
148   // The PPE inserts jobs here; SPEs pull jobs from here.
149   gc_jd_queue_t         *d_queue;               // job queue
150   boost::shared_ptr<void> _d_queue_boost;       // hack for automatic storage mgmt
151
152   int                    d_ea_args_maxsize;
153
154   struct gc_proc_def    *d_proc_def;            // the SPE procedure table
155   uint32_t               d_proc_def_ls_addr;    // the LS address of the table
156   int                    d_nproc_defs;          // number of proc_defs in table
157
158   gc_client_thread_info *alloc_cti();
159   void free_cti(gc_client_thread_info *cti);
160
161   void create_event_handler();
162   void set_eh_state(evt_handler_state s);
163   void set_ea_args_maxsize(int maxsize);
164
165   void notify_clients_jobs_are_done(unsigned int spe_num,
166                                     unsigned int completion_info_idx);
167
168 public:
169   void event_handler_loop();    // really private
170   void job_completer_loop();    // really private
171
172 private:
173   bool send_all_spes(uint32_t msg);
174   bool send_spe(unsigned int spe, uint32_t msg);
175   void print_event(spe_event_unit_t *evt);
176   void handle_event(spe_event_unit_t *evt);
177   bool incr_njobs_active();
178   void decr_njobs_active(int n);
179   void tell_spes_to_check_queue();
180   void poll_for_job_completion();
181
182   // bitvector ops
183   void bv_zero(unsigned long *bv);
184   void bv_clr(unsigned long *bv, unsigned int bitno);
185   void bv_set(unsigned long *bv, unsigned int bitno);
186   bool bv_isset(unsigned long *bv, unsigned int bitno);
187   bool bv_isclr(unsigned long *bv, unsigned int bitno);
188
189   void setup_logfiles();
190   void sync_logfiles();
191   void unmap_logfiles();
192
193   friend gc_job_manager_sptr gc_make_job_manager(const gc_jm_options *options);
194   
195   gc_job_manager_impl(const gc_jm_options *options = 0);
196
197 public:
198   virtual ~gc_job_manager_impl();
199
200   /*!
201    * Stop accepting new jobs.  Wait for existing jobs to complete.
202    * Return all managed SPE's to the system.
203    */
204   virtual bool shutdown();
205
206   /*!
207    * \brief Return number of SPE's currently allocated to job manager.
208    */
209   virtual int nspes() const;
210
211   /*!
212    * \brief Return a pointer to a properly aligned job descriptor,
213    * or zero if none are available.
214    */
215   virtual gc_job_desc *alloc_job_desc();
216
217   /*
218    *! Return a job descriptor previously allocated with alloc_job_desc()
219    *
220    * \param[in] jd pointer to job descriptor to free.
221    */
222   virtual void free_job_desc(gc_job_desc *jd);
223
224   /*!
225    * \brief Submit a job for asynchronous processing on an SPE.
226    *
227    * \param[in] jd pointer to job description
228    *
229    * The caller must not read or write the job description
230    * or any of the memory associated with any indirect arguments
231    * until after calling wait_job.
232    *
233    * \returns true iff the job was successfully enqueued.
234    * If submit_job returns false, check jd->status for additional info.
235    */
236   virtual bool submit_job(gc_job_desc *jd);
237
238   /*!
239    * \brief Wait for job to complete.
240    *
241    * A thread may only wait for jobs which it submitted.
242    *
243    * \returns true if sucessful, else false.
244    */
245   virtual bool 
246   wait_job(gc_job_desc *jd);
247
248   /*!
249    * \brief wait for 1 or more jobs to complete.
250    *
251    * \param[in] njobs is the length of arrays \p jd and \p done.
252    * \param[in] jd are the jobs that are to be waited for.
253    * \param[out] done indicates whether the corresponding job is complete.
254    * \param[in] mode indicates whether to wait for ALL or ANY of the jobs
255    *   in \p jd to complete.
256    *
257    * A thread may only wait for jobs which it submitted.
258    *
259    * \returns number of jobs completed, or -1 if error.
260    */
261   virtual int
262   wait_jobs(unsigned int njobs,
263             gc_job_desc *jd[], bool done[], gc_wait_mode mode);
264
265   virtual int ea_args_maxsize();
266
267   virtual gc_proc_id_t lookup_proc(const std::string &name);
268   virtual std::vector<std::string> proc_names();
269
270   virtual void set_debug(int debug);
271   virtual int debug();
272 };
273
274 #endif /* INCLUDED_GC_JOB_MANAGER_IMPL_H */