Merged gcell-wip -r8159:8202 into trunk. This includes the following
[debian/gnuradio] / gcell / src / lib / runtime / gc_job_manager_impl.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2007 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License along
18  * with this program; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25 #include <gc_job_manager_impl.h>
26 #include <gc_mbox.h>
27 #include <gc_proc_def_utils.h>
28
29 #include <stdio.h>
30 #include <stdexcept>
31 #include <stdlib.h>
32 #include <atomic_dec_if_positive.h>
33 #include <memory_barrier.h>
34 #include <unistd.h>
35 #include <sys/mman.h>
36 #include <sys/types.h>
37 #include <sys/stat.h>
38 #include <fcntl.h>
39
40
41 static const size_t CACHE_LINE_SIZE = 128;
42
43 static const unsigned int DEFAULT_MAX_JOBS = 128;
44 static const unsigned int DEFAULT_MAX_CLIENT_THREADS = 64;
45
46 // FIXME this really depends on the SPU code...
47 static const unsigned int MAX_TOTAL_INDIRECT_LENGTH = 16 * 1024;
48
49
50 static bool          s_key_initialized = false;
51 static pthread_key_t s_client_key;
52
53 static int s_worker_debug = 0;
54
55 // custom deleter of gang_contexts for use with boost::shared_ptr
56 class gang_deleter {
57 public:
58   void operator()(spe_gang_context_ptr_t ctx) {
59     if (ctx){
60       int r = spe_gang_context_destroy(ctx);
61       if (r != 0){
62         perror("spe_gang_context_destroy");
63       }
64     }
65   }
66 };
67
68
69 // custom deleter of anything that can be freed with "free"
70 class free_deleter {
71 public:
72   void operator()(void *p) {
73     free(p);
74   }
75 };
76
77
78 /*
79  * Called when client thread is destroyed.
80  * We mark our client info free.
81  */
82 static void
83 client_key_destructor(void *p)
84 {
85   ((gc_client_thread_info *) p)->d_free = 1;
86 }
87
88 /*
89  * Return pointer to cache-aligned chunk of storage of size size bytes.
90  * Throw if can't allocate memory.  The storage should be freed
91  * with "free" when done.  The memory is initialized to zero.
92  */
93 static void *
94 aligned_alloc(size_t size, size_t alignment = CACHE_LINE_SIZE)
95 {
96   void *p = 0;
97   if (posix_memalign(&p, alignment, size) != 0){
98     perror("posix_memalign");
99     throw std::runtime_error("memory");
100   }
101   memset(p, 0, size);           // zero the memory
102   return p;
103 }
104
105 static bool
106 is_power_of_2(uint32_t x)
107 {
108   return (x != 0) && !(x & (x - 1));
109 }
110
111 ////////////////////////////////////////////////////////////////////////
112
113
114 gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
115   : d_debug(0), d_spu_args(0),
116     d_eh_cond(&d_eh_mutex), d_eh_thread(0), d_eh_state(EHS_INIT),
117     d_shutdown_requested(false),
118     d_client_thread(0), d_ea_args_maxsize(0),
119     d_proc_def(0), d_proc_def_ls_addr(0), d_nproc_defs(0)
120 {
121   if (!s_key_initialized){
122     int r = pthread_key_create(&s_client_key, client_key_destructor);
123     if (r != 0)
124       throw std::runtime_error("pthread_key_create");
125     s_key_initialized = true;
126   }
127
128   // ensure it's zero
129   pthread_setspecific(s_client_key, 0);
130
131   if (options != 0)
132     d_options = *options;
133
134   // provide the real default for those indicated with a zero
135   if (d_options.max_jobs == 0)
136     d_options.max_jobs = DEFAULT_MAX_JOBS;
137   if (d_options.max_client_threads == 0)
138     d_options.max_client_threads = DEFAULT_MAX_CLIENT_THREADS;
139
140   if (!d_options.program_handle){
141     fprintf(stderr, "gc_job_manager: options->program_handle must be non-zero\n");
142     throw std::runtime_error("gc_job_manager: options->program_handle must be non-zero");
143   }
144
145   int ncpu_nodes = spe_cpu_info_get(SPE_COUNT_PHYSICAL_CPU_NODES, -1);
146   int nusable_spes = spe_cpu_info_get(SPE_COUNT_USABLE_SPES, -1);
147
148   if (debug()){
149     printf("cpu_nodes = %d\n", ncpu_nodes);
150     for (int i = 0; i < ncpu_nodes; i++){
151       printf("node[%d].physical_spes = %2d\n", i,
152              spe_cpu_info_get(SPE_COUNT_PHYSICAL_SPES, i));
153       printf("node[%d].usable_spes   = %2d\n", i,
154              spe_cpu_info_get(SPE_COUNT_USABLE_SPES, i));
155     }
156   }
157
158   // clamp nspes
159   d_options.nspes = std::min(d_options.nspes, (unsigned int) MAX_SPES);
160   nusable_spes = std::min(nusable_spes, (int) MAX_SPES);
161
162   //
163   // sanity check requested number of spes.
164   //
165   if (d_options.nspes == 0)     // use all of them
166     d_options.nspes = nusable_spes;
167   else {
168     if (d_options.nspes > (unsigned int) nusable_spes){
169       fprintf(stderr,
170               "gc_job_manager: warning: caller requested %d spes.  There are only %d available.\n",
171               d_options.nspes, nusable_spes);
172       if (d_options.gang_schedule){
173         // If we're gang scheduling we'll never get scheduled if we
174         // ask for more than are available.
175         throw std::out_of_range("gang_scheduling: not enough spes available");
176       }
177       else {    // FIXME clamp to usable.  problem on PS3 when overcommited
178         fprintf(stderr, "gc_job_manager: clamping nspes to %d\n", nusable_spes);
179         d_options.nspes = nusable_spes;
180       }
181     }
182   }
183
184   if (d_options.use_affinity){
185     printf("gc_job_manager: warning: affinity request was ignored\n");
186   }
187
188   if (d_options.gang_schedule){
189     d_gang = spe_gang_context_sptr(spe_gang_context_create(0), gang_deleter());
190     if (!d_gang){
191       perror("gc_job_manager_impl[spe_gang_context_create]");
192       throw std::runtime_error("spe_gang_context_create");
193     }
194   }
195
196   // ----------------------------------------------------------------
197   // initalize the job queue
198   
199   d_queue = (gc_jd_queue_t *) aligned_alloc(sizeof(gc_jd_queue_t));
200   _d_queue_boost =
201     boost::shared_ptr<void>((void *) d_queue, free_deleter());
202   gc_jd_queue_init(d_queue);
203
204
205   // ----------------------------------------------------------------
206   // create the spe contexts
207
208   // 1 spu_arg struct for each SPE
209   assert(sizeof(gc_spu_args_t) % 16 == 0);
210   d_spu_args =
211     (gc_spu_args_t *) aligned_alloc(MAX_SPES * sizeof(gc_spu_args_t), 16);
212   _d_spu_args_boost =
213     boost::shared_ptr<void>((void *) d_spu_args, free_deleter());
214
215   // 2 completion info structs for each SPE (we double buffer them)
216   assert(sizeof(gc_comp_info_t) % CACHE_LINE_SIZE == 0);
217   d_comp_info =
218     (gc_comp_info_t *) aligned_alloc(2 * MAX_SPES * sizeof(gc_comp_info_t),
219                                      CACHE_LINE_SIZE);
220   _d_comp_info_boost =
221     boost::shared_ptr<void>((void *) d_comp_info, free_deleter());
222
223
224   // get a handle to the spe program
225
226   spe_program_handle_t *spe_image = d_options.program_handle.get();
227
228   // fish proc_def table out of SPE ELF file
229
230   if (!gcpd_find_table(spe_image, &d_proc_def, &d_nproc_defs, &d_proc_def_ls_addr)){
231     fprintf(stderr, "gc_job_manager_impl: couldn't find gc_proc_defs in SPE ELF file.\n");
232     throw std::runtime_error("no gc_proc_defs");
233   }
234   // fprintf(stderr, "d_proc_def_ls_addr = 0x%0x\n", d_proc_def_ls_addr);
235
236   int spe_flags = (SPE_EVENTS_ENABLE
237                    | SPE_CFG_SIGNOTIFY1_OR
238                    | SPE_CFG_SIGNOTIFY2_OR);
239   
240   for (unsigned int i = 0; i < d_options.nspes; i++){
241     // FIXME affinity stuff goes here
242     d_worker[i].spe_ctx = spe_context_create(spe_flags, d_gang.get());;
243     if (d_worker[i].spe_ctx == 0){
244       perror("spe_context_create");
245       throw std::runtime_error("spe_context_create");
246     }
247     d_worker[i].spe_idx = i;
248     d_worker[i].spu_args = &d_spu_args[i];
249     d_worker[i].spu_args->queue = ptr_to_ea(d_queue);
250     d_worker[i].spu_args->comp_info[0] = ptr_to_ea(&d_comp_info[2*i+0]);
251     d_worker[i].spu_args->comp_info[1] = ptr_to_ea(&d_comp_info[2*i+1]);
252     d_worker[i].spu_args->spu_idx = i;
253     d_worker[i].spu_args->nspus = d_options.nspes;
254     d_worker[i].spu_args->proc_def_ls_addr = d_proc_def_ls_addr;
255     d_worker[i].spu_args->nproc_defs = d_nproc_defs;
256     d_worker[i].spu_args->log.base = 0;
257     d_worker[i].spu_args->log.nentries = 0;
258     d_worker[i].state = WS_INIT;
259
260     int r = spe_program_load(d_worker[i].spe_ctx, spe_image);
261     if (r != 0){
262       perror("spe_program_load");
263       throw std::runtime_error("spe_program_load");
264     }
265   }
266
267   setup_logfiles();
268
269   // ----------------------------------------------------------------
270   // initalize the free list of job descriptors
271   
272   d_free_list = (gc_jd_stack_t *) aligned_alloc(sizeof(gc_jd_stack_t));
273   // This ensures that the memory associated with d_free_list is
274   // automatically freed in the destructor or if an exception occurs
275   // here in the constructor.
276   _d_free_list_boost =
277     boost::shared_ptr<void>((void *) d_free_list, free_deleter());
278   gc_jd_stack_init(d_free_list);
279
280   if (debug()){
281     printf("sizeof(d_jd[0]) = %d (0x%x)\n", sizeof(d_jd[0]), sizeof(d_jd[0]));
282     printf("max_jobs = %u\n", d_options.max_jobs);
283   }
284
285   // Initialize the array of job descriptors.
286   d_jd = (gc_job_desc_t *) aligned_alloc(sizeof(d_jd[0]) * d_options.max_jobs);
287   _d_jd_boost = boost::shared_ptr<void>((void *) d_jd, free_deleter());
288
289
290   // set unique job_id
291   for (int i = 0; i < (int) d_options.max_jobs; i++)
292     d_jd[i].sys.job_id = i;
293
294   // push them onto the free list
295   for (int i = d_options.max_jobs - 1; i >= 0; i--)
296     free_job_desc(&d_jd[i]);
297
298   // ----------------------------------------------------------------
299   // initialize d_client_thread
300
301   {
302     gc_client_thread_info_sa cti(
303          new gc_client_thread_info[d_options.max_client_threads]);
304
305     d_client_thread.swap(cti);
306
307     for (unsigned int i = 0; i < d_options.max_client_threads; i++)
308       d_client_thread[i].d_client_id = i;
309   }
310
311   // ----------------------------------------------------------------
312   // initialize bitvectors
313
314   // initialize d_bvlen, the number of longs in job related bitvectors.
315   int bits_per_long = sizeof(unsigned long) * 8;
316   d_bvlen = (d_options.max_jobs + bits_per_long - 1) / bits_per_long;
317
318   // allocate all bitvectors in a single cache-aligned chunk
319   size_t nlongs = d_bvlen * d_options.max_client_threads;
320   void *p = aligned_alloc(nlongs * sizeof(unsigned long));
321   _d_all_bitvectors = boost::shared_ptr<void>(p, free_deleter());
322
323   // Now point the gc_client_thread_info bitvectors into this storage
324   unsigned long *v = (unsigned long *) p;
325
326   for (unsigned int i = 0; i < d_options.max_client_threads; i++, v += d_bvlen)
327     d_client_thread[i].d_jobs_done = v;
328
329
330   // ----------------------------------------------------------------
331   // create the spe event handler & worker (SPE) threads
332
333   create_event_handler();
334
335 }
336
337 ////////////////////////////////////////////////////////////////////////
338
339 gc_job_manager_impl::~gc_job_manager_impl()
340 {
341   shutdown();
342
343   d_jd = 0;             // handled via _d_jd_boost
344   d_free_list = 0;      // handled via _d_free_list_boost
345   d_queue = 0;          // handled via _d_queue_boost
346
347   // clear cti, since we've deleted the underlying data
348   pthread_setspecific(s_client_key, 0);
349
350   unmap_logfiles();
351 }
352
353 bool
354 gc_job_manager_impl::shutdown()
355 {
356   omni_mutex_lock       l(d_eh_mutex);
357
358   d_shutdown_requested = true;          // set flag for event handler thread
359
360   // should only happens during early QA code
361   if (d_eh_thread == 0 && d_eh_state == EHS_INIT)
362     return false;
363
364   while (d_eh_state != EHS_DEAD)        // wait for it to finish
365     d_eh_cond.wait();
366
367   return true;
368 }
369
370 int
371 gc_job_manager_impl::nspes() const
372 {
373   return d_options.nspes;
374 }
375
376 ////////////////////////////////////////////////////////////////////////
377
378 void
379 gc_job_manager_impl::bv_zero(unsigned long *bv)
380 {
381   memset(bv, 0, sizeof(unsigned long) * d_bvlen);
382 }
383
384 inline void
385 gc_job_manager_impl::bv_clr(unsigned long *bv, unsigned int bitno)
386 {
387   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
388   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
389   bv[wi] &= ~(1UL << bi);
390 }
391
392 inline void
393 gc_job_manager_impl::bv_set(unsigned long *bv, unsigned int bitno)
394 {
395   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
396   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
397   bv[wi] |= (1UL << bi);
398 }
399
400 inline bool
401 gc_job_manager_impl::bv_isset(unsigned long *bv, unsigned int bitno)
402 {
403   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
404   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
405   return (bv[wi] & (1UL << bi)) != 0;
406 }
407
408 inline bool
409 gc_job_manager_impl::bv_isclr(unsigned long *bv, unsigned int bitno)
410 {
411   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
412   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
413   return (bv[wi] & (1UL << bi)) == 0;
414 }
415
416 ////////////////////////////////////////////////////////////////////////
417
418 gc_job_desc *
419 gc_job_manager_impl::alloc_job_desc()
420 {
421   // stack is lock free, and safe to call from any thread
422   gc_job_desc *jd = gc_jd_stack_pop(d_free_list);
423   if (jd == 0)
424     throw gc_bad_alloc("alloc_job_desc: none available");
425
426   return jd;
427 }
428
429 void
430 gc_job_manager_impl::free_job_desc(gc_job_desc *jd)
431 {
432   // stack is lock free, thus safe to call from any thread
433   if (jd != 0)
434     gc_jd_stack_push(d_free_list, jd);
435 }
436
437 ////////////////////////////////////////////////////////////////////////
438
439 /*
440  * We check as much as we can here on the PPE side, so that the SPE
441  * doesn't have to.
442  */
443 static bool
444 check_direct_args(gc_job_desc *jd, gc_job_direct_args *args)
445 {
446   if (args->nargs > MAX_ARGS_DIRECT){
447     jd->status = JS_BAD_N_DIRECT;
448     return false;
449   }
450
451   return true;
452 }
453
454 static bool
455 check_ea_args(gc_job_desc *jd, gc_job_ea_args *p)
456 {
457   if (p->nargs > MAX_ARGS_EA){
458     jd->status = JS_BAD_N_EA;
459     return false;
460   }
461
462   uint32_t dir_union = 0;
463
464   for (unsigned int i = 0; i < p->nargs; i++){
465     dir_union |= p->arg[i].direction;
466     switch(p->arg[i].direction){
467     case GCJD_DMA_GET:
468     case GCJD_DMA_PUT:
469       break;
470
471     default:
472       jd->status = JS_BAD_DIRECTION;
473       return false;
474     }
475   }
476
477   if (p->nargs > 1){
478     unsigned int common_eah = (p->arg[0].ea_addr) >> 32;
479     for (unsigned int i = 1; i < p->nargs; i++){
480       if ((p->arg[i].ea_addr >> 32) != common_eah){
481         jd->status = JS_BAD_EAH;
482         return false;
483       }
484     }
485   }
486
487   jd->sys.direction_union = dir_union;
488   return true;
489 }
490
491 bool
492 gc_job_manager_impl::submit_job(gc_job_desc *jd)
493 {
494   if (unlikely(d_shutdown_requested)){
495     jd->status = JS_SHUTTING_DOWN;
496     return false;
497   }
498
499   // Ensure it's one of our job descriptors
500
501   if (jd < d_jd || jd >= &d_jd[d_options.max_jobs]){
502     jd->status = JS_BAD_JOB_DESC;
503     return false;
504   }
505
506   // Ensure we've got a client_thread_info assigned to this thread.
507   
508   gc_client_thread_info *cti =
509     (gc_client_thread_info *) pthread_getspecific(s_client_key);
510   if (unlikely(cti == 0)){
511     if ((cti = alloc_cti()) == 0){
512       fprintf(stderr, "gc_job_manager_impl::submit_job: Too many client threads.\n");
513       jd->status = JS_TOO_MANY_CLIENTS;
514       return false;
515     }
516     int r = pthread_setspecific(s_client_key, cti);
517     if (r != 0){
518       jd->status = JS_BAD_JUJU;
519       fprintf(stderr, "pthread_setspecific failed (return = %d)\n", r);
520       return false;
521     }
522   }
523
524   if (jd->proc_id == GCP_UNKNOWN_PROC){
525     jd->status = JS_UNKNOWN_PROC;
526     return false;
527   }
528
529   if (!check_direct_args(jd, &jd->input))
530     return false;
531
532   if (!check_direct_args(jd, &jd->output))
533     return false;
534
535   if (!check_ea_args(jd, &jd->eaa))
536     return false;
537
538   jd->status = JS_OK;
539   jd->sys.client_id = cti->d_client_id;
540
541   // FIXME keep count of jobs in progress?
542   
543   gc_jd_queue_enqueue(d_queue, jd);
544   return true;
545 }
546
547 bool
548 gc_job_manager_impl::wait_job(gc_job_desc *jd)
549 {
550   bool done;
551   return wait_jobs(1, &jd, &done, GC_WAIT_ANY) == 1 && jd->status == JS_OK;
552 }
553
554 int
555 gc_job_manager_impl::wait_jobs(unsigned int njobs,
556                                gc_job_desc *jd[],
557                                bool done[],
558                                gc_wait_mode mode)
559 {
560   unsigned int i;
561
562   gc_client_thread_info *cti =
563     (gc_client_thread_info *) pthread_getspecific(s_client_key);
564   if (unlikely(cti == 0))
565     return -1;
566
567   for (i = 0; i < njobs; i++){
568     done[i] = false;
569     if (unlikely(jd[i]->sys.client_id != cti->d_client_id)){
570       fprintf(stderr, "gc_job_manager_impl::wait_jobs: can't wait for a job you didn't submit\n");
571       return -1;
572     }
573   }
574
575   {
576     omni_mutex_lock     l(cti->d_mutex);
577
578     // setup info for event handler
579     cti->d_state = (mode == GC_WAIT_ANY) ? CT_WAIT_ANY : CT_WAIT_ALL;
580     cti->d_njobs_waiting_for = njobs;
581     cti->d_jobs_waiting_for = jd;
582     assert(cti->d_jobs_done != 0);
583
584     unsigned int ndone = 0;
585
586     // wait for jobs to complete
587     
588     while (1){
589       ndone = 0;
590       for (i= 0; i < njobs; i++){
591         if (done[i])
592           ndone++;
593         else if (bv_isset(cti->d_jobs_done, jd[i]->sys.job_id)){
594           bv_clr(cti->d_jobs_done, jd[i]->sys.job_id);
595           done[i] = true;
596           ndone++;
597         }
598       }
599
600       if (mode == GC_WAIT_ANY && ndone > 0)
601         break;
602
603       if (mode == GC_WAIT_ALL && ndone == njobs)
604         break;
605
606       // FIXME what happens when somebody calls shutdown?
607
608       cti->d_cond.wait();       // wait for event handler to wake us up
609     }
610
611     cti->d_state = CT_NOT_WAITING;  
612     cti->d_njobs_waiting_for = 0;       // tidy up (not reqd)
613     cti->d_jobs_waiting_for = 0;        // tidy up (not reqd)
614     return ndone;
615   }
616 }
617
618 ////////////////////////////////////////////////////////////////////////
619
620 bool
621 gc_job_manager_impl::send_all_spes(uint32_t msg)
622 {
623   bool ok = true;
624
625   for (unsigned int i = 0; i < d_options.nspes; i++)
626     ok &= send_spe(i, msg);
627
628   return ok;
629 }
630
631 bool
632 gc_job_manager_impl::send_spe(unsigned int spe, uint32_t msg)
633 {
634   if (spe >= d_options.nspes)
635     return false;
636
637   int r = spe_in_mbox_write(d_worker[spe].spe_ctx, &msg, 1,
638                             SPE_MBOX_ALL_BLOCKING);
639   if (r < 0){
640     perror("spe_in_mbox_write");
641     return false;
642   }
643
644   return r == 1;
645 }
646
647 ////////////////////////////////////////////////////////////////////////
648
649 static void
650 pthread_create_failure_msg(int r, const char *which)
651 {
652   char buf[256];
653   char *s = 0;
654
655   switch (r){
656   case EAGAIN: s = "EAGAIN"; break;
657   case EINVAL: s = "EINVAL"; break;
658   case EPERM:  s = "EPERM";  break;
659   default:
660     snprintf(buf, sizeof(buf), "Unknown error %d", r);
661     s = buf;
662     break;
663   }
664   fprintf(stderr, "pthread_create[%s] failed: %s\n", which, s);
665 }
666
667
668 static bool
669 start_thread(pthread_t *thread,
670              void *(*start_routine)(void *),  void *arg,
671              const char *msg)
672 {
673   pthread_attr_t attr;
674   pthread_attr_init(&attr);
675   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
676
677   // FIXME save sigprocmask
678   // FIXME set sigprocmask
679
680   int r = pthread_create(thread, &attr, start_routine, arg);
681     
682   // FIXME restore sigprocmask
683
684   if (r != 0){
685     pthread_create_failure_msg(r, msg);
686     return false;
687   }
688   return true;
689 }
690
691
692 ////////////////////////////////////////////////////////////////////////
693
694 static void *start_worker(void *arg);
695
696 static void *
697 start_event_handler(void *arg)
698 {
699   gc_job_manager_impl *p = (gc_job_manager_impl *) arg;
700   p->event_handler_loop();
701   return 0;
702 }
703
704 void
705 gc_job_manager_impl::create_event_handler()
706 {
707   // create the SPE event handler and register our interest in events
708
709   d_spe_event_handler.ptr = spe_event_handler_create();
710   if (d_spe_event_handler.ptr == 0){
711     perror("spe_event_handler_create");
712     throw std::runtime_error("spe_event_handler_create");
713   }
714
715   for (unsigned int i = 0; i < d_options.nspes; i++){
716     spe_event_unit_t    eu;
717     memset(&eu, 0, sizeof(eu));
718     eu.events = SPE_EVENT_OUT_INTR_MBOX | SPE_EVENT_SPE_STOPPED;
719     eu.spe = d_worker[i].spe_ctx;
720     eu.data.u32 = i;    // set in events returned by spe_event_wait
721
722     if (spe_event_handler_register(d_spe_event_handler.ptr, &eu) != 0){
723       perror("spe_event_handler_register");
724       throw std::runtime_error("spe_event_handler_register");
725     }
726   }
727
728   // create our event handling thread
729
730   if (!start_thread(&d_eh_thread, start_event_handler, this, "event_handler")){
731     throw std::runtime_error("pthread_create");
732   }
733
734   // create the SPE worker threads
735
736   bool ok = true;
737   for (unsigned int i = 0; ok && i < d_options.nspes; i++){
738     char name[256];
739     snprintf(name, sizeof(name), "worker[%d]", i);
740     ok &= start_thread(&d_worker[i].thread, start_worker,
741                        &d_worker[i], name);
742   }
743
744   if (!ok){
745     //
746     // FIXME Clean up the mess.  Need to terminate event handler and all workers.
747     //
748     // this should cause the workers to exit, unless they're seriously broken
749     send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
750
751     shutdown();
752
753     throw std::runtime_error("pthread_create");
754   }
755 }
756
757 ////////////////////////////////////////////////////////////////////////
758
759 void
760 gc_job_manager_impl::set_eh_state(evt_handler_state s)
761 {
762   omni_mutex_lock       l(d_eh_mutex);
763   d_eh_state = s;
764   d_eh_cond.broadcast();
765 }
766
767 void
768 gc_job_manager_impl::set_ea_args_maxsize(int maxsize)
769 {
770   omni_mutex_lock       l(d_eh_mutex);
771   d_ea_args_maxsize = maxsize;
772   d_eh_cond.broadcast();
773 }
774
775 void
776 gc_job_manager_impl::print_event(spe_event_unit_t *evt)
777 {
778   printf("evt: spe = %d events = (0x%x)", evt->data.u32, evt->events);
779
780   if (evt->events & SPE_EVENT_OUT_INTR_MBOX)
781     printf(" OUT_INTR_MBOX");
782   
783   if (evt->events & SPE_EVENT_IN_MBOX)
784     printf(" IN_MBOX");
785   
786   if (evt->events & SPE_EVENT_TAG_GROUP)
787     printf(" TAG_GROUP");
788   
789   if (evt->events & SPE_EVENT_SPE_STOPPED)
790     printf(" SPE_STOPPED");
791
792   printf("\n");
793 }
794
795 struct job_client_info {
796   uint16_t      job_id;
797   uint16_t      client_id;
798 };
799
800 static int
801 compare_jci_clients(const void *va, const void *vb)
802 {
803   const job_client_info *a = (job_client_info *) va;
804   const job_client_info *b = (job_client_info *) vb;
805
806   return a->client_id - b->client_id;
807 }
808
809 void
810 gc_job_manager_impl::notify_clients_jobs_are_done(unsigned int spe_num,
811                                                   unsigned int completion_info_idx)
812 {
813   const char *msg = "gc_job_manager_impl::notify_client_job_is_done (INTERNAL ERROR)";
814
815   smp_rmb();  // order reads so we know that data sent from SPE is here
816
817   gc_comp_info_t *ci = &d_comp_info[2 * spe_num + (completion_info_idx & 0x1)];
818
819   if (ci->ncomplete == 0){      // never happens, but ensures code below is correct
820     ci->in_use = 0;
821     return;
822   }
823
824   if (0){
825     static int total_jobs;
826     static int total_msgs;
827     total_msgs++;
828     total_jobs += ci->ncomplete;
829     printf("ppe:     tj = %6d  tm = %6d\n", total_jobs, total_msgs);
830   }
831
832   job_client_info gci[GC_CI_NJOBS];
833
834   /*
835    * Make one pass through and sanity check everything while filling in gci
836    */
837   for (unsigned int i = 0; i < ci->ncomplete; i++){
838     unsigned int job_id = ci->job_id[i];
839
840     if (job_id >= d_options.max_jobs){
841       // internal error, shouldn't happen
842       fprintf(stderr,"%s: invalid job_id = %d\n", msg, job_id);
843       ci->in_use = 0;           // clear flag so SPE knows we're done with it
844       return;
845     }
846     gc_job_desc *jd = &d_jd[job_id];
847
848     if (jd->sys.client_id >= d_options.max_client_threads){
849       // internal error, shouldn't happen
850       fprintf(stderr, "%s: invalid client_id = %d\n", msg, jd->sys.client_id);
851       ci->in_use = 0;           // clear flag so SPE knows we're done with it
852       return;
853     }
854
855     gci[i].job_id = job_id;
856     gci[i].client_id = jd->sys.client_id;
857   }
858
859   // sort by client_id so we only have to lock & signal once / client
860
861   if (ci->ncomplete > 1)
862     qsort(gci, ci->ncomplete, sizeof(gci[0]), compare_jci_clients);
863
864   // "wind-in" 
865
866   gc_client_thread_info *last_cti = &d_client_thread[gci[0].client_id];
867   last_cti->d_mutex.lock();
868   bv_set(last_cti->d_jobs_done, gci[0].job_id);  // mark job done
869
870   for (unsigned int i = 1; i < ci->ncomplete; i++){
871
872     gc_client_thread_info *cti = &d_client_thread[gci[i].client_id];
873
874     if (cti != last_cti){       // new client?
875
876       // yes.  signal old client, unlock old, lock new
877
878       // FIXME we could distinguish between CT_WAIT_ALL & CT_WAIT_ANY
879
880       if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
881         last_cti->d_cond.signal();      // wake client thread up
882
883       last_cti->d_mutex.unlock();
884       cti->d_mutex.lock();
885       last_cti = cti;
886     }
887
888     // mark job done
889     bv_set(cti->d_jobs_done, gci[i].job_id);
890   }
891
892   // "wind-out"
893
894   if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
895     last_cti->d_cond.signal();  // wake client thread up
896   last_cti->d_mutex.unlock();
897
898   ci->in_use = 0;               // clear flag so SPE knows we're done with it
899 }
900
901 void
902 gc_job_manager_impl::handle_event(spe_event_unit_t *evt)
903 {
904   // print_event(evt);
905
906   int spe_num = evt->data.u32;
907
908   // only a single event type can be signaled at a time
909   
910   if (evt->events == SPE_EVENT_OUT_INTR_MBOX) { // SPE sent us 1 or more msgs
911     static const int NMSGS = 32;
912     unsigned int msg[NMSGS];
913     int n = spe_out_intr_mbox_read(evt->spe, msg, NMSGS, SPE_MBOX_ANY_BLOCKING);
914     // printf("spe_out_intr_mbox_read = %d\n", n);
915     if (n < 0){
916       perror("spe_out_intr_mbox_read");
917     }
918     else {
919       for (int i = 0; i < n; i++){
920         switch(MBOX_MSG_OP(msg[i])){
921         case OP_JOBS_DONE:
922           if (debug())
923             printf("eh: job_done (0x%08x) from spu[%d]\n", msg[i], spe_num);
924           notify_clients_jobs_are_done(spe_num, MBOX_MSG_ARG(msg[i]));
925           break;
926
927         case OP_SPU_BUFSIZE:
928           set_ea_args_maxsize(MBOX_MSG_ARG(msg[i]));
929           break;
930
931         case OP_EXIT:
932         default:
933           printf("eh: Unexpected msg (0x%08x) from spu[%d]\n", msg[i], spe_num);
934           break;
935         }
936       }
937     }
938   }
939   else if (evt->events == SPE_EVENT_SPE_STOPPED){ // the SPE stopped
940     spe_stop_info_t si;
941     int r = spe_stop_info_read(evt->spe, &si);
942     if (r < 0){
943       perror("spe_stop_info_read");
944     }
945     else {
946       switch (si.stop_reason){
947       case SPE_EXIT:
948         if (debug()){
949           printf("eh: spu[%d] SPE_EXIT w/ exit_code = %d\n",
950                  spe_num, si.result.spe_exit_code);
951         }
952         break;
953       case SPE_STOP_AND_SIGNAL:
954         printf("eh: spu[%d] SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
955                spe_num, si.result.spe_signal_code);
956         break;
957       case SPE_RUNTIME_ERROR:
958         printf("eh: spu[%d] SPE_RUNTIME_ERROR w/ spe_runtime_error = 0x%x\n",
959                spe_num, si.result.spe_runtime_error);
960         break;
961       case SPE_RUNTIME_EXCEPTION:
962         printf("eh: spu[%d] SPE_RUNTIME_EXCEPTION w/ spe_runtime_exception = 0x%x\n",
963                spe_num, si.result.spe_runtime_exception);
964         break;
965       case SPE_RUNTIME_FATAL:
966         printf("eh: spu[%d] SPE_RUNTIME_FATAL w/ spe_runtime_fatal = 0x%x\n",
967                spe_num, si.result.spe_runtime_fatal);
968         break;
969       case SPE_CALLBACK_ERROR:
970         printf("eh: spu[%d] SPE_CALLBACK_ERROR w/ spe_callback_error = 0x%x\n",
971                spe_num, si.result.spe_callback_error);
972         break;
973       case SPE_ISOLATION_ERROR:
974         printf("eh: spu[%d] SPE_ISOLATION_ERROR w/ spe_isolation_error = 0x%x\n",
975                spe_num, si.result.spe_isolation_error);
976         break;
977       default:
978         printf("eh: spu[%d] UNKNOWN STOP REASON (%d) w/ spu_status = 0x%x\n",
979                spe_num, si.stop_reason, si.spu_status);
980         break;
981       }
982     }
983   }
984 #if 0 // not enabled
985   else if (evt->events == SPE_EVENT_IN_MBOX){    // there's room to write to SPE
986     // spe_in_mbox_write (ignore)
987   }
988   else if (evt->events == SPE_EVENT_TAG_GROUP){  // our DMA completed
989     // spe_mfcio_tag_status_read
990   }
991 #endif
992   else {
993     fprintf(stderr, "handle_event: unexpected evt->events = 0x%x\n", evt->events);
994     return;
995   }
996 }
997
998 //
999 // This is the "main program" of the event handling thread
1000 //
1001 void
1002 gc_job_manager_impl::event_handler_loop()
1003 {
1004   static const int MAX_EVENTS = 16;
1005   static const int TIMEOUT = 20;        // how long to block in milliseconds
1006
1007   spe_event_unit_t events[MAX_EVENTS];
1008
1009   if (d_debug)
1010     printf("event_handler_loop: starting\n");
1011
1012   set_eh_state(EHS_RUNNING);
1013
1014   // ask the first spe for its max bufsize
1015   send_spe(0, MK_MBOX_MSG(OP_GET_SPU_BUFSIZE, 0));
1016
1017   while (1){
1018     switch(d_eh_state){
1019
1020     case EHS_RUNNING:      // normal stuff
1021       if (d_shutdown_requested) {
1022         set_eh_state(EHS_SHUTTING_DOWN);
1023       }
1024       break;
1025
1026     case EHS_SHUTTING_DOWN:
1027
1028       // FIXME wait until job queue is empty, then tell them to exit
1029
1030       send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
1031       set_eh_state(EHS_WAITING_FOR_WORKERS_TO_DIE);
1032       break;
1033
1034     case EHS_WAITING_FOR_WORKERS_TO_DIE:
1035       {
1036         bool all_dead = true;
1037         for (unsigned int i = 0; i < d_options.nspes; i++)
1038           all_dead &= d_worker[i].state == WS_DEAD;
1039
1040         if (all_dead){
1041           set_eh_state(EHS_DEAD);
1042           if (d_debug)
1043             printf("event_handler_loop: exiting\n");
1044           return;
1045         }
1046       }
1047       break;
1048
1049     default:
1050       set_eh_state(EHS_DEAD);
1051       printf("event_handler_loop(default): exiting\n");
1052       return;
1053     }
1054
1055     // block waiting for events...
1056     int nevents = spe_event_wait(d_spe_event_handler.ptr,
1057                                  events, MAX_EVENTS, TIMEOUT);
1058     if (nevents < 0){
1059       perror("spe_wait_event");
1060       // FIXME bail?
1061     }
1062     for (int i = 0; i < nevents; i++){
1063       handle_event(&events[i]);
1064     }
1065   }
1066 }
1067
1068 ////////////////////////////////////////////////////////////////////////
1069 // This is the top of the SPE worker threads
1070
1071 static void *
1072 start_worker(void *arg)
1073 {
1074   worker_ctx *w = (worker_ctx *) arg;
1075   spe_stop_info_t       si;
1076
1077   w->state = WS_RUNNING;
1078   if (s_worker_debug)
1079     printf("worker[%d]: WS_RUNNING\n", w->spe_idx);
1080
1081   unsigned int entry = SPE_DEFAULT_ENTRY;
1082   int r = spe_context_run(w->spe_ctx,  &entry, 0, w->spu_args, 0, &si);
1083
1084   if (r < 0){                   // error
1085     char buf[64];
1086     snprintf(buf, sizeof(buf), "worker[%d]: spe_context_run", w->spe_idx);
1087     perror(buf);
1088   }
1089   else if (r == 0){
1090     // spe program called exit.
1091     if (s_worker_debug)
1092       printf("worker[%d]: SPE_EXIT w/ exit_code = %d\n",
1093              w->spe_idx, si.result.spe_exit_code);
1094   }
1095   else {
1096     // called stop_and_signal
1097     //
1098     // I'm not sure we'll ever get here.  I think the event
1099     // handler will catch this...
1100     printf("worker[%d]: SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
1101            w->spe_idx, si.result.spe_signal_code);
1102   }
1103
1104   // in any event, we're committing suicide now ;)
1105   if (s_worker_debug)
1106     printf("worker[%d]: WS_DEAD\n", w->spe_idx);
1107
1108   w->state = WS_DEAD;
1109   return 0;
1110 }
1111
1112 ////////////////////////////////////////////////////////////////////////
1113
1114 gc_client_thread_info *
1115 gc_job_manager_impl::alloc_cti()
1116 {
1117   for (unsigned int i = 0; i < d_options.max_client_threads; i++){
1118     if (d_client_thread[i].d_free){
1119       // try to atomically grab it
1120       if (_atomic_dec_if_positive(ptr_to_ea(&d_client_thread[i].d_free)) == 0){
1121         // got it...
1122         gc_client_thread_info *cti = &d_client_thread[i];
1123         cti->d_state = CT_NOT_WAITING;
1124         bv_zero(cti->d_jobs_done);
1125         cti->d_njobs_waiting_for = 0;
1126         cti->d_jobs_waiting_for = 0;
1127         
1128         return cti;
1129       }
1130     }
1131   }
1132   return 0;
1133 }
1134
1135 void
1136 gc_job_manager_impl::free_cti(gc_client_thread_info *cti)
1137 {
1138   assert((size_t) (cti - d_client_thread.get()) < d_options.max_client_threads);
1139   cti->d_free = 1;
1140 }
1141
1142 int
1143 gc_job_manager_impl::ea_args_maxsize()
1144 {
1145   omni_mutex_lock       l(d_eh_mutex);
1146
1147   while (d_ea_args_maxsize == 0)        // wait for it to be initialized
1148     d_eh_cond.wait();
1149
1150   return d_ea_args_maxsize;
1151 }
1152
1153 void
1154 gc_job_manager_impl::set_debug(int debug)
1155 {
1156   d_debug = debug;
1157   s_worker_debug = debug;
1158 }
1159
1160 int
1161 gc_job_manager_impl::debug()
1162 {
1163   return d_debug;
1164 }
1165
1166 ////////////////////////////////////////////////////////////////////////
1167
1168 void
1169 gc_job_manager_impl::setup_logfiles()
1170 {
1171   if (!d_options.enable_logging)
1172     return;
1173
1174   if (d_options.log2_nlog_entries == 0)
1175     d_options.log2_nlog_entries = 12;
1176
1177   // must end up a multiple of the page size
1178
1179   size_t pagesize = getpagesize();
1180   size_t s = (1 << d_options.log2_nlog_entries) * sizeof(gc_log_entry_t);
1181   s = ((s + pagesize - 1) / pagesize) * pagesize;
1182   size_t nentries = s / sizeof(gc_log_entry_t);
1183   assert(is_power_of_2(nentries));
1184
1185   for (unsigned int i = 0; i < d_options.nspes; i++){
1186     char filename[100];
1187     snprintf(filename, sizeof(filename), "spu_log.%02d", i);
1188     int fd = open(filename, O_CREAT|O_TRUNC|O_RDWR, 0664);
1189     if (fd == -1){
1190       perror(filename);
1191       return;
1192     }
1193     lseek(fd, s - 1, SEEK_SET);
1194     write(fd, "\0", 1);
1195     void *p = mmap(0, s, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
1196     if (p == MAP_FAILED){
1197       perror("gc_job_manager_impl::setup_logfiles: mmap");
1198       close(fd);
1199       return;
1200     }
1201     close(fd);
1202     memset(p, 0, s);
1203     d_spu_args[i].log.base = ptr_to_ea(p);
1204     d_spu_args[i].log.nentries = nentries;
1205   }
1206 }
1207
1208 void
1209 gc_job_manager_impl::sync_logfiles()
1210 {
1211   for (unsigned int i = 0; i < d_options.nspes; i++){
1212     if (d_spu_args[i].log.base)
1213       msync(ea_to_ptr(d_spu_args[i].log.base),
1214             d_spu_args[i].log.nentries * sizeof(gc_log_entry_t),
1215             MS_ASYNC);
1216   }
1217 }
1218
1219 void
1220 gc_job_manager_impl::unmap_logfiles()
1221 {
1222   for (unsigned int i = 0; i < d_options.nspes; i++){
1223     if (d_spu_args[i].log.base)
1224       munmap(ea_to_ptr(d_spu_args[i].log.base),
1225              d_spu_args[i].log.nentries * sizeof(gc_log_entry_t));
1226   }
1227 }
1228
1229 ////////////////////////////////////////////////////////////////////////
1230 //
1231 // lookup proc names in d_proc_def table
1232
1233 gc_proc_id_t 
1234 gc_job_manager_impl::lookup_proc(const std::string &proc_name)
1235 {
1236   for (int i = 0; i < d_nproc_defs; i++)
1237     if (proc_name == d_proc_def[i].name)
1238       return i;
1239
1240   throw gc_unknown_proc(proc_name);
1241 }
1242
1243 std::vector<std::string>
1244 gc_job_manager_impl::proc_names()
1245 {
1246   std::vector<std::string> r;
1247   for (int i = 0; i < d_nproc_defs; i++)
1248     r.push_back(d_proc_def[i].name);
1249
1250   return r;
1251 }
1252
1253 ////////////////////////////////////////////////////////////////////////
1254
1255 worker_ctx::~worker_ctx()
1256 {
1257   if (spe_ctx){
1258     int r = spe_context_destroy(spe_ctx);
1259     if (r != 0){
1260       perror("spe_context_destroy");
1261     }
1262     spe_ctx = 0;
1263   }
1264   state = WS_FREE;
1265 }