dd08154d0db272e259d03ac32b28192e62244a3d
[debian/gnuradio] / gcell / src / lib / runtime / gc_job_manager_impl.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2007 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License along
18  * with this program; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25 #include <gc_job_manager_impl.h>
26 #include <gc_mbox.h>
27 #include <gc_proc_def_utils.h>
28
29 #include <stdio.h>
30 #include <stdexcept>
31 #include <stdlib.h>
32 #include <atomic_dec_if_positive.h>
33 #include <memory_barrier.h>
34 #include <unistd.h>
35 #include <sys/mman.h>
36 #include <sys/types.h>
37 #include <sys/stat.h>
38 #include <fcntl.h>
39
40
41 static const size_t CACHE_LINE_SIZE = 128;
42
43 static const unsigned int DEFAULT_MAX_JOBS = 128;
44 static const unsigned int DEFAULT_MAX_CLIENT_THREADS = 64;
45
46 // FIXME this really depends on the SPU code...
47 static const unsigned int MAX_TOTAL_INDIRECT_LENGTH = 16 * 1024;
48
49
50 static bool          s_key_initialized = false;
51 static pthread_key_t s_client_key;
52
53 static int s_worker_debug = 0;
54
55 // custom deleter of gang_contexts for use with boost::shared_ptr
56 class gang_deleter {
57 public:
58   void operator()(spe_gang_context_ptr_t ctx) {
59     if (ctx){
60       int r = spe_gang_context_destroy(ctx);
61       if (r != 0){
62         perror("spe_gang_context_destroy");
63       }
64     }
65   }
66 };
67
68 // custom deleter
69 class spe_program_handle_deleter {
70 public:
71   void operator()(spe_program_handle_t *program) {
72     if (program){
73       int r = spe_image_close(program);
74       if (r != 0){
75         perror("spe_image_close");
76       }
77     }
78   }
79 };
80
81
82 // custom deleter of anything that can be freed with "free"
83 class free_deleter {
84 public:
85   void operator()(void *p) {
86     free(p);
87   }
88 };
89
90
91 /*
92  * Called when client thread is destroyed.
93  * We mark our client info free.
94  */
95 static void
96 client_key_destructor(void *p)
97 {
98   ((gc_client_thread_info *) p)->d_free = 1;
99 }
100
101 /*
102  * Return pointer to cache-aligned chunk of storage of size size bytes.
103  * Throw if can't allocate memory.  The storage should be freed
104  * with "free" when done.  The memory is initialized to zero.
105  */
106 static void *
107 aligned_alloc(size_t size, size_t alignment = CACHE_LINE_SIZE)
108 {
109   void *p = 0;
110   if (posix_memalign(&p, alignment, size) != 0){
111     perror("posix_memalign");
112     throw std::runtime_error("memory");
113   }
114   memset(p, 0, size);           // zero the memory
115   return p;
116 }
117
118 static bool
119 is_power_of_2(uint32_t x)
120 {
121   return (x != 0) && !(x & (x - 1));
122 }
123
124 ////////////////////////////////////////////////////////////////////////
125
126
127 gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
128   : d_debug(0), d_spu_args(0),
129     d_eh_cond(&d_eh_mutex), d_eh_thread(0), d_eh_state(EHS_INIT),
130     d_shutdown_requested(false),
131     d_client_thread(0), d_ea_args_maxsize(0),
132     d_proc_def(0), d_proc_def_ls_addr(0), d_nproc_defs(0)
133 {
134   if (!s_key_initialized){
135     int r = pthread_key_create(&s_client_key, client_key_destructor);
136     if (r != 0)
137       throw std::runtime_error("pthread_key_create");
138     s_key_initialized = true;
139   }
140
141   // ensure it's zero
142   pthread_setspecific(s_client_key, 0);
143
144   if (options != 0)
145     d_options = *options;
146
147   // provide the real default for those indicated with a zero
148   if (d_options.max_jobs == 0)
149     d_options.max_jobs = DEFAULT_MAX_JOBS;
150   if (d_options.max_client_threads == 0)
151     d_options.max_client_threads = DEFAULT_MAX_CLIENT_THREADS;
152
153   if (d_options.program_handle == 0){
154     fprintf(stderr, "gc_job_manager: options->program_handle must be non-zero\n");
155     throw std::runtime_error("gc_job_manager: options->program_handle must be non-zero");
156   }
157
158   int ncpu_nodes = spe_cpu_info_get(SPE_COUNT_PHYSICAL_CPU_NODES, -1);
159   int nusable_spes = spe_cpu_info_get(SPE_COUNT_USABLE_SPES, -1);
160
161   if (debug()){
162     printf("cpu_nodes = %d\n", ncpu_nodes);
163     for (int i = 0; i < ncpu_nodes; i++){
164       printf("node[%d].physical_spes = %2d\n", i,
165              spe_cpu_info_get(SPE_COUNT_PHYSICAL_SPES, i));
166       printf("node[%d].usable_spes   = %2d\n", i,
167              spe_cpu_info_get(SPE_COUNT_USABLE_SPES, i));
168     }
169   }
170
171   // clamp nspes
172   d_options.nspes = std::min(d_options.nspes, (unsigned int) MAX_SPES);
173   nusable_spes = std::min(nusable_spes, (int) MAX_SPES);
174
175   //
176   // sanity check requested number of spes.
177   //
178   if (d_options.nspes == 0)     // use all of them
179     d_options.nspes = nusable_spes;
180   else {
181     if (d_options.nspes > (unsigned int) nusable_spes){
182       fprintf(stderr,
183               "gc_job_manager: warning: caller requested %d spes.  There are only %d available.\n",
184               d_options.nspes, nusable_spes);
185       if (d_options.gang_schedule){
186         // If we're gang scheduling we'll never get scheduled if we
187         // ask for more than are available.
188         throw std::out_of_range("gang_scheduling: not enough spes available");
189       }
190       else {    // FIXME clamp to usable.  problem on PS3 when overcommited
191         fprintf(stderr, "gc_job_manager: clamping nspes to %d\n", nusable_spes);
192         d_options.nspes = nusable_spes;
193       }
194     }
195   }
196
197   if (d_options.use_affinity){
198     printf("gc_job_manager: warning: affinity request was ignored\n");
199   }
200
201   if (d_options.gang_schedule){
202     d_gang = spe_gang_context_sptr(spe_gang_context_create(0), gang_deleter());
203     if (!d_gang){
204       perror("gc_job_manager_impl[spe_gang_context_create]");
205       throw std::runtime_error("spe_gang_context_create");
206     }
207   }
208
209   // ----------------------------------------------------------------
210   // initalize the job queue
211   
212   d_queue = (gc_jd_queue_t *) aligned_alloc(sizeof(gc_jd_queue_t));
213   _d_queue_boost =
214     boost::shared_ptr<void>((void *) d_queue, free_deleter());
215   gc_jd_queue_init(d_queue);
216
217
218   // ----------------------------------------------------------------
219   // create the spe contexts
220
221   // 1 spu_arg struct for each SPE
222   assert(sizeof(gc_spu_args_t) % 16 == 0);
223   d_spu_args =
224     (gc_spu_args_t *) aligned_alloc(MAX_SPES * sizeof(gc_spu_args_t), 16);
225   _d_spu_args_boost =
226     boost::shared_ptr<void>((void *) d_spu_args, free_deleter());
227
228   // 2 completion info structs for each SPE (we double buffer them)
229   assert(sizeof(gc_comp_info_t) % CACHE_LINE_SIZE == 0);
230   d_comp_info =
231     (gc_comp_info_t *) aligned_alloc(2 * MAX_SPES * sizeof(gc_comp_info_t),
232                                      CACHE_LINE_SIZE);
233   _d_comp_info_boost =
234     boost::shared_ptr<void>((void *) d_comp_info, free_deleter());
235
236
237   // get a handle to the spe program
238
239   spe_program_handle_t *spe_image = d_options.program_handle;
240
241   // fish proc_def table out of SPE ELF file
242
243   if (!gcpd_find_table(spe_image, &d_proc_def, &d_nproc_defs, &d_proc_def_ls_addr)){
244     fprintf(stderr, "gc_job_manager_impl: couldn't find gc_proc_defs in SPE ELF file.\n");
245     throw std::runtime_error("no gc_proc_defs");
246   }
247   // fprintf(stderr, "d_proc_def_ls_addr = 0x%0x\n", d_proc_def_ls_addr);
248
249   int spe_flags = (SPE_EVENTS_ENABLE
250                    | SPE_CFG_SIGNOTIFY1_OR
251                    | SPE_CFG_SIGNOTIFY2_OR);
252   
253   for (unsigned int i = 0; i < d_options.nspes; i++){
254     // FIXME affinity stuff goes here
255     d_worker[i].spe_ctx = spe_context_create(spe_flags, d_gang.get());;
256     if (d_worker[i].spe_ctx == 0){
257       perror("spe_context_create");
258       throw std::runtime_error("spe_context_create");
259     }
260     d_worker[i].spe_idx = i;
261     d_worker[i].spu_args = &d_spu_args[i];
262     d_worker[i].spu_args->queue = ptr_to_ea(d_queue);
263     d_worker[i].spu_args->comp_info[0] = ptr_to_ea(&d_comp_info[2*i+0]);
264     d_worker[i].spu_args->comp_info[1] = ptr_to_ea(&d_comp_info[2*i+1]);
265     d_worker[i].spu_args->spu_idx = i;
266     d_worker[i].spu_args->nspus = d_options.nspes;
267     d_worker[i].spu_args->proc_def_ls_addr = d_proc_def_ls_addr;
268     d_worker[i].spu_args->nproc_defs = d_nproc_defs;
269     d_worker[i].spu_args->log.base = 0;
270     d_worker[i].spu_args->log.nentries = 0;
271     d_worker[i].state = WS_INIT;
272
273     int r = spe_program_load(d_worker[i].spe_ctx, spe_image);
274     if (r != 0){
275       perror("spe_program_load");
276       throw std::runtime_error("spe_program_load");
277     }
278   }
279
280   setup_logfiles();
281
282   // ----------------------------------------------------------------
283   // initalize the free list of job descriptors
284   
285   d_free_list = (gc_jd_stack_t *) aligned_alloc(sizeof(gc_jd_stack_t));
286   // This ensures that the memory associated with d_free_list is
287   // automatically freed in the destructor or if an exception occurs
288   // here in the constructor.
289   _d_free_list_boost =
290     boost::shared_ptr<void>((void *) d_free_list, free_deleter());
291   gc_jd_stack_init(d_free_list);
292
293   if (debug()){
294     printf("sizeof(d_jd[0]) = %d (0x%x)\n", sizeof(d_jd[0]), sizeof(d_jd[0]));
295     printf("max_jobs = %u\n", d_options.max_jobs);
296   }
297
298   // Initialize the array of job descriptors.
299   d_jd = (gc_job_desc_t *) aligned_alloc(sizeof(d_jd[0]) * d_options.max_jobs);
300   _d_jd_boost = boost::shared_ptr<void>((void *) d_jd, free_deleter());
301
302
303   // set unique job_id
304   for (int i = 0; i < (int) d_options.max_jobs; i++)
305     d_jd[i].sys.job_id = i;
306
307   // push them onto the free list
308   for (int i = d_options.max_jobs - 1; i >= 0; i--)
309     free_job_desc(&d_jd[i]);
310
311   // ----------------------------------------------------------------
312   // initialize d_client_thread
313
314   {
315     gc_client_thread_info_sa cti(
316          new gc_client_thread_info[d_options.max_client_threads]);
317
318     d_client_thread.swap(cti);
319
320     for (unsigned int i = 0; i < d_options.max_client_threads; i++)
321       d_client_thread[i].d_client_id = i;
322   }
323
324   // ----------------------------------------------------------------
325   // initialize bitvectors
326
327   // initialize d_bvlen, the number of longs in job related bitvectors.
328   int bits_per_long = sizeof(unsigned long) * 8;
329   d_bvlen = (d_options.max_jobs + bits_per_long - 1) / bits_per_long;
330
331   // allocate all bitvectors in a single cache-aligned chunk
332   size_t nlongs = d_bvlen * d_options.max_client_threads;
333   void *p = aligned_alloc(nlongs * sizeof(unsigned long));
334   _d_all_bitvectors = boost::shared_ptr<void>(p, free_deleter());
335
336   // Now point the gc_client_thread_info bitvectors into this storage
337   unsigned long *v = (unsigned long *) p;
338
339   for (unsigned int i = 0; i < d_options.max_client_threads; i++, v += d_bvlen)
340     d_client_thread[i].d_jobs_done = v;
341
342
343   // ----------------------------------------------------------------
344   // create the spe event handler & worker (SPE) threads
345
346   create_event_handler();
347
348 }
349
350 ////////////////////////////////////////////////////////////////////////
351
352 gc_job_manager_impl::~gc_job_manager_impl()
353 {
354   shutdown();
355
356   d_jd = 0;             // handled via _d_jd_boost
357   d_free_list = 0;      // handled via _d_free_list_boost
358   d_queue = 0;          // handled via _d_queue_boost
359
360   // clear cti, since we've deleted the underlying data
361   pthread_setspecific(s_client_key, 0);
362
363   unmap_logfiles();
364 }
365
366 bool
367 gc_job_manager_impl::shutdown()
368 {
369   omni_mutex_lock       l(d_eh_mutex);
370
371   d_shutdown_requested = true;          // set flag for event handler thread
372
373   // should only happens during early QA code
374   if (d_eh_thread == 0 && d_eh_state == EHS_INIT)
375     return false;
376
377   while (d_eh_state != EHS_DEAD)        // wait for it to finish
378     d_eh_cond.wait();
379
380   return true;
381 }
382
383 int
384 gc_job_manager_impl::nspes() const
385 {
386   return d_options.nspes;
387 }
388
389 ////////////////////////////////////////////////////////////////////////
390
391 void
392 gc_job_manager_impl::bv_zero(unsigned long *bv)
393 {
394   memset(bv, 0, sizeof(unsigned long) * d_bvlen);
395 }
396
397 inline void
398 gc_job_manager_impl::bv_clr(unsigned long *bv, unsigned int bitno)
399 {
400   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
401   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
402   bv[wi] &= ~(1UL << bi);
403 }
404
405 inline void
406 gc_job_manager_impl::bv_set(unsigned long *bv, unsigned int bitno)
407 {
408   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
409   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
410   bv[wi] |= (1UL << bi);
411 }
412
413 inline bool
414 gc_job_manager_impl::bv_isset(unsigned long *bv, unsigned int bitno)
415 {
416   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
417   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
418   return (bv[wi] & (1UL << bi)) != 0;
419 }
420
421 inline bool
422 gc_job_manager_impl::bv_isclr(unsigned long *bv, unsigned int bitno)
423 {
424   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
425   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
426   return (bv[wi] & (1UL << bi)) == 0;
427 }
428
429 ////////////////////////////////////////////////////////////////////////
430
431 gc_job_desc *
432 gc_job_manager_impl::alloc_job_desc()
433 {
434   // stack is lock free, thus safe to call from any thread
435   return gc_jd_stack_pop(d_free_list);
436 }
437
438 void
439 gc_job_manager_impl::free_job_desc(gc_job_desc *jd)
440 {
441   // stack is lock free, thus safe to call from any thread
442   if (jd != 0)
443     gc_jd_stack_push(d_free_list, jd);
444 }
445
446 ////////////////////////////////////////////////////////////////////////
447
448 /*
449  * We check as much as we can here on the PPE side, so that the SPE
450  * doesn't have to.
451  */
452 static bool
453 check_direct_args(gc_job_desc *jd, gc_job_direct_args *args)
454 {
455   if (args->nargs > MAX_ARGS_DIRECT){
456     jd->status = JS_BAD_N_DIRECT;
457     return false;
458   }
459
460   return true;
461 }
462
463 static bool
464 check_ea_args(gc_job_desc *jd, gc_job_ea_args *p)
465 {
466   if (p->nargs > MAX_ARGS_EA){
467     jd->status = JS_BAD_N_EA;
468     return false;
469   }
470
471   uint32_t dir_union = 0;
472
473   for (unsigned int i = 0; i < p->nargs; i++){
474     dir_union |= p->arg[i].direction;
475     switch(p->arg[i].direction){
476     case GCJD_DMA_GET:
477     case GCJD_DMA_PUT:
478       break;
479
480     default:
481       jd->status = JS_BAD_DIRECTION;
482       return false;
483     }
484   }
485
486   if (p->nargs > 1){
487     unsigned int common_eah = (p->arg[0].ea_addr) >> 32;
488     for (unsigned int i = 1; i < p->nargs; i++){
489       if ((p->arg[i].ea_addr >> 32) != common_eah){
490         jd->status = JS_BAD_EAH;
491         return false;
492       }
493     }
494   }
495
496   jd->sys.direction_union = dir_union;
497   return true;
498 }
499
500 bool
501 gc_job_manager_impl::submit_job(gc_job_desc *jd)
502 {
503   if (unlikely(d_shutdown_requested)){
504     jd->status = JS_SHUTTING_DOWN;
505     return false;
506   }
507
508   // Ensure it's one of our job descriptors
509
510   if (jd < d_jd || jd >= &d_jd[d_options.max_jobs]){
511     jd->status = JS_BAD_JOB_DESC;
512     return false;
513   }
514
515   // Ensure we've got a client_thread_info assigned to this thread.
516   
517   gc_client_thread_info *cti =
518     (gc_client_thread_info *) pthread_getspecific(s_client_key);
519   if (unlikely(cti == 0)){
520     if ((cti = alloc_cti()) == 0){
521       fprintf(stderr, "gc_job_manager_impl::submit_job: Too many client threads.\n");
522       jd->status = JS_TOO_MANY_CLIENTS;
523       return false;
524     }
525     int r = pthread_setspecific(s_client_key, cti);
526     if (r != 0){
527       jd->status = JS_BAD_JUJU;
528       fprintf(stderr, "pthread_setspecific failed (return = %d)\n", r);
529       return false;
530     }
531   }
532
533   if (jd->proc_id == GCP_UNKNOWN_PROC){
534     jd->status = JS_UNKNOWN_PROC;
535     return false;
536   }
537
538   if (!check_direct_args(jd, &jd->input))
539     return false;
540
541   if (!check_direct_args(jd, &jd->output))
542     return false;
543
544   if (!check_ea_args(jd, &jd->eaa))
545     return false;
546
547   jd->status = JS_OK;
548   jd->sys.client_id = cti->d_client_id;
549
550   // FIXME keep count of jobs in progress?
551   
552   gc_jd_queue_enqueue(d_queue, jd);
553   return true;
554 }
555
556 bool
557 gc_job_manager_impl::wait_job(gc_job_desc *jd)
558 {
559   bool done;
560   return wait_jobs(1, &jd, &done, GC_WAIT_ANY) == 1;
561 }
562
563 int
564 gc_job_manager_impl::wait_jobs(unsigned int njobs,
565                                gc_job_desc *jd[],
566                                bool done[],
567                                gc_wait_mode mode)
568 {
569   unsigned int i;
570
571   gc_client_thread_info *cti =
572     (gc_client_thread_info *) pthread_getspecific(s_client_key);
573   if (unlikely(cti == 0))
574     return -1;
575
576   for (i = 0; i < njobs; i++){
577     done[i] = false;
578     if (unlikely(jd[i]->sys.client_id != cti->d_client_id)){
579       fprintf(stderr, "gc_job_manager_impl::wait_jobs: can't wait for a job you didn't submit\n");
580       return -1;
581     }
582   }
583
584   {
585     omni_mutex_lock     l(cti->d_mutex);
586
587     // setup info for event handler
588     cti->d_state = (mode == GC_WAIT_ANY) ? CT_WAIT_ANY : CT_WAIT_ALL;
589     cti->d_njobs_waiting_for = njobs;
590     cti->d_jobs_waiting_for = jd;
591     assert(cti->d_jobs_done != 0);
592
593     unsigned int ndone = 0;
594
595     // wait for jobs to complete
596     
597     while (1){
598       ndone = 0;
599       for (i= 0; i < njobs; i++){
600         if (done[i])
601           ndone++;
602         else if (bv_isset(cti->d_jobs_done, jd[i]->sys.job_id)){
603           bv_clr(cti->d_jobs_done, jd[i]->sys.job_id);
604           done[i] = true;
605           ndone++;
606         }
607       }
608
609       if (mode == GC_WAIT_ANY && ndone > 0)
610         break;
611
612       if (mode == GC_WAIT_ALL && ndone == njobs)
613         break;
614
615       // FIXME what happens when somebody calls shutdown?
616
617       cti->d_cond.wait();       // wait for event handler to wake us up
618     }
619
620     cti->d_state = CT_NOT_WAITING;  
621     cti->d_njobs_waiting_for = 0;       // tidy up (not reqd)
622     cti->d_jobs_waiting_for = 0;        // tidy up (not reqd)
623     return ndone;
624   }
625 }
626
627 ////////////////////////////////////////////////////////////////////////
628
629 bool
630 gc_job_manager_impl::send_all_spes(uint32_t msg)
631 {
632   bool ok = true;
633
634   for (unsigned int i = 0; i < d_options.nspes; i++)
635     ok &= send_spe(i, msg);
636
637   return ok;
638 }
639
640 bool
641 gc_job_manager_impl::send_spe(unsigned int spe, uint32_t msg)
642 {
643   if (spe >= d_options.nspes)
644     return false;
645
646   int r = spe_in_mbox_write(d_worker[spe].spe_ctx, &msg, 1,
647                             SPE_MBOX_ALL_BLOCKING);
648   if (r < 0){
649     perror("spe_in_mbox_write");
650     return false;
651   }
652
653   return r == 1;
654 }
655
656 ////////////////////////////////////////////////////////////////////////
657
658 static void
659 pthread_create_failure_msg(int r, const char *which)
660 {
661   char buf[256];
662   char *s = 0;
663
664   switch (r){
665   case EAGAIN: s = "EAGAIN"; break;
666   case EINVAL: s = "EINVAL"; break;
667   case EPERM:  s = "EPERM";  break;
668   default:
669     snprintf(buf, sizeof(buf), "Unknown error %d", r);
670     s = buf;
671     break;
672   }
673   fprintf(stderr, "pthread_create[%s] failed: %s\n", which, s);
674 }
675
676
677 static bool
678 start_thread(pthread_t *thread,
679              void *(*start_routine)(void *),  void *arg,
680              const char *msg)
681 {
682   pthread_attr_t attr;
683   pthread_attr_init(&attr);
684   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
685
686   // FIXME save sigprocmask
687   // FIXME set sigprocmask
688
689   int r = pthread_create(thread, &attr, start_routine, arg);
690     
691   // FIXME restore sigprocmask
692
693   if (r != 0){
694     pthread_create_failure_msg(r, msg);
695     return false;
696   }
697   return true;
698 }
699
700
701 ////////////////////////////////////////////////////////////////////////
702
703 static void *start_worker(void *arg);
704
705 static void *
706 start_event_handler(void *arg)
707 {
708   gc_job_manager_impl *p = (gc_job_manager_impl *) arg;
709   p->event_handler_loop();
710   return 0;
711 }
712
713 void
714 gc_job_manager_impl::create_event_handler()
715 {
716   // create the SPE event handler and register our interest in events
717
718   d_spe_event_handler.ptr = spe_event_handler_create();
719   if (d_spe_event_handler.ptr == 0){
720     perror("spe_event_handler_create");
721     throw std::runtime_error("spe_event_handler_create");
722   }
723
724   for (unsigned int i = 0; i < d_options.nspes; i++){
725     spe_event_unit_t    eu;
726     memset(&eu, 0, sizeof(eu));
727     eu.events = SPE_EVENT_OUT_INTR_MBOX | SPE_EVENT_SPE_STOPPED;
728     eu.spe = d_worker[i].spe_ctx;
729     eu.data.u32 = i;    // set in events returned by spe_event_wait
730
731     if (spe_event_handler_register(d_spe_event_handler.ptr, &eu) != 0){
732       perror("spe_event_handler_register");
733       throw std::runtime_error("spe_event_handler_register");
734     }
735   }
736
737   // create our event handling thread
738
739   if (!start_thread(&d_eh_thread, start_event_handler, this, "event_handler")){
740     throw std::runtime_error("pthread_create");
741   }
742
743   // create the SPE worker threads
744
745   bool ok = true;
746   for (unsigned int i = 0; ok && i < d_options.nspes; i++){
747     char name[256];
748     snprintf(name, sizeof(name), "worker[%d]", i);
749     ok &= start_thread(&d_worker[i].thread, start_worker,
750                        &d_worker[i], name);
751   }
752
753   if (!ok){
754     //
755     // FIXME Clean up the mess.  Need to terminate event handler and all workers.
756     //
757     // this should cause the workers to exit, unless they're seriously broken
758     send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
759
760     shutdown();
761
762     throw std::runtime_error("pthread_create");
763   }
764 }
765
766 ////////////////////////////////////////////////////////////////////////
767
768 void
769 gc_job_manager_impl::set_eh_state(evt_handler_state s)
770 {
771   omni_mutex_lock       l(d_eh_mutex);
772   d_eh_state = s;
773   d_eh_cond.broadcast();
774 }
775
776 void
777 gc_job_manager_impl::set_ea_args_maxsize(int maxsize)
778 {
779   omni_mutex_lock       l(d_eh_mutex);
780   d_ea_args_maxsize = maxsize;
781   d_eh_cond.broadcast();
782 }
783
784 void
785 gc_job_manager_impl::print_event(spe_event_unit_t *evt)
786 {
787   printf("evt: spe = %d events = (0x%x)", evt->data.u32, evt->events);
788
789   if (evt->events & SPE_EVENT_OUT_INTR_MBOX)
790     printf(" OUT_INTR_MBOX");
791   
792   if (evt->events & SPE_EVENT_IN_MBOX)
793     printf(" IN_MBOX");
794   
795   if (evt->events & SPE_EVENT_TAG_GROUP)
796     printf(" TAG_GROUP");
797   
798   if (evt->events & SPE_EVENT_SPE_STOPPED)
799     printf(" SPE_STOPPED");
800
801   printf("\n");
802 }
803
804 struct job_client_info {
805   uint16_t      job_id;
806   uint16_t      client_id;
807 };
808
809 static int
810 compare_jci_clients(const void *va, const void *vb)
811 {
812   const job_client_info *a = (job_client_info *) va;
813   const job_client_info *b = (job_client_info *) vb;
814
815   return a->client_id - b->client_id;
816 }
817
818 void
819 gc_job_manager_impl::notify_clients_jobs_are_done(unsigned int spe_num,
820                                                   unsigned int completion_info_idx)
821 {
822   const char *msg = "gc_job_manager_impl::notify_client_job_is_done (INTERNAL ERROR)";
823
824   smp_rmb();  // order reads so we know that data sent from SPE is here
825
826   gc_comp_info_t *ci = &d_comp_info[2 * spe_num + (completion_info_idx & 0x1)];
827
828   if (ci->ncomplete == 0){      // never happens, but ensures code below is correct
829     ci->in_use = 0;
830     return;
831   }
832
833   if (0){
834     static int total_jobs;
835     static int total_msgs;
836     total_msgs++;
837     total_jobs += ci->ncomplete;
838     printf("ppe:     tj = %6d  tm = %6d\n", total_jobs, total_msgs);
839   }
840
841   job_client_info gci[GC_CI_NJOBS];
842
843   /*
844    * Make one pass through and sanity check everything while filling in gci
845    */
846   for (unsigned int i = 0; i < ci->ncomplete; i++){
847     unsigned int job_id = ci->job_id[i];
848
849     if (job_id >= d_options.max_jobs){
850       // internal error, shouldn't happen
851       fprintf(stderr,"%s: invalid job_id = %d\n", msg, job_id);
852       ci->in_use = 0;           // clear flag so SPE knows we're done with it
853       return;
854     }
855     gc_job_desc *jd = &d_jd[job_id];
856
857     if (jd->sys.client_id >= d_options.max_client_threads){
858       // internal error, shouldn't happen
859       fprintf(stderr, "%s: invalid client_id = %d\n", msg, jd->sys.client_id);
860       ci->in_use = 0;           // clear flag so SPE knows we're done with it
861       return;
862     }
863
864     gci[i].job_id = job_id;
865     gci[i].client_id = jd->sys.client_id;
866   }
867
868   // sort by client_id so we only have to lock & signal once / client
869
870   if (ci->ncomplete > 1)
871     qsort(gci, ci->ncomplete, sizeof(gci[0]), compare_jci_clients);
872
873   // "wind-in" 
874
875   gc_client_thread_info *last_cti = &d_client_thread[gci[0].client_id];
876   last_cti->d_mutex.lock();
877   bv_set(last_cti->d_jobs_done, gci[0].job_id);  // mark job done
878
879   for (unsigned int i = 1; i < ci->ncomplete; i++){
880
881     gc_client_thread_info *cti = &d_client_thread[gci[i].client_id];
882
883     if (cti != last_cti){       // new client?
884
885       // yes.  signal old client, unlock old, lock new
886
887       // FIXME we could distinguish between CT_WAIT_ALL & CT_WAIT_ANY
888
889       if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
890         last_cti->d_cond.signal();      // wake client thread up
891
892       last_cti->d_mutex.unlock();
893       cti->d_mutex.lock();
894       last_cti = cti;
895     }
896
897     // mark job done
898     bv_set(cti->d_jobs_done, gci[i].job_id);
899   }
900
901   // "wind-out"
902
903   if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
904     last_cti->d_cond.signal();  // wake client thread up
905   last_cti->d_mutex.unlock();
906
907   ci->in_use = 0;               // clear flag so SPE knows we're done with it
908 }
909
910 void
911 gc_job_manager_impl::handle_event(spe_event_unit_t *evt)
912 {
913   // print_event(evt);
914
915   int spe_num = evt->data.u32;
916
917   // only a single event type can be signaled at a time
918   
919   if (evt->events == SPE_EVENT_OUT_INTR_MBOX) { // SPE sent us 1 or more msgs
920     static const int NMSGS = 32;
921     unsigned int msg[NMSGS];
922     int n = spe_out_intr_mbox_read(evt->spe, msg, NMSGS, SPE_MBOX_ANY_BLOCKING);
923     // printf("spe_out_intr_mbox_read = %d\n", n);
924     if (n < 0){
925       perror("spe_out_intr_mbox_read");
926     }
927     else {
928       for (int i = 0; i < n; i++){
929         switch(MBOX_MSG_OP(msg[i])){
930         case OP_JOBS_DONE:
931           if (debug())
932             printf("eh: job_done (0x%08x) from spu[%d]\n", msg[i], spe_num);
933           notify_clients_jobs_are_done(spe_num, MBOX_MSG_ARG(msg[i]));
934           break;
935
936         case OP_SPU_BUFSIZE:
937           set_ea_args_maxsize(MBOX_MSG_ARG(msg[i]));
938           break;
939
940         case OP_EXIT:
941         default:
942           printf("eh: Unexpected msg (0x%08x) from spu[%d]\n", msg[i], spe_num);
943           break;
944         }
945       }
946     }
947   }
948   else if (evt->events == SPE_EVENT_SPE_STOPPED){ // the SPE stopped
949     spe_stop_info_t si;
950     int r = spe_stop_info_read(evt->spe, &si);
951     if (r < 0){
952       perror("spe_stop_info_read");
953     }
954     else {
955       switch (si.stop_reason){
956       case SPE_EXIT:
957         if (debug()){
958           printf("eh: spu[%d] SPE_EXIT w/ exit_code = %d\n",
959                  spe_num, si.result.spe_exit_code);
960         }
961         break;
962       case SPE_STOP_AND_SIGNAL:
963         printf("eh: spu[%d] SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
964                spe_num, si.result.spe_signal_code);
965         break;
966       case SPE_RUNTIME_ERROR:
967         printf("eh: spu[%d] SPE_RUNTIME_ERROR w/ spe_runtime_error = 0x%x\n",
968                spe_num, si.result.spe_runtime_error);
969         break;
970       case SPE_RUNTIME_EXCEPTION:
971         printf("eh: spu[%d] SPE_RUNTIME_EXCEPTION w/ spe_runtime_exception = 0x%x\n",
972                spe_num, si.result.spe_runtime_exception);
973         break;
974       case SPE_RUNTIME_FATAL:
975         printf("eh: spu[%d] SPE_RUNTIME_FATAL w/ spe_runtime_fatal = 0x%x\n",
976                spe_num, si.result.spe_runtime_fatal);
977         break;
978       case SPE_CALLBACK_ERROR:
979         printf("eh: spu[%d] SPE_CALLBACK_ERROR w/ spe_callback_error = 0x%x\n",
980                spe_num, si.result.spe_callback_error);
981         break;
982       case SPE_ISOLATION_ERROR:
983         printf("eh: spu[%d] SPE_ISOLATION_ERROR w/ spe_isolation_error = 0x%x\n",
984                spe_num, si.result.spe_isolation_error);
985         break;
986       default:
987         printf("eh: spu[%d] UNKNOWN STOP REASON (%d) w/ spu_status = 0x%x\n",
988                spe_num, si.stop_reason, si.spu_status);
989         break;
990       }
991     }
992   }
993 #if 0 // not enabled
994   else if (evt->events == SPE_EVENT_IN_MBOX){    // there's room to write to SPE
995     // spe_in_mbox_write (ignore)
996   }
997   else if (evt->events == SPE_EVENT_TAG_GROUP){  // our DMA completed
998     // spe_mfcio_tag_status_read
999   }
1000 #endif
1001   else {
1002     fprintf(stderr, "handle_event: unexpected evt->events = 0x%x\n", evt->events);
1003     return;
1004   }
1005 }
1006
1007 //
1008 // This is the "main program" of the event handling thread
1009 //
1010 void
1011 gc_job_manager_impl::event_handler_loop()
1012 {
1013   static const int MAX_EVENTS = 16;
1014   static const int TIMEOUT = 20;        // how long to block in milliseconds
1015
1016   spe_event_unit_t events[MAX_EVENTS];
1017
1018   if (d_debug)
1019     printf("event_handler_loop: starting\n");
1020
1021   set_eh_state(EHS_RUNNING);
1022
1023   // ask the first spe for its max bufsize
1024   send_spe(0, MK_MBOX_MSG(OP_GET_SPU_BUFSIZE, 0));
1025
1026   while (1){
1027     switch(d_eh_state){
1028
1029     case EHS_RUNNING:      // normal stuff
1030       if (d_shutdown_requested) {
1031         set_eh_state(EHS_SHUTTING_DOWN);
1032       }
1033       break;
1034
1035     case EHS_SHUTTING_DOWN:
1036
1037       // FIXME wait until job queue is empty, then tell them to exit
1038
1039       send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
1040       set_eh_state(EHS_WAITING_FOR_WORKERS_TO_DIE);
1041       break;
1042
1043     case EHS_WAITING_FOR_WORKERS_TO_DIE:
1044       {
1045         bool all_dead = true;
1046         for (unsigned int i = 0; i < d_options.nspes; i++)
1047           all_dead &= d_worker[i].state == WS_DEAD;
1048
1049         if (all_dead){
1050           set_eh_state(EHS_DEAD);
1051           if (d_debug)
1052             printf("event_handler_loop: exiting\n");
1053           return;
1054         }
1055       }
1056       break;
1057
1058     default:
1059       set_eh_state(EHS_DEAD);
1060       printf("event_handler_loop(default): exiting\n");
1061       return;
1062     }
1063
1064     // block waiting for events...
1065     int nevents = spe_event_wait(d_spe_event_handler.ptr,
1066                                  events, MAX_EVENTS, TIMEOUT);
1067     if (nevents < 0){
1068       perror("spe_wait_event");
1069       // FIXME bail?
1070     }
1071     for (int i = 0; i < nevents; i++){
1072       handle_event(&events[i]);
1073     }
1074   }
1075 }
1076
1077 ////////////////////////////////////////////////////////////////////////
1078 // This is the top of the SPE worker threads
1079
1080 static void *
1081 start_worker(void *arg)
1082 {
1083   worker_ctx *w = (worker_ctx *) arg;
1084   spe_stop_info_t       si;
1085
1086   w->state = WS_RUNNING;
1087   if (s_worker_debug)
1088     printf("worker[%d]: WS_RUNNING\n", w->spe_idx);
1089
1090   unsigned int entry = SPE_DEFAULT_ENTRY;
1091   int r = spe_context_run(w->spe_ctx,  &entry, 0, w->spu_args, 0, &si);
1092
1093   if (r < 0){                   // error
1094     char buf[64];
1095     snprintf(buf, sizeof(buf), "worker[%d]: spe_context_run", w->spe_idx);
1096     perror(buf);
1097   }
1098   else if (r == 0){
1099     // spe program called exit.
1100     if (s_worker_debug)
1101       printf("worker[%d]: SPE_EXIT w/ exit_code = %d\n",
1102              w->spe_idx, si.result.spe_exit_code);
1103   }
1104   else {
1105     // called stop_and_signal
1106     //
1107     // I'm not sure we'll ever get here.  I think the event
1108     // handler will catch this...
1109     printf("worker[%d]: SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
1110            w->spe_idx, si.result.spe_signal_code);
1111   }
1112
1113   // in any event, we're committing suicide now ;)
1114   if (s_worker_debug)
1115     printf("worker[%d]: WS_DEAD\n", w->spe_idx);
1116
1117   w->state = WS_DEAD;
1118   return 0;
1119 }
1120
1121 ////////////////////////////////////////////////////////////////////////
1122
1123 gc_client_thread_info *
1124 gc_job_manager_impl::alloc_cti()
1125 {
1126   for (unsigned int i = 0; i < d_options.max_client_threads; i++){
1127     if (d_client_thread[i].d_free){
1128       // try to atomically grab it
1129       if (_atomic_dec_if_positive(ptr_to_ea(&d_client_thread[i].d_free)) == 0){
1130         // got it...
1131         gc_client_thread_info *cti = &d_client_thread[i];
1132         cti->d_state = CT_NOT_WAITING;
1133         bv_zero(cti->d_jobs_done);
1134         cti->d_njobs_waiting_for = 0;
1135         cti->d_jobs_waiting_for = 0;
1136         
1137         return cti;
1138       }
1139     }
1140   }
1141   return 0;
1142 }
1143
1144 void
1145 gc_job_manager_impl::free_cti(gc_client_thread_info *cti)
1146 {
1147   assert((size_t) (cti - d_client_thread.get()) < d_options.max_client_threads);
1148   cti->d_free = 1;
1149 }
1150
1151 int
1152 gc_job_manager_impl::ea_args_maxsize()
1153 {
1154   omni_mutex_lock       l(d_eh_mutex);
1155
1156   while (d_ea_args_maxsize == 0)        // wait for it to be initialized
1157     d_eh_cond.wait();
1158
1159   return d_ea_args_maxsize;
1160 }
1161
1162 void
1163 gc_job_manager_impl::set_debug(int debug)
1164 {
1165   d_debug = debug;
1166   s_worker_debug = debug;
1167 }
1168
1169 int
1170 gc_job_manager_impl::debug()
1171 {
1172   return d_debug;
1173 }
1174
1175 ////////////////////////////////////////////////////////////////////////
1176
1177 void
1178 gc_job_manager_impl::setup_logfiles()
1179 {
1180   if (!d_options.enable_logging)
1181     return;
1182
1183   if (d_options.log2_nlog_entries == 0)
1184     d_options.log2_nlog_entries = 12;
1185
1186   // must end up a multiple of the page size
1187
1188   size_t pagesize = getpagesize();
1189   size_t s = (1 << d_options.log2_nlog_entries) * sizeof(gc_log_entry_t);
1190   s = ((s + pagesize - 1) / pagesize) * pagesize;
1191   size_t nentries = s / sizeof(gc_log_entry_t);
1192   assert(is_power_of_2(nentries));
1193
1194   for (unsigned int i = 0; i < d_options.nspes; i++){
1195     char filename[100];
1196     snprintf(filename, sizeof(filename), "spu_log.%02d", i);
1197     int fd = open(filename, O_CREAT|O_TRUNC|O_RDWR, 0664);
1198     if (fd == -1){
1199       perror(filename);
1200       return;
1201     }
1202     lseek(fd, s - 1, SEEK_SET);
1203     write(fd, "\0", 1);
1204     void *p = mmap(0, s, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
1205     if (p == MAP_FAILED){
1206       perror("gc_job_manager_impl::setup_logfiles: mmap");
1207       close(fd);
1208       return;
1209     }
1210     close(fd);
1211     memset(p, 0, s);
1212     d_spu_args[i].log.base = ptr_to_ea(p);
1213     d_spu_args[i].log.nentries = nentries;
1214   }
1215 }
1216
1217 void
1218 gc_job_manager_impl::sync_logfiles()
1219 {
1220   for (unsigned int i = 0; i < d_options.nspes; i++){
1221     if (d_spu_args[i].log.base)
1222       msync(ea_to_ptr(d_spu_args[i].log.base),
1223             d_spu_args[i].log.nentries * sizeof(gc_log_entry_t),
1224             MS_ASYNC);
1225   }
1226 }
1227
1228 void
1229 gc_job_manager_impl::unmap_logfiles()
1230 {
1231   for (unsigned int i = 0; i < d_options.nspes; i++){
1232     if (d_spu_args[i].log.base)
1233       munmap(ea_to_ptr(d_spu_args[i].log.base),
1234              d_spu_args[i].log.nentries * sizeof(gc_log_entry_t));
1235   }
1236 }
1237
1238 ////////////////////////////////////////////////////////////////////////
1239 //
1240 // lookup proc names in d_proc_def table
1241
1242 gc_proc_id_t 
1243 gc_job_manager_impl::lookup_proc(const std::string &proc_name)
1244 {
1245   for (int i = 0; i < d_nproc_defs; i++)
1246     if (proc_name == d_proc_def[i].name)
1247       return i;
1248
1249   return GCP_UNKNOWN_PROC;
1250 }
1251
1252 std::vector<std::string>
1253 gc_job_manager_impl::proc_names()
1254 {
1255   std::vector<std::string> r;
1256   for (int i = 0; i < d_nproc_defs; i++)
1257     r.push_back(d_proc_def[i].name);
1258
1259   return r;
1260 }
1261
1262 ////////////////////////////////////////////////////////////////////////
1263
1264 worker_ctx::~worker_ctx()
1265 {
1266   if (spe_ctx){
1267     int r = spe_context_destroy(spe_ctx);
1268     if (r != 0){
1269       perror("spe_context_destroy");
1270     }
1271     spe_ctx = 0;
1272   }
1273   state = WS_FREE;
1274 }