Updated Sunset/Sunrise functions to latest PyEphem API
[debian/gnuradio] / gcell / src / lib / runtime / gc_job_manager_impl.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2007 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License along
18  * with this program; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25 #include <gc_job_manager_impl.h>
26 #include <gc_mbox.h>
27 #include <gc_proc_def_utils.h>
28 #include <gc_aligned_alloc.h>
29 #include <stdio.h>
30 #include <stdexcept>
31 #include <stdlib.h>
32 #include <atomic_dec_if_positive.h>
33 #include <memory_barrier.h>
34 #include <unistd.h>
35 #include <sys/mman.h>
36 #include <sys/types.h>
37 #include <sys/stat.h>
38 #include <fcntl.h>
39 #include <string.h>
40
41
42 static const size_t CACHE_LINE_SIZE = 128;
43
44 static const unsigned int DEFAULT_MAX_JOBS = 128;
45 static const unsigned int DEFAULT_MAX_CLIENT_THREADS = 64;
46
47 // FIXME this really depends on the SPU code...
48 static const unsigned int MAX_TOTAL_INDIRECT_LENGTH = 16 * 1024;
49
50
51 static bool          s_key_initialized = false;
52 static pthread_key_t s_client_key;
53
54 static int s_worker_debug = 0;
55
56 // custom deleter of gang_contexts for use with boost::shared_ptr
57 class gang_deleter {
58 public:
59   void operator()(spe_gang_context_ptr_t ctx) {
60     if (ctx){
61       int r = spe_gang_context_destroy(ctx);
62       if (r != 0){
63         perror("spe_gang_context_destroy");
64       }
65     }
66   }
67 };
68
69
70 // custom deleter of anything that can be freed with "free"
71 class free_deleter {
72 public:
73   void operator()(void *p) {
74     free(p);
75   }
76 };
77
78
79 /*
80  * Called when client thread is destroyed.
81  * We mark our client info free.
82  */
83 static void
84 client_key_destructor(void *p)
85 {
86   ((gc_client_thread_info *) p)->d_free = 1;
87 }
88
89 static bool
90 is_power_of_2(uint32_t x)
91 {
92   return (x != 0) && !(x & (x - 1));
93 }
94
95 ////////////////////////////////////////////////////////////////////////
96
97
98 gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
99   : d_debug(0), d_spu_args(0),
100     d_eh_cond(&d_eh_mutex), d_eh_thread(0), d_eh_state(EHS_INIT),
101     d_shutdown_requested(false),
102     d_client_thread(0), d_ea_args_maxsize(0),
103     d_proc_def(0), d_proc_def_ls_addr(0), d_nproc_defs(0)
104 {
105   if (!s_key_initialized){
106     int r = pthread_key_create(&s_client_key, client_key_destructor);
107     if (r != 0)
108       throw std::runtime_error("pthread_key_create");
109     s_key_initialized = true;
110   }
111
112   // ensure it's zero
113   pthread_setspecific(s_client_key, 0);
114
115   if (options != 0)
116     d_options = *options;
117
118   // provide the real default for those indicated with a zero
119   if (d_options.max_jobs == 0)
120     d_options.max_jobs = DEFAULT_MAX_JOBS;
121   if (d_options.max_client_threads == 0)
122     d_options.max_client_threads = DEFAULT_MAX_CLIENT_THREADS;
123
124   if (!d_options.program_handle){
125     fprintf(stderr, "gc_job_manager: options->program_handle must be non-zero\n");
126     throw std::runtime_error("gc_job_manager: options->program_handle must be non-zero");
127   }
128
129   int ncpu_nodes = spe_cpu_info_get(SPE_COUNT_PHYSICAL_CPU_NODES, -1);
130   int nusable_spes = spe_cpu_info_get(SPE_COUNT_USABLE_SPES, -1);
131
132   if (debug()){
133     printf("cpu_nodes = %d\n", ncpu_nodes);
134     for (int i = 0; i < ncpu_nodes; i++){
135       printf("node[%d].physical_spes = %2d\n", i,
136              spe_cpu_info_get(SPE_COUNT_PHYSICAL_SPES, i));
137       printf("node[%d].usable_spes   = %2d\n", i,
138              spe_cpu_info_get(SPE_COUNT_USABLE_SPES, i));
139     }
140   }
141
142   // clamp nspes
143   d_options.nspes = std::min(d_options.nspes, (unsigned int) MAX_SPES);
144   nusable_spes = std::min(nusable_spes, (int) MAX_SPES);
145
146   //
147   // sanity check requested number of spes.
148   //
149   if (d_options.nspes == 0)     // use all of them
150     d_options.nspes = nusable_spes;
151   else {
152     if (d_options.nspes > (unsigned int) nusable_spes){
153       fprintf(stderr,
154               "gc_job_manager: warning: caller requested %d spes.  There are only %d available.\n",
155               d_options.nspes, nusable_spes);
156       if (d_options.gang_schedule){
157         // If we're gang scheduling we'll never get scheduled if we
158         // ask for more than are available.
159         throw std::out_of_range("gang_scheduling: not enough spes available");
160       }
161       else {    // FIXME clamp to usable.  problem on PS3 when overcommited
162         fprintf(stderr, "gc_job_manager: clamping nspes to %d\n", nusable_spes);
163         d_options.nspes = nusable_spes;
164       }
165     }
166   }
167
168   if (d_options.use_affinity){
169     printf("gc_job_manager: warning: affinity request was ignored\n");
170   }
171
172   if (d_options.gang_schedule){
173     d_gang = spe_gang_context_sptr(spe_gang_context_create(0), gang_deleter());
174     if (!d_gang){
175       perror("gc_job_manager_impl[spe_gang_context_create]");
176       throw std::runtime_error("spe_gang_context_create");
177     }
178   }
179
180   // ----------------------------------------------------------------
181   // initalize the job queue
182   
183   d_queue = (gc_jd_queue_t *) gc_aligned_alloc(sizeof(gc_jd_queue_t), CACHE_LINE_SIZE);
184   _d_queue_boost =
185     boost::shared_ptr<void>((void *) d_queue, free_deleter());
186   gc_jd_queue_init(d_queue);
187
188
189   // ----------------------------------------------------------------
190   // create the spe contexts
191
192   // 1 spu_arg struct for each SPE
193   assert(sizeof(gc_spu_args_t) % 16 == 0);
194   d_spu_args =
195     (gc_spu_args_t *) gc_aligned_alloc(MAX_SPES * sizeof(gc_spu_args_t), 16);
196   _d_spu_args_boost =
197     boost::shared_ptr<void>((void *) d_spu_args, free_deleter());
198
199   // 2 completion info structs for each SPE (we double buffer them)
200   assert(sizeof(gc_comp_info_t) % CACHE_LINE_SIZE == 0);
201   d_comp_info =
202     (gc_comp_info_t *) gc_aligned_alloc(2 * MAX_SPES * sizeof(gc_comp_info_t),
203                                         CACHE_LINE_SIZE);
204   _d_comp_info_boost =
205     boost::shared_ptr<void>((void *) d_comp_info, free_deleter());
206
207
208   // get a handle to the spe program
209
210   spe_program_handle_t *spe_image = d_options.program_handle.get();
211
212   // fish proc_def table out of SPE ELF file
213
214   if (!gcpd_find_table(spe_image, &d_proc_def, &d_nproc_defs, &d_proc_def_ls_addr)){
215     fprintf(stderr, "gc_job_manager_impl: couldn't find gc_proc_defs in SPE ELF file.\n");
216     throw std::runtime_error("no gc_proc_defs");
217   }
218   // fprintf(stderr, "d_proc_def_ls_addr = 0x%0x\n", d_proc_def_ls_addr);
219
220   int spe_flags = (SPE_EVENTS_ENABLE
221                    | SPE_CFG_SIGNOTIFY1_OR
222                    | SPE_CFG_SIGNOTIFY2_OR);
223   
224   for (unsigned int i = 0; i < d_options.nspes; i++){
225     // FIXME affinity stuff goes here
226     d_worker[i].spe_ctx = spe_context_create(spe_flags, d_gang.get());;
227     if (d_worker[i].spe_ctx == 0){
228       perror("spe_context_create");
229       throw std::runtime_error("spe_context_create");
230     }
231     d_worker[i].spe_idx = i;
232     d_worker[i].spu_args = &d_spu_args[i];
233     d_worker[i].spu_args->queue = ptr_to_ea(d_queue);
234     d_worker[i].spu_args->comp_info[0] = ptr_to_ea(&d_comp_info[2*i+0]);
235     d_worker[i].spu_args->comp_info[1] = ptr_to_ea(&d_comp_info[2*i+1]);
236     d_worker[i].spu_args->spu_idx = i;
237     d_worker[i].spu_args->nspus = d_options.nspes;
238     d_worker[i].spu_args->proc_def_ls_addr = d_proc_def_ls_addr;
239     d_worker[i].spu_args->nproc_defs = d_nproc_defs;
240     d_worker[i].spu_args->log.base = 0;
241     d_worker[i].spu_args->log.nentries = 0;
242     d_worker[i].state = WS_INIT;
243
244     int r = spe_program_load(d_worker[i].spe_ctx, spe_image);
245     if (r != 0){
246       perror("spe_program_load");
247       throw std::runtime_error("spe_program_load");
248     }
249   }
250
251   setup_logfiles();
252
253   // ----------------------------------------------------------------
254   // initalize the free list of job descriptors
255   
256   d_free_list = (gc_jd_stack_t *) gc_aligned_alloc(sizeof(gc_jd_stack_t), CACHE_LINE_SIZE);
257   // This ensures that the memory associated with d_free_list is
258   // automatically freed in the destructor or if an exception occurs
259   // here in the constructor.
260   _d_free_list_boost =
261     boost::shared_ptr<void>((void *) d_free_list, free_deleter());
262   gc_jd_stack_init(d_free_list);
263
264   if (debug()){
265     printf("sizeof(d_jd[0]) = %d (0x%x)\n", sizeof(d_jd[0]), sizeof(d_jd[0]));
266     printf("max_jobs = %u\n", d_options.max_jobs);
267   }
268
269   // Initialize the array of job descriptors.
270   d_jd = (gc_job_desc_t *) gc_aligned_alloc(sizeof(d_jd[0]) * d_options.max_jobs, CACHE_LINE_SIZE);
271   _d_jd_boost = boost::shared_ptr<void>((void *) d_jd, free_deleter());
272
273
274   // set unique job_id
275   for (int i = 0; i < (int) d_options.max_jobs; i++)
276     d_jd[i].sys.job_id = i;
277
278   // push them onto the free list
279   for (int i = d_options.max_jobs - 1; i >= 0; i--)
280     free_job_desc(&d_jd[i]);
281
282   // ----------------------------------------------------------------
283   // initialize d_client_thread
284
285   {
286     gc_client_thread_info_sa cti(
287          new gc_client_thread_info[d_options.max_client_threads]);
288
289     d_client_thread.swap(cti);
290
291     for (unsigned int i = 0; i < d_options.max_client_threads; i++)
292       d_client_thread[i].d_client_id = i;
293   }
294
295   // ----------------------------------------------------------------
296   // initialize bitvectors
297
298   // initialize d_bvlen, the number of longs in job related bitvectors.
299   int bits_per_long = sizeof(unsigned long) * 8;
300   d_bvlen = (d_options.max_jobs + bits_per_long - 1) / bits_per_long;
301
302   // allocate all bitvectors in a single cache-aligned chunk
303   size_t nlongs = d_bvlen * d_options.max_client_threads;
304   void *p = gc_aligned_alloc(nlongs * sizeof(unsigned long), CACHE_LINE_SIZE);
305   _d_all_bitvectors = boost::shared_ptr<void>(p, free_deleter());
306
307   // Now point the gc_client_thread_info bitvectors into this storage
308   unsigned long *v = (unsigned long *) p;
309
310   for (unsigned int i = 0; i < d_options.max_client_threads; i++, v += d_bvlen)
311     d_client_thread[i].d_jobs_done = v;
312
313
314   // ----------------------------------------------------------------
315   // create the spe event handler & worker (SPE) threads
316
317   create_event_handler();
318
319 }
320
321 ////////////////////////////////////////////////////////////////////////
322
323 gc_job_manager_impl::~gc_job_manager_impl()
324 {
325   shutdown();
326
327   d_jd = 0;             // handled via _d_jd_boost
328   d_free_list = 0;      // handled via _d_free_list_boost
329   d_queue = 0;          // handled via _d_queue_boost
330
331   // clear cti, since we've deleted the underlying data
332   pthread_setspecific(s_client_key, 0);
333
334   unmap_logfiles();
335 }
336
337 bool
338 gc_job_manager_impl::shutdown()
339 {
340   omni_mutex_lock       l(d_eh_mutex);
341
342   d_shutdown_requested = true;          // set flag for event handler thread
343
344   // should only happens during early QA code
345   if (d_eh_thread == 0 && d_eh_state == EHS_INIT)
346     return false;
347
348   while (d_eh_state != EHS_DEAD)        // wait for it to finish
349     d_eh_cond.wait();
350
351   return true;
352 }
353
354 int
355 gc_job_manager_impl::nspes() const
356 {
357   return d_options.nspes;
358 }
359
360 ////////////////////////////////////////////////////////////////////////
361
362 void
363 gc_job_manager_impl::bv_zero(unsigned long *bv)
364 {
365   memset(bv, 0, sizeof(unsigned long) * d_bvlen);
366 }
367
368 inline void
369 gc_job_manager_impl::bv_clr(unsigned long *bv, unsigned int bitno)
370 {
371   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
372   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
373   bv[wi] &= ~(1UL << bi);
374 }
375
376 inline void
377 gc_job_manager_impl::bv_set(unsigned long *bv, unsigned int bitno)
378 {
379   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
380   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
381   bv[wi] |= (1UL << bi);
382 }
383
384 inline bool
385 gc_job_manager_impl::bv_isset(unsigned long *bv, unsigned int bitno)
386 {
387   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
388   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
389   return (bv[wi] & (1UL << bi)) != 0;
390 }
391
392 inline bool
393 gc_job_manager_impl::bv_isclr(unsigned long *bv, unsigned int bitno)
394 {
395   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
396   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
397   return (bv[wi] & (1UL << bi)) == 0;
398 }
399
400 ////////////////////////////////////////////////////////////////////////
401
402 gc_job_desc *
403 gc_job_manager_impl::alloc_job_desc()
404 {
405   // stack is lock free, and safe to call from any thread
406   gc_job_desc *jd = gc_jd_stack_pop(d_free_list);
407   if (jd == 0)
408     throw gc_bad_alloc("alloc_job_desc: none available");
409
410   return jd;
411 }
412
413 void
414 gc_job_manager_impl::free_job_desc(gc_job_desc *jd)
415 {
416   // stack is lock free, thus safe to call from any thread
417   if (jd != 0)
418     gc_jd_stack_push(d_free_list, jd);
419 }
420
421 ////////////////////////////////////////////////////////////////////////
422
423 /*
424  * We check as much as we can here on the PPE side, so that the SPE
425  * doesn't have to.
426  */
427 static bool
428 check_direct_args(gc_job_desc *jd, gc_job_direct_args *args)
429 {
430   if (args->nargs > MAX_ARGS_DIRECT){
431     jd->status = JS_BAD_N_DIRECT;
432     return false;
433   }
434
435   return true;
436 }
437
438 static bool
439 check_ea_args(gc_job_desc *jd, gc_job_ea_args *p)
440 {
441   if (p->nargs > MAX_ARGS_EA){
442     jd->status = JS_BAD_N_EA;
443     return false;
444   }
445
446   uint32_t dir_union = 0;
447
448   for (unsigned int i = 0; i < p->nargs; i++){
449     dir_union |= p->arg[i].direction;
450     switch(p->arg[i].direction){
451     case GCJD_DMA_GET:
452     case GCJD_DMA_PUT:
453       break;
454
455     default:
456       jd->status = JS_BAD_DIRECTION;
457       return false;
458     }
459   }
460
461   if (p->nargs > 1){
462     unsigned int common_eah = (p->arg[0].ea_addr) >> 32;
463     for (unsigned int i = 1; i < p->nargs; i++){
464       if ((p->arg[i].ea_addr >> 32) != common_eah){
465         jd->status = JS_BAD_EAH;
466         return false;
467       }
468     }
469   }
470
471   jd->sys.direction_union = dir_union;
472   return true;
473 }
474
475 bool
476 gc_job_manager_impl::submit_job(gc_job_desc *jd)
477 {
478   if (unlikely(d_shutdown_requested)){
479     jd->status = JS_SHUTTING_DOWN;
480     return false;
481   }
482
483   // Ensure it's one of our job descriptors
484
485   if (jd < d_jd || jd >= &d_jd[d_options.max_jobs]){
486     jd->status = JS_BAD_JOB_DESC;
487     return false;
488   }
489
490   // Ensure we've got a client_thread_info assigned to this thread.
491   
492   gc_client_thread_info *cti =
493     (gc_client_thread_info *) pthread_getspecific(s_client_key);
494   if (unlikely(cti == 0)){
495     if ((cti = alloc_cti()) == 0){
496       fprintf(stderr, "gc_job_manager_impl::submit_job: Too many client threads.\n");
497       jd->status = JS_TOO_MANY_CLIENTS;
498       return false;
499     }
500     int r = pthread_setspecific(s_client_key, cti);
501     if (r != 0){
502       jd->status = JS_BAD_JUJU;
503       fprintf(stderr, "pthread_setspecific failed (return = %d)\n", r);
504       return false;
505     }
506   }
507
508   if (jd->proc_id == GCP_UNKNOWN_PROC){
509     jd->status = JS_UNKNOWN_PROC;
510     return false;
511   }
512
513   if (!check_direct_args(jd, &jd->input))
514     return false;
515
516   if (!check_direct_args(jd, &jd->output))
517     return false;
518
519   if (!check_ea_args(jd, &jd->eaa))
520     return false;
521
522   jd->status = JS_OK;
523   jd->sys.client_id = cti->d_client_id;
524
525   // FIXME keep count of jobs in progress?
526   
527   gc_jd_queue_enqueue(d_queue, jd);
528   return true;
529 }
530
531 bool
532 gc_job_manager_impl::wait_job(gc_job_desc *jd)
533 {
534   bool done;
535   return wait_jobs(1, &jd, &done, GC_WAIT_ANY) == 1 && jd->status == JS_OK;
536 }
537
538 int
539 gc_job_manager_impl::wait_jobs(unsigned int njobs,
540                                gc_job_desc *jd[],
541                                bool done[],
542                                gc_wait_mode mode)
543 {
544   unsigned int i;
545
546   gc_client_thread_info *cti =
547     (gc_client_thread_info *) pthread_getspecific(s_client_key);
548   if (unlikely(cti == 0))
549     return -1;
550
551   for (i = 0; i < njobs; i++){
552     done[i] = false;
553     if (unlikely(jd[i]->sys.client_id != cti->d_client_id)){
554       fprintf(stderr, "gc_job_manager_impl::wait_jobs: can't wait for a job you didn't submit\n");
555       return -1;
556     }
557   }
558
559   {
560     omni_mutex_lock     l(cti->d_mutex);
561
562     // setup info for event handler
563     cti->d_state = (mode == GC_WAIT_ANY) ? CT_WAIT_ANY : CT_WAIT_ALL;
564     cti->d_njobs_waiting_for = njobs;
565     cti->d_jobs_waiting_for = jd;
566     assert(cti->d_jobs_done != 0);
567
568     unsigned int ndone = 0;
569
570     // wait for jobs to complete
571     
572     while (1){
573       ndone = 0;
574       for (i= 0; i < njobs; i++){
575         if (done[i])
576           ndone++;
577         else if (bv_isset(cti->d_jobs_done, jd[i]->sys.job_id)){
578           bv_clr(cti->d_jobs_done, jd[i]->sys.job_id);
579           done[i] = true;
580           ndone++;
581         }
582       }
583
584       if (mode == GC_WAIT_ANY && ndone > 0)
585         break;
586
587       if (mode == GC_WAIT_ALL && ndone == njobs)
588         break;
589
590       // FIXME what happens when somebody calls shutdown?
591
592       cti->d_cond.wait();       // wait for event handler to wake us up
593     }
594
595     cti->d_state = CT_NOT_WAITING;  
596     cti->d_njobs_waiting_for = 0;       // tidy up (not reqd)
597     cti->d_jobs_waiting_for = 0;        // tidy up (not reqd)
598     return ndone;
599   }
600 }
601
602 ////////////////////////////////////////////////////////////////////////
603
604 bool
605 gc_job_manager_impl::send_all_spes(uint32_t msg)
606 {
607   bool ok = true;
608
609   for (unsigned int i = 0; i < d_options.nspes; i++)
610     ok &= send_spe(i, msg);
611
612   return ok;
613 }
614
615 bool
616 gc_job_manager_impl::send_spe(unsigned int spe, uint32_t msg)
617 {
618   if (spe >= d_options.nspes)
619     return false;
620
621   int r = spe_in_mbox_write(d_worker[spe].spe_ctx, &msg, 1,
622                             SPE_MBOX_ALL_BLOCKING);
623   if (r < 0){
624     perror("spe_in_mbox_write");
625     return false;
626   }
627
628   return r == 1;
629 }
630
631 ////////////////////////////////////////////////////////////////////////
632
633 static void
634 pthread_create_failure_msg(int r, const char *which)
635 {
636   char buf[256];
637   char *s = 0;
638
639   switch (r){
640   case EAGAIN: s = "EAGAIN"; break;
641   case EINVAL: s = "EINVAL"; break;
642   case EPERM:  s = "EPERM";  break;
643   default:
644     snprintf(buf, sizeof(buf), "Unknown error %d", r);
645     s = buf;
646     break;
647   }
648   fprintf(stderr, "pthread_create[%s] failed: %s\n", which, s);
649 }
650
651
652 static bool
653 start_thread(pthread_t *thread,
654              void *(*start_routine)(void *),  void *arg,
655              const char *msg)
656 {
657   pthread_attr_t attr;
658   pthread_attr_init(&attr);
659   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
660
661   // FIXME save sigprocmask
662   // FIXME set sigprocmask
663
664   int r = pthread_create(thread, &attr, start_routine, arg);
665     
666   // FIXME restore sigprocmask
667
668   if (r != 0){
669     pthread_create_failure_msg(r, msg);
670     return false;
671   }
672   return true;
673 }
674
675
676 ////////////////////////////////////////////////////////////////////////
677
678 static void *start_worker(void *arg);
679
680 static void *
681 start_event_handler(void *arg)
682 {
683   gc_job_manager_impl *p = (gc_job_manager_impl *) arg;
684   p->event_handler_loop();
685   return 0;
686 }
687
688 void
689 gc_job_manager_impl::create_event_handler()
690 {
691   // create the SPE event handler and register our interest in events
692
693   d_spe_event_handler.ptr = spe_event_handler_create();
694   if (d_spe_event_handler.ptr == 0){
695     perror("spe_event_handler_create");
696     throw std::runtime_error("spe_event_handler_create");
697   }
698
699   for (unsigned int i = 0; i < d_options.nspes; i++){
700     spe_event_unit_t    eu;
701     memset(&eu, 0, sizeof(eu));
702     eu.events = SPE_EVENT_OUT_INTR_MBOX | SPE_EVENT_SPE_STOPPED;
703     eu.spe = d_worker[i].spe_ctx;
704     eu.data.u32 = i;    // set in events returned by spe_event_wait
705
706     if (spe_event_handler_register(d_spe_event_handler.ptr, &eu) != 0){
707       perror("spe_event_handler_register");
708       throw std::runtime_error("spe_event_handler_register");
709     }
710   }
711
712   // create our event handling thread
713
714   if (!start_thread(&d_eh_thread, start_event_handler, this, "event_handler")){
715     throw std::runtime_error("pthread_create");
716   }
717
718   // create the SPE worker threads
719
720   bool ok = true;
721   for (unsigned int i = 0; ok && i < d_options.nspes; i++){
722     char name[256];
723     snprintf(name, sizeof(name), "worker[%d]", i);
724     ok &= start_thread(&d_worker[i].thread, start_worker,
725                        &d_worker[i], name);
726   }
727
728   if (!ok){
729     //
730     // FIXME Clean up the mess.  Need to terminate event handler and all workers.
731     //
732     // this should cause the workers to exit, unless they're seriously broken
733     send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
734
735     shutdown();
736
737     throw std::runtime_error("pthread_create");
738   }
739 }
740
741 ////////////////////////////////////////////////////////////////////////
742
743 void
744 gc_job_manager_impl::set_eh_state(evt_handler_state s)
745 {
746   omni_mutex_lock       l(d_eh_mutex);
747   d_eh_state = s;
748   d_eh_cond.broadcast();
749 }
750
751 void
752 gc_job_manager_impl::set_ea_args_maxsize(int maxsize)
753 {
754   omni_mutex_lock       l(d_eh_mutex);
755   d_ea_args_maxsize = maxsize;
756   d_eh_cond.broadcast();
757 }
758
759 void
760 gc_job_manager_impl::print_event(spe_event_unit_t *evt)
761 {
762   printf("evt: spe = %d events = (0x%x)", evt->data.u32, evt->events);
763
764   if (evt->events & SPE_EVENT_OUT_INTR_MBOX)
765     printf(" OUT_INTR_MBOX");
766   
767   if (evt->events & SPE_EVENT_IN_MBOX)
768     printf(" IN_MBOX");
769   
770   if (evt->events & SPE_EVENT_TAG_GROUP)
771     printf(" TAG_GROUP");
772   
773   if (evt->events & SPE_EVENT_SPE_STOPPED)
774     printf(" SPE_STOPPED");
775
776   printf("\n");
777 }
778
779 struct job_client_info {
780   uint16_t      job_id;
781   uint16_t      client_id;
782 };
783
784 static int
785 compare_jci_clients(const void *va, const void *vb)
786 {
787   const job_client_info *a = (job_client_info *) va;
788   const job_client_info *b = (job_client_info *) vb;
789
790   return a->client_id - b->client_id;
791 }
792
793 void
794 gc_job_manager_impl::notify_clients_jobs_are_done(unsigned int spe_num,
795                                                   unsigned int completion_info_idx)
796 {
797   const char *msg = "gc_job_manager_impl::notify_client_job_is_done (INTERNAL ERROR)";
798
799   smp_rmb();  // order reads so we know that data sent from SPE is here
800
801   gc_comp_info_t *ci = &d_comp_info[2 * spe_num + (completion_info_idx & 0x1)];
802
803   if (ci->ncomplete == 0){      // never happens, but ensures code below is correct
804     ci->in_use = 0;
805     return;
806   }
807
808   if (0){
809     static int total_jobs;
810     static int total_msgs;
811     total_msgs++;
812     total_jobs += ci->ncomplete;
813     printf("ppe:     tj = %6d  tm = %6d\n", total_jobs, total_msgs);
814   }
815
816   job_client_info gci[GC_CI_NJOBS];
817
818   /*
819    * Make one pass through and sanity check everything while filling in gci
820    */
821   for (unsigned int i = 0; i < ci->ncomplete; i++){
822     unsigned int job_id = ci->job_id[i];
823
824     if (job_id >= d_options.max_jobs){
825       // internal error, shouldn't happen
826       fprintf(stderr,"%s: invalid job_id = %d\n", msg, job_id);
827       ci->in_use = 0;           // clear flag so SPE knows we're done with it
828       return;
829     }
830     gc_job_desc *jd = &d_jd[job_id];
831
832     if (jd->sys.client_id >= d_options.max_client_threads){
833       // internal error, shouldn't happen
834       fprintf(stderr, "%s: invalid client_id = %d\n", msg, jd->sys.client_id);
835       ci->in_use = 0;           // clear flag so SPE knows we're done with it
836       return;
837     }
838
839     gci[i].job_id = job_id;
840     gci[i].client_id = jd->sys.client_id;
841   }
842
843   // sort by client_id so we only have to lock & signal once / client
844
845   if (ci->ncomplete > 1)
846     qsort(gci, ci->ncomplete, sizeof(gci[0]), compare_jci_clients);
847
848   // "wind-in" 
849
850   gc_client_thread_info *last_cti = &d_client_thread[gci[0].client_id];
851   last_cti->d_mutex.lock();
852   bv_set(last_cti->d_jobs_done, gci[0].job_id);  // mark job done
853
854   for (unsigned int i = 1; i < ci->ncomplete; i++){
855
856     gc_client_thread_info *cti = &d_client_thread[gci[i].client_id];
857
858     if (cti != last_cti){       // new client?
859
860       // yes.  signal old client, unlock old, lock new
861
862       // FIXME we could distinguish between CT_WAIT_ALL & CT_WAIT_ANY
863
864       if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
865         last_cti->d_cond.signal();      // wake client thread up
866
867       last_cti->d_mutex.unlock();
868       cti->d_mutex.lock();
869       last_cti = cti;
870     }
871
872     // mark job done
873     bv_set(cti->d_jobs_done, gci[i].job_id);
874   }
875
876   // "wind-out"
877
878   if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
879     last_cti->d_cond.signal();  // wake client thread up
880   last_cti->d_mutex.unlock();
881
882   ci->in_use = 0;               // clear flag so SPE knows we're done with it
883 }
884
885 void
886 gc_job_manager_impl::handle_event(spe_event_unit_t *evt)
887 {
888   // print_event(evt);
889
890   int spe_num = evt->data.u32;
891
892   // only a single event type can be signaled at a time
893   
894   if (evt->events == SPE_EVENT_OUT_INTR_MBOX) { // SPE sent us 1 or more msgs
895     static const int NMSGS = 32;
896     unsigned int msg[NMSGS];
897     int n = spe_out_intr_mbox_read(evt->spe, msg, NMSGS, SPE_MBOX_ANY_BLOCKING);
898     // printf("spe_out_intr_mbox_read = %d\n", n);
899     if (n < 0){
900       perror("spe_out_intr_mbox_read");
901     }
902     else {
903       for (int i = 0; i < n; i++){
904         switch(MBOX_MSG_OP(msg[i])){
905         case OP_JOBS_DONE:
906           if (debug())
907             printf("eh: job_done (0x%08x) from spu[%d]\n", msg[i], spe_num);
908           notify_clients_jobs_are_done(spe_num, MBOX_MSG_ARG(msg[i]));
909           break;
910
911         case OP_SPU_BUFSIZE:
912           set_ea_args_maxsize(MBOX_MSG_ARG(msg[i]));
913           break;
914
915         case OP_EXIT:
916         default:
917           printf("eh: Unexpected msg (0x%08x) from spu[%d]\n", msg[i], spe_num);
918           break;
919         }
920       }
921     }
922   }
923   else if (evt->events == SPE_EVENT_SPE_STOPPED){ // the SPE stopped
924     spe_stop_info_t si;
925     int r = spe_stop_info_read(evt->spe, &si);
926     if (r < 0){
927       perror("spe_stop_info_read");
928     }
929     else {
930       switch (si.stop_reason){
931       case SPE_EXIT:
932         if (debug()){
933           printf("eh: spu[%d] SPE_EXIT w/ exit_code = %d\n",
934                  spe_num, si.result.spe_exit_code);
935         }
936         break;
937       case SPE_STOP_AND_SIGNAL:
938         printf("eh: spu[%d] SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
939                spe_num, si.result.spe_signal_code);
940         break;
941       case SPE_RUNTIME_ERROR:
942         printf("eh: spu[%d] SPE_RUNTIME_ERROR w/ spe_runtime_error = 0x%x\n",
943                spe_num, si.result.spe_runtime_error);
944         break;
945       case SPE_RUNTIME_EXCEPTION:
946         printf("eh: spu[%d] SPE_RUNTIME_EXCEPTION w/ spe_runtime_exception = 0x%x\n",
947                spe_num, si.result.spe_runtime_exception);
948         break;
949       case SPE_RUNTIME_FATAL:
950         printf("eh: spu[%d] SPE_RUNTIME_FATAL w/ spe_runtime_fatal = 0x%x\n",
951                spe_num, si.result.spe_runtime_fatal);
952         break;
953       case SPE_CALLBACK_ERROR:
954         printf("eh: spu[%d] SPE_CALLBACK_ERROR w/ spe_callback_error = 0x%x\n",
955                spe_num, si.result.spe_callback_error);
956         break;
957       case SPE_ISOLATION_ERROR:
958         printf("eh: spu[%d] SPE_ISOLATION_ERROR w/ spe_isolation_error = 0x%x\n",
959                spe_num, si.result.spe_isolation_error);
960         break;
961       default:
962         printf("eh: spu[%d] UNKNOWN STOP REASON (%d) w/ spu_status = 0x%x\n",
963                spe_num, si.stop_reason, si.spu_status);
964         break;
965       }
966     }
967   }
968 #if 0 // not enabled
969   else if (evt->events == SPE_EVENT_IN_MBOX){    // there's room to write to SPE
970     // spe_in_mbox_write (ignore)
971   }
972   else if (evt->events == SPE_EVENT_TAG_GROUP){  // our DMA completed
973     // spe_mfcio_tag_status_read
974   }
975 #endif
976   else {
977     fprintf(stderr, "handle_event: unexpected evt->events = 0x%x\n", evt->events);
978     return;
979   }
980 }
981
982 //
983 // This is the "main program" of the event handling thread
984 //
985 void
986 gc_job_manager_impl::event_handler_loop()
987 {
988   static const int MAX_EVENTS = 16;
989   static const int TIMEOUT = 20;        // how long to block in milliseconds
990
991   spe_event_unit_t events[MAX_EVENTS];
992
993   if (d_debug)
994     printf("event_handler_loop: starting\n");
995
996   set_eh_state(EHS_RUNNING);
997
998   // ask the first spe for its max bufsize
999   send_spe(0, MK_MBOX_MSG(OP_GET_SPU_BUFSIZE, 0));
1000
1001   while (1){
1002     switch(d_eh_state){
1003
1004     case EHS_RUNNING:      // normal stuff
1005       if (d_shutdown_requested) {
1006         set_eh_state(EHS_SHUTTING_DOWN);
1007       }
1008       break;
1009
1010     case EHS_SHUTTING_DOWN:
1011
1012       // FIXME wait until job queue is empty, then tell them to exit
1013
1014       send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
1015       set_eh_state(EHS_WAITING_FOR_WORKERS_TO_DIE);
1016       break;
1017
1018     case EHS_WAITING_FOR_WORKERS_TO_DIE:
1019       {
1020         bool all_dead = true;
1021         for (unsigned int i = 0; i < d_options.nspes; i++)
1022           all_dead &= d_worker[i].state == WS_DEAD;
1023
1024         if (all_dead){
1025           set_eh_state(EHS_DEAD);
1026           if (d_debug)
1027             printf("event_handler_loop: exiting\n");
1028           return;
1029         }
1030       }
1031       break;
1032
1033     default:
1034       set_eh_state(EHS_DEAD);
1035       printf("event_handler_loop(default): exiting\n");
1036       return;
1037     }
1038
1039     // block waiting for events...
1040     int nevents = spe_event_wait(d_spe_event_handler.ptr,
1041                                  events, MAX_EVENTS, TIMEOUT);
1042     if (nevents < 0){
1043       perror("spe_wait_event");
1044       // FIXME bail?
1045     }
1046     for (int i = 0; i < nevents; i++){
1047       handle_event(&events[i]);
1048     }
1049   }
1050 }
1051
1052 ////////////////////////////////////////////////////////////////////////
1053 // This is the top of the SPE worker threads
1054
1055 static void *
1056 start_worker(void *arg)
1057 {
1058   worker_ctx *w = (worker_ctx *) arg;
1059   spe_stop_info_t       si;
1060
1061   w->state = WS_RUNNING;
1062   if (s_worker_debug)
1063     printf("worker[%d]: WS_RUNNING\n", w->spe_idx);
1064
1065   unsigned int entry = SPE_DEFAULT_ENTRY;
1066   int r = spe_context_run(w->spe_ctx,  &entry, 0, w->spu_args, 0, &si);
1067
1068   if (r < 0){                   // error
1069     char buf[64];
1070     snprintf(buf, sizeof(buf), "worker[%d]: spe_context_run", w->spe_idx);
1071     perror(buf);
1072   }
1073   else if (r == 0){
1074     // spe program called exit.
1075     if (s_worker_debug)
1076       printf("worker[%d]: SPE_EXIT w/ exit_code = %d\n",
1077              w->spe_idx, si.result.spe_exit_code);
1078   }
1079   else {
1080     // called stop_and_signal
1081     //
1082     // I'm not sure we'll ever get here.  I think the event
1083     // handler will catch this...
1084     printf("worker[%d]: SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
1085            w->spe_idx, si.result.spe_signal_code);
1086   }
1087
1088   // in any event, we're committing suicide now ;)
1089   if (s_worker_debug)
1090     printf("worker[%d]: WS_DEAD\n", w->spe_idx);
1091
1092   w->state = WS_DEAD;
1093   return 0;
1094 }
1095
1096 ////////////////////////////////////////////////////////////////////////
1097
1098 gc_client_thread_info *
1099 gc_job_manager_impl::alloc_cti()
1100 {
1101   for (unsigned int i = 0; i < d_options.max_client_threads; i++){
1102     if (d_client_thread[i].d_free){
1103       // try to atomically grab it
1104       if (_atomic_dec_if_positive(ptr_to_ea(&d_client_thread[i].d_free)) == 0){
1105         // got it...
1106         gc_client_thread_info *cti = &d_client_thread[i];
1107         cti->d_state = CT_NOT_WAITING;
1108         bv_zero(cti->d_jobs_done);
1109         cti->d_njobs_waiting_for = 0;
1110         cti->d_jobs_waiting_for = 0;
1111         
1112         return cti;
1113       }
1114     }
1115   }
1116   return 0;
1117 }
1118
1119 void
1120 gc_job_manager_impl::free_cti(gc_client_thread_info *cti)
1121 {
1122   assert((size_t) (cti - d_client_thread.get()) < d_options.max_client_threads);
1123   cti->d_free = 1;
1124 }
1125
1126 int
1127 gc_job_manager_impl::ea_args_maxsize()
1128 {
1129   omni_mutex_lock       l(d_eh_mutex);
1130
1131   while (d_ea_args_maxsize == 0)        // wait for it to be initialized
1132     d_eh_cond.wait();
1133
1134   return d_ea_args_maxsize;
1135 }
1136
1137 void
1138 gc_job_manager_impl::set_debug(int debug)
1139 {
1140   d_debug = debug;
1141   s_worker_debug = debug;
1142 }
1143
1144 int
1145 gc_job_manager_impl::debug()
1146 {
1147   return d_debug;
1148 }
1149
1150 ////////////////////////////////////////////////////////////////////////
1151
1152 void
1153 gc_job_manager_impl::setup_logfiles()
1154 {
1155   if (!d_options.enable_logging)
1156     return;
1157
1158   if (d_options.log2_nlog_entries == 0)
1159     d_options.log2_nlog_entries = 12;
1160
1161   // must end up a multiple of the page size
1162
1163   size_t pagesize = getpagesize();
1164   size_t s = (1 << d_options.log2_nlog_entries) * sizeof(gc_log_entry_t);
1165   s = ((s + pagesize - 1) / pagesize) * pagesize;
1166   size_t nentries = s / sizeof(gc_log_entry_t);
1167   assert(is_power_of_2(nentries));
1168
1169   for (unsigned int i = 0; i < d_options.nspes; i++){
1170     char filename[100];
1171     snprintf(filename, sizeof(filename), "spu_log.%02d", i);
1172     int fd = open(filename, O_CREAT|O_TRUNC|O_RDWR, 0664);
1173     if (fd == -1){
1174       perror(filename);
1175       return;
1176     }
1177     lseek(fd, s - 1, SEEK_SET);
1178     write(fd, "\0", 1);
1179     void *p = mmap(0, s, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
1180     if (p == MAP_FAILED){
1181       perror("gc_job_manager_impl::setup_logfiles: mmap");
1182       close(fd);
1183       return;
1184     }
1185     close(fd);
1186     memset(p, 0, s);
1187     d_spu_args[i].log.base = ptr_to_ea(p);
1188     d_spu_args[i].log.nentries = nentries;
1189   }
1190 }
1191
1192 void
1193 gc_job_manager_impl::sync_logfiles()
1194 {
1195   for (unsigned int i = 0; i < d_options.nspes; i++){
1196     if (d_spu_args[i].log.base)
1197       msync(ea_to_ptr(d_spu_args[i].log.base),
1198             d_spu_args[i].log.nentries * sizeof(gc_log_entry_t),
1199             MS_ASYNC);
1200   }
1201 }
1202
1203 void
1204 gc_job_manager_impl::unmap_logfiles()
1205 {
1206   for (unsigned int i = 0; i < d_options.nspes; i++){
1207     if (d_spu_args[i].log.base)
1208       munmap(ea_to_ptr(d_spu_args[i].log.base),
1209              d_spu_args[i].log.nentries * sizeof(gc_log_entry_t));
1210   }
1211 }
1212
1213 ////////////////////////////////////////////////////////////////////////
1214 //
1215 // lookup proc names in d_proc_def table
1216
1217 gc_proc_id_t 
1218 gc_job_manager_impl::lookup_proc(const std::string &proc_name)
1219 {
1220   for (int i = 0; i < d_nproc_defs; i++)
1221     if (proc_name == d_proc_def[i].name)
1222       return i;
1223
1224   throw gc_unknown_proc(proc_name);
1225 }
1226
1227 std::vector<std::string>
1228 gc_job_manager_impl::proc_names()
1229 {
1230   std::vector<std::string> r;
1231   for (int i = 0; i < d_nproc_defs; i++)
1232     r.push_back(d_proc_def[i].name);
1233
1234   return r;
1235 }
1236
1237 ////////////////////////////////////////////////////////////////////////
1238
1239 worker_ctx::~worker_ctx()
1240 {
1241   if (spe_ctx){
1242     int r = spe_context_destroy(spe_ctx);
1243     if (r != 0){
1244       perror("spe_context_destroy");
1245     }
1246     spe_ctx = 0;
1247   }
1248   state = WS_FREE;
1249 }