Merged eb/gcell -r8215:8243 into trunk. This adds gr-gcell, the GNU
[debian/gnuradio] / gcell / src / lib / runtime / gc_job_manager_impl.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2007 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License along
18  * with this program; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25 #include <gc_job_manager_impl.h>
26 #include <gc_mbox.h>
27 #include <gc_proc_def_utils.h>
28 #include <gc_aligned_alloc.h>
29 #include <stdio.h>
30 #include <stdexcept>
31 #include <stdlib.h>
32 #include <atomic_dec_if_positive.h>
33 #include <memory_barrier.h>
34 #include <unistd.h>
35 #include <sys/mman.h>
36 #include <sys/types.h>
37 #include <sys/stat.h>
38 #include <fcntl.h>
39
40
41 static const size_t CACHE_LINE_SIZE = 128;
42
43 static const unsigned int DEFAULT_MAX_JOBS = 128;
44 static const unsigned int DEFAULT_MAX_CLIENT_THREADS = 64;
45
46 // FIXME this really depends on the SPU code...
47 static const unsigned int MAX_TOTAL_INDIRECT_LENGTH = 16 * 1024;
48
49
50 static bool          s_key_initialized = false;
51 static pthread_key_t s_client_key;
52
53 static int s_worker_debug = 0;
54
55 // custom deleter of gang_contexts for use with boost::shared_ptr
56 class gang_deleter {
57 public:
58   void operator()(spe_gang_context_ptr_t ctx) {
59     if (ctx){
60       int r = spe_gang_context_destroy(ctx);
61       if (r != 0){
62         perror("spe_gang_context_destroy");
63       }
64     }
65   }
66 };
67
68
69 // custom deleter of anything that can be freed with "free"
70 class free_deleter {
71 public:
72   void operator()(void *p) {
73     free(p);
74   }
75 };
76
77
78 /*
79  * Called when client thread is destroyed.
80  * We mark our client info free.
81  */
82 static void
83 client_key_destructor(void *p)
84 {
85   ((gc_client_thread_info *) p)->d_free = 1;
86 }
87
88 static bool
89 is_power_of_2(uint32_t x)
90 {
91   return (x != 0) && !(x & (x - 1));
92 }
93
94 ////////////////////////////////////////////////////////////////////////
95
96
97 gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
98   : d_debug(0), d_spu_args(0),
99     d_eh_cond(&d_eh_mutex), d_eh_thread(0), d_eh_state(EHS_INIT),
100     d_shutdown_requested(false),
101     d_client_thread(0), d_ea_args_maxsize(0),
102     d_proc_def(0), d_proc_def_ls_addr(0), d_nproc_defs(0)
103 {
104   if (!s_key_initialized){
105     int r = pthread_key_create(&s_client_key, client_key_destructor);
106     if (r != 0)
107       throw std::runtime_error("pthread_key_create");
108     s_key_initialized = true;
109   }
110
111   // ensure it's zero
112   pthread_setspecific(s_client_key, 0);
113
114   if (options != 0)
115     d_options = *options;
116
117   // provide the real default for those indicated with a zero
118   if (d_options.max_jobs == 0)
119     d_options.max_jobs = DEFAULT_MAX_JOBS;
120   if (d_options.max_client_threads == 0)
121     d_options.max_client_threads = DEFAULT_MAX_CLIENT_THREADS;
122
123   if (!d_options.program_handle){
124     fprintf(stderr, "gc_job_manager: options->program_handle must be non-zero\n");
125     throw std::runtime_error("gc_job_manager: options->program_handle must be non-zero");
126   }
127
128   int ncpu_nodes = spe_cpu_info_get(SPE_COUNT_PHYSICAL_CPU_NODES, -1);
129   int nusable_spes = spe_cpu_info_get(SPE_COUNT_USABLE_SPES, -1);
130
131   if (debug()){
132     printf("cpu_nodes = %d\n", ncpu_nodes);
133     for (int i = 0; i < ncpu_nodes; i++){
134       printf("node[%d].physical_spes = %2d\n", i,
135              spe_cpu_info_get(SPE_COUNT_PHYSICAL_SPES, i));
136       printf("node[%d].usable_spes   = %2d\n", i,
137              spe_cpu_info_get(SPE_COUNT_USABLE_SPES, i));
138     }
139   }
140
141   // clamp nspes
142   d_options.nspes = std::min(d_options.nspes, (unsigned int) MAX_SPES);
143   nusable_spes = std::min(nusable_spes, (int) MAX_SPES);
144
145   //
146   // sanity check requested number of spes.
147   //
148   if (d_options.nspes == 0)     // use all of them
149     d_options.nspes = nusable_spes;
150   else {
151     if (d_options.nspes > (unsigned int) nusable_spes){
152       fprintf(stderr,
153               "gc_job_manager: warning: caller requested %d spes.  There are only %d available.\n",
154               d_options.nspes, nusable_spes);
155       if (d_options.gang_schedule){
156         // If we're gang scheduling we'll never get scheduled if we
157         // ask for more than are available.
158         throw std::out_of_range("gang_scheduling: not enough spes available");
159       }
160       else {    // FIXME clamp to usable.  problem on PS3 when overcommited
161         fprintf(stderr, "gc_job_manager: clamping nspes to %d\n", nusable_spes);
162         d_options.nspes = nusable_spes;
163       }
164     }
165   }
166
167   if (d_options.use_affinity){
168     printf("gc_job_manager: warning: affinity request was ignored\n");
169   }
170
171   if (d_options.gang_schedule){
172     d_gang = spe_gang_context_sptr(spe_gang_context_create(0), gang_deleter());
173     if (!d_gang){
174       perror("gc_job_manager_impl[spe_gang_context_create]");
175       throw std::runtime_error("spe_gang_context_create");
176     }
177   }
178
179   // ----------------------------------------------------------------
180   // initalize the job queue
181   
182   d_queue = (gc_jd_queue_t *) gc_aligned_alloc(sizeof(gc_jd_queue_t), CACHE_LINE_SIZE);
183   _d_queue_boost =
184     boost::shared_ptr<void>((void *) d_queue, free_deleter());
185   gc_jd_queue_init(d_queue);
186
187
188   // ----------------------------------------------------------------
189   // create the spe contexts
190
191   // 1 spu_arg struct for each SPE
192   assert(sizeof(gc_spu_args_t) % 16 == 0);
193   d_spu_args =
194     (gc_spu_args_t *) gc_aligned_alloc(MAX_SPES * sizeof(gc_spu_args_t), 16);
195   _d_spu_args_boost =
196     boost::shared_ptr<void>((void *) d_spu_args, free_deleter());
197
198   // 2 completion info structs for each SPE (we double buffer them)
199   assert(sizeof(gc_comp_info_t) % CACHE_LINE_SIZE == 0);
200   d_comp_info =
201     (gc_comp_info_t *) gc_aligned_alloc(2 * MAX_SPES * sizeof(gc_comp_info_t),
202                                         CACHE_LINE_SIZE);
203   _d_comp_info_boost =
204     boost::shared_ptr<void>((void *) d_comp_info, free_deleter());
205
206
207   // get a handle to the spe program
208
209   spe_program_handle_t *spe_image = d_options.program_handle.get();
210
211   // fish proc_def table out of SPE ELF file
212
213   if (!gcpd_find_table(spe_image, &d_proc_def, &d_nproc_defs, &d_proc_def_ls_addr)){
214     fprintf(stderr, "gc_job_manager_impl: couldn't find gc_proc_defs in SPE ELF file.\n");
215     throw std::runtime_error("no gc_proc_defs");
216   }
217   // fprintf(stderr, "d_proc_def_ls_addr = 0x%0x\n", d_proc_def_ls_addr);
218
219   int spe_flags = (SPE_EVENTS_ENABLE
220                    | SPE_CFG_SIGNOTIFY1_OR
221                    | SPE_CFG_SIGNOTIFY2_OR);
222   
223   for (unsigned int i = 0; i < d_options.nspes; i++){
224     // FIXME affinity stuff goes here
225     d_worker[i].spe_ctx = spe_context_create(spe_flags, d_gang.get());;
226     if (d_worker[i].spe_ctx == 0){
227       perror("spe_context_create");
228       throw std::runtime_error("spe_context_create");
229     }
230     d_worker[i].spe_idx = i;
231     d_worker[i].spu_args = &d_spu_args[i];
232     d_worker[i].spu_args->queue = ptr_to_ea(d_queue);
233     d_worker[i].spu_args->comp_info[0] = ptr_to_ea(&d_comp_info[2*i+0]);
234     d_worker[i].spu_args->comp_info[1] = ptr_to_ea(&d_comp_info[2*i+1]);
235     d_worker[i].spu_args->spu_idx = i;
236     d_worker[i].spu_args->nspus = d_options.nspes;
237     d_worker[i].spu_args->proc_def_ls_addr = d_proc_def_ls_addr;
238     d_worker[i].spu_args->nproc_defs = d_nproc_defs;
239     d_worker[i].spu_args->log.base = 0;
240     d_worker[i].spu_args->log.nentries = 0;
241     d_worker[i].state = WS_INIT;
242
243     int r = spe_program_load(d_worker[i].spe_ctx, spe_image);
244     if (r != 0){
245       perror("spe_program_load");
246       throw std::runtime_error("spe_program_load");
247     }
248   }
249
250   setup_logfiles();
251
252   // ----------------------------------------------------------------
253   // initalize the free list of job descriptors
254   
255   d_free_list = (gc_jd_stack_t *) gc_aligned_alloc(sizeof(gc_jd_stack_t), CACHE_LINE_SIZE);
256   // This ensures that the memory associated with d_free_list is
257   // automatically freed in the destructor or if an exception occurs
258   // here in the constructor.
259   _d_free_list_boost =
260     boost::shared_ptr<void>((void *) d_free_list, free_deleter());
261   gc_jd_stack_init(d_free_list);
262
263   if (debug()){
264     printf("sizeof(d_jd[0]) = %d (0x%x)\n", sizeof(d_jd[0]), sizeof(d_jd[0]));
265     printf("max_jobs = %u\n", d_options.max_jobs);
266   }
267
268   // Initialize the array of job descriptors.
269   d_jd = (gc_job_desc_t *) gc_aligned_alloc(sizeof(d_jd[0]) * d_options.max_jobs, CACHE_LINE_SIZE);
270   _d_jd_boost = boost::shared_ptr<void>((void *) d_jd, free_deleter());
271
272
273   // set unique job_id
274   for (int i = 0; i < (int) d_options.max_jobs; i++)
275     d_jd[i].sys.job_id = i;
276
277   // push them onto the free list
278   for (int i = d_options.max_jobs - 1; i >= 0; i--)
279     free_job_desc(&d_jd[i]);
280
281   // ----------------------------------------------------------------
282   // initialize d_client_thread
283
284   {
285     gc_client_thread_info_sa cti(
286          new gc_client_thread_info[d_options.max_client_threads]);
287
288     d_client_thread.swap(cti);
289
290     for (unsigned int i = 0; i < d_options.max_client_threads; i++)
291       d_client_thread[i].d_client_id = i;
292   }
293
294   // ----------------------------------------------------------------
295   // initialize bitvectors
296
297   // initialize d_bvlen, the number of longs in job related bitvectors.
298   int bits_per_long = sizeof(unsigned long) * 8;
299   d_bvlen = (d_options.max_jobs + bits_per_long - 1) / bits_per_long;
300
301   // allocate all bitvectors in a single cache-aligned chunk
302   size_t nlongs = d_bvlen * d_options.max_client_threads;
303   void *p = gc_aligned_alloc(nlongs * sizeof(unsigned long), CACHE_LINE_SIZE);
304   _d_all_bitvectors = boost::shared_ptr<void>(p, free_deleter());
305
306   // Now point the gc_client_thread_info bitvectors into this storage
307   unsigned long *v = (unsigned long *) p;
308
309   for (unsigned int i = 0; i < d_options.max_client_threads; i++, v += d_bvlen)
310     d_client_thread[i].d_jobs_done = v;
311
312
313   // ----------------------------------------------------------------
314   // create the spe event handler & worker (SPE) threads
315
316   create_event_handler();
317
318 }
319
320 ////////////////////////////////////////////////////////////////////////
321
322 gc_job_manager_impl::~gc_job_manager_impl()
323 {
324   shutdown();
325
326   d_jd = 0;             // handled via _d_jd_boost
327   d_free_list = 0;      // handled via _d_free_list_boost
328   d_queue = 0;          // handled via _d_queue_boost
329
330   // clear cti, since we've deleted the underlying data
331   pthread_setspecific(s_client_key, 0);
332
333   unmap_logfiles();
334 }
335
336 bool
337 gc_job_manager_impl::shutdown()
338 {
339   omni_mutex_lock       l(d_eh_mutex);
340
341   d_shutdown_requested = true;          // set flag for event handler thread
342
343   // should only happens during early QA code
344   if (d_eh_thread == 0 && d_eh_state == EHS_INIT)
345     return false;
346
347   while (d_eh_state != EHS_DEAD)        // wait for it to finish
348     d_eh_cond.wait();
349
350   return true;
351 }
352
353 int
354 gc_job_manager_impl::nspes() const
355 {
356   return d_options.nspes;
357 }
358
359 ////////////////////////////////////////////////////////////////////////
360
361 void
362 gc_job_manager_impl::bv_zero(unsigned long *bv)
363 {
364   memset(bv, 0, sizeof(unsigned long) * d_bvlen);
365 }
366
367 inline void
368 gc_job_manager_impl::bv_clr(unsigned long *bv, unsigned int bitno)
369 {
370   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
371   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
372   bv[wi] &= ~(1UL << bi);
373 }
374
375 inline void
376 gc_job_manager_impl::bv_set(unsigned long *bv, unsigned int bitno)
377 {
378   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
379   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
380   bv[wi] |= (1UL << bi);
381 }
382
383 inline bool
384 gc_job_manager_impl::bv_isset(unsigned long *bv, unsigned int bitno)
385 {
386   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
387   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
388   return (bv[wi] & (1UL << bi)) != 0;
389 }
390
391 inline bool
392 gc_job_manager_impl::bv_isclr(unsigned long *bv, unsigned int bitno)
393 {
394   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
395   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
396   return (bv[wi] & (1UL << bi)) == 0;
397 }
398
399 ////////////////////////////////////////////////////////////////////////
400
401 gc_job_desc *
402 gc_job_manager_impl::alloc_job_desc()
403 {
404   // stack is lock free, and safe to call from any thread
405   gc_job_desc *jd = gc_jd_stack_pop(d_free_list);
406   if (jd == 0)
407     throw gc_bad_alloc("alloc_job_desc: none available");
408
409   return jd;
410 }
411
412 void
413 gc_job_manager_impl::free_job_desc(gc_job_desc *jd)
414 {
415   // stack is lock free, thus safe to call from any thread
416   if (jd != 0)
417     gc_jd_stack_push(d_free_list, jd);
418 }
419
420 ////////////////////////////////////////////////////////////////////////
421
422 /*
423  * We check as much as we can here on the PPE side, so that the SPE
424  * doesn't have to.
425  */
426 static bool
427 check_direct_args(gc_job_desc *jd, gc_job_direct_args *args)
428 {
429   if (args->nargs > MAX_ARGS_DIRECT){
430     jd->status = JS_BAD_N_DIRECT;
431     return false;
432   }
433
434   return true;
435 }
436
437 static bool
438 check_ea_args(gc_job_desc *jd, gc_job_ea_args *p)
439 {
440   if (p->nargs > MAX_ARGS_EA){
441     jd->status = JS_BAD_N_EA;
442     return false;
443   }
444
445   uint32_t dir_union = 0;
446
447   for (unsigned int i = 0; i < p->nargs; i++){
448     dir_union |= p->arg[i].direction;
449     switch(p->arg[i].direction){
450     case GCJD_DMA_GET:
451     case GCJD_DMA_PUT:
452       break;
453
454     default:
455       jd->status = JS_BAD_DIRECTION;
456       return false;
457     }
458   }
459
460   if (p->nargs > 1){
461     unsigned int common_eah = (p->arg[0].ea_addr) >> 32;
462     for (unsigned int i = 1; i < p->nargs; i++){
463       if ((p->arg[i].ea_addr >> 32) != common_eah){
464         jd->status = JS_BAD_EAH;
465         return false;
466       }
467     }
468   }
469
470   jd->sys.direction_union = dir_union;
471   return true;
472 }
473
474 bool
475 gc_job_manager_impl::submit_job(gc_job_desc *jd)
476 {
477   if (unlikely(d_shutdown_requested)){
478     jd->status = JS_SHUTTING_DOWN;
479     return false;
480   }
481
482   // Ensure it's one of our job descriptors
483
484   if (jd < d_jd || jd >= &d_jd[d_options.max_jobs]){
485     jd->status = JS_BAD_JOB_DESC;
486     return false;
487   }
488
489   // Ensure we've got a client_thread_info assigned to this thread.
490   
491   gc_client_thread_info *cti =
492     (gc_client_thread_info *) pthread_getspecific(s_client_key);
493   if (unlikely(cti == 0)){
494     if ((cti = alloc_cti()) == 0){
495       fprintf(stderr, "gc_job_manager_impl::submit_job: Too many client threads.\n");
496       jd->status = JS_TOO_MANY_CLIENTS;
497       return false;
498     }
499     int r = pthread_setspecific(s_client_key, cti);
500     if (r != 0){
501       jd->status = JS_BAD_JUJU;
502       fprintf(stderr, "pthread_setspecific failed (return = %d)\n", r);
503       return false;
504     }
505   }
506
507   if (jd->proc_id == GCP_UNKNOWN_PROC){
508     jd->status = JS_UNKNOWN_PROC;
509     return false;
510   }
511
512   if (!check_direct_args(jd, &jd->input))
513     return false;
514
515   if (!check_direct_args(jd, &jd->output))
516     return false;
517
518   if (!check_ea_args(jd, &jd->eaa))
519     return false;
520
521   jd->status = JS_OK;
522   jd->sys.client_id = cti->d_client_id;
523
524   // FIXME keep count of jobs in progress?
525   
526   gc_jd_queue_enqueue(d_queue, jd);
527   return true;
528 }
529
530 bool
531 gc_job_manager_impl::wait_job(gc_job_desc *jd)
532 {
533   bool done;
534   return wait_jobs(1, &jd, &done, GC_WAIT_ANY) == 1 && jd->status == JS_OK;
535 }
536
537 int
538 gc_job_manager_impl::wait_jobs(unsigned int njobs,
539                                gc_job_desc *jd[],
540                                bool done[],
541                                gc_wait_mode mode)
542 {
543   unsigned int i;
544
545   gc_client_thread_info *cti =
546     (gc_client_thread_info *) pthread_getspecific(s_client_key);
547   if (unlikely(cti == 0))
548     return -1;
549
550   for (i = 0; i < njobs; i++){
551     done[i] = false;
552     if (unlikely(jd[i]->sys.client_id != cti->d_client_id)){
553       fprintf(stderr, "gc_job_manager_impl::wait_jobs: can't wait for a job you didn't submit\n");
554       return -1;
555     }
556   }
557
558   {
559     omni_mutex_lock     l(cti->d_mutex);
560
561     // setup info for event handler
562     cti->d_state = (mode == GC_WAIT_ANY) ? CT_WAIT_ANY : CT_WAIT_ALL;
563     cti->d_njobs_waiting_for = njobs;
564     cti->d_jobs_waiting_for = jd;
565     assert(cti->d_jobs_done != 0);
566
567     unsigned int ndone = 0;
568
569     // wait for jobs to complete
570     
571     while (1){
572       ndone = 0;
573       for (i= 0; i < njobs; i++){
574         if (done[i])
575           ndone++;
576         else if (bv_isset(cti->d_jobs_done, jd[i]->sys.job_id)){
577           bv_clr(cti->d_jobs_done, jd[i]->sys.job_id);
578           done[i] = true;
579           ndone++;
580         }
581       }
582
583       if (mode == GC_WAIT_ANY && ndone > 0)
584         break;
585
586       if (mode == GC_WAIT_ALL && ndone == njobs)
587         break;
588
589       // FIXME what happens when somebody calls shutdown?
590
591       cti->d_cond.wait();       // wait for event handler to wake us up
592     }
593
594     cti->d_state = CT_NOT_WAITING;  
595     cti->d_njobs_waiting_for = 0;       // tidy up (not reqd)
596     cti->d_jobs_waiting_for = 0;        // tidy up (not reqd)
597     return ndone;
598   }
599 }
600
601 ////////////////////////////////////////////////////////////////////////
602
603 bool
604 gc_job_manager_impl::send_all_spes(uint32_t msg)
605 {
606   bool ok = true;
607
608   for (unsigned int i = 0; i < d_options.nspes; i++)
609     ok &= send_spe(i, msg);
610
611   return ok;
612 }
613
614 bool
615 gc_job_manager_impl::send_spe(unsigned int spe, uint32_t msg)
616 {
617   if (spe >= d_options.nspes)
618     return false;
619
620   int r = spe_in_mbox_write(d_worker[spe].spe_ctx, &msg, 1,
621                             SPE_MBOX_ALL_BLOCKING);
622   if (r < 0){
623     perror("spe_in_mbox_write");
624     return false;
625   }
626
627   return r == 1;
628 }
629
630 ////////////////////////////////////////////////////////////////////////
631
632 static void
633 pthread_create_failure_msg(int r, const char *which)
634 {
635   char buf[256];
636   char *s = 0;
637
638   switch (r){
639   case EAGAIN: s = "EAGAIN"; break;
640   case EINVAL: s = "EINVAL"; break;
641   case EPERM:  s = "EPERM";  break;
642   default:
643     snprintf(buf, sizeof(buf), "Unknown error %d", r);
644     s = buf;
645     break;
646   }
647   fprintf(stderr, "pthread_create[%s] failed: %s\n", which, s);
648 }
649
650
651 static bool
652 start_thread(pthread_t *thread,
653              void *(*start_routine)(void *),  void *arg,
654              const char *msg)
655 {
656   pthread_attr_t attr;
657   pthread_attr_init(&attr);
658   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
659
660   // FIXME save sigprocmask
661   // FIXME set sigprocmask
662
663   int r = pthread_create(thread, &attr, start_routine, arg);
664     
665   // FIXME restore sigprocmask
666
667   if (r != 0){
668     pthread_create_failure_msg(r, msg);
669     return false;
670   }
671   return true;
672 }
673
674
675 ////////////////////////////////////////////////////////////////////////
676
677 static void *start_worker(void *arg);
678
679 static void *
680 start_event_handler(void *arg)
681 {
682   gc_job_manager_impl *p = (gc_job_manager_impl *) arg;
683   p->event_handler_loop();
684   return 0;
685 }
686
687 void
688 gc_job_manager_impl::create_event_handler()
689 {
690   // create the SPE event handler and register our interest in events
691
692   d_spe_event_handler.ptr = spe_event_handler_create();
693   if (d_spe_event_handler.ptr == 0){
694     perror("spe_event_handler_create");
695     throw std::runtime_error("spe_event_handler_create");
696   }
697
698   for (unsigned int i = 0; i < d_options.nspes; i++){
699     spe_event_unit_t    eu;
700     memset(&eu, 0, sizeof(eu));
701     eu.events = SPE_EVENT_OUT_INTR_MBOX | SPE_EVENT_SPE_STOPPED;
702     eu.spe = d_worker[i].spe_ctx;
703     eu.data.u32 = i;    // set in events returned by spe_event_wait
704
705     if (spe_event_handler_register(d_spe_event_handler.ptr, &eu) != 0){
706       perror("spe_event_handler_register");
707       throw std::runtime_error("spe_event_handler_register");
708     }
709   }
710
711   // create our event handling thread
712
713   if (!start_thread(&d_eh_thread, start_event_handler, this, "event_handler")){
714     throw std::runtime_error("pthread_create");
715   }
716
717   // create the SPE worker threads
718
719   bool ok = true;
720   for (unsigned int i = 0; ok && i < d_options.nspes; i++){
721     char name[256];
722     snprintf(name, sizeof(name), "worker[%d]", i);
723     ok &= start_thread(&d_worker[i].thread, start_worker,
724                        &d_worker[i], name);
725   }
726
727   if (!ok){
728     //
729     // FIXME Clean up the mess.  Need to terminate event handler and all workers.
730     //
731     // this should cause the workers to exit, unless they're seriously broken
732     send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
733
734     shutdown();
735
736     throw std::runtime_error("pthread_create");
737   }
738 }
739
740 ////////////////////////////////////////////////////////////////////////
741
742 void
743 gc_job_manager_impl::set_eh_state(evt_handler_state s)
744 {
745   omni_mutex_lock       l(d_eh_mutex);
746   d_eh_state = s;
747   d_eh_cond.broadcast();
748 }
749
750 void
751 gc_job_manager_impl::set_ea_args_maxsize(int maxsize)
752 {
753   omni_mutex_lock       l(d_eh_mutex);
754   d_ea_args_maxsize = maxsize;
755   d_eh_cond.broadcast();
756 }
757
758 void
759 gc_job_manager_impl::print_event(spe_event_unit_t *evt)
760 {
761   printf("evt: spe = %d events = (0x%x)", evt->data.u32, evt->events);
762
763   if (evt->events & SPE_EVENT_OUT_INTR_MBOX)
764     printf(" OUT_INTR_MBOX");
765   
766   if (evt->events & SPE_EVENT_IN_MBOX)
767     printf(" IN_MBOX");
768   
769   if (evt->events & SPE_EVENT_TAG_GROUP)
770     printf(" TAG_GROUP");
771   
772   if (evt->events & SPE_EVENT_SPE_STOPPED)
773     printf(" SPE_STOPPED");
774
775   printf("\n");
776 }
777
778 struct job_client_info {
779   uint16_t      job_id;
780   uint16_t      client_id;
781 };
782
783 static int
784 compare_jci_clients(const void *va, const void *vb)
785 {
786   const job_client_info *a = (job_client_info *) va;
787   const job_client_info *b = (job_client_info *) vb;
788
789   return a->client_id - b->client_id;
790 }
791
792 void
793 gc_job_manager_impl::notify_clients_jobs_are_done(unsigned int spe_num,
794                                                   unsigned int completion_info_idx)
795 {
796   const char *msg = "gc_job_manager_impl::notify_client_job_is_done (INTERNAL ERROR)";
797
798   smp_rmb();  // order reads so we know that data sent from SPE is here
799
800   gc_comp_info_t *ci = &d_comp_info[2 * spe_num + (completion_info_idx & 0x1)];
801
802   if (ci->ncomplete == 0){      // never happens, but ensures code below is correct
803     ci->in_use = 0;
804     return;
805   }
806
807   if (0){
808     static int total_jobs;
809     static int total_msgs;
810     total_msgs++;
811     total_jobs += ci->ncomplete;
812     printf("ppe:     tj = %6d  tm = %6d\n", total_jobs, total_msgs);
813   }
814
815   job_client_info gci[GC_CI_NJOBS];
816
817   /*
818    * Make one pass through and sanity check everything while filling in gci
819    */
820   for (unsigned int i = 0; i < ci->ncomplete; i++){
821     unsigned int job_id = ci->job_id[i];
822
823     if (job_id >= d_options.max_jobs){
824       // internal error, shouldn't happen
825       fprintf(stderr,"%s: invalid job_id = %d\n", msg, job_id);
826       ci->in_use = 0;           // clear flag so SPE knows we're done with it
827       return;
828     }
829     gc_job_desc *jd = &d_jd[job_id];
830
831     if (jd->sys.client_id >= d_options.max_client_threads){
832       // internal error, shouldn't happen
833       fprintf(stderr, "%s: invalid client_id = %d\n", msg, jd->sys.client_id);
834       ci->in_use = 0;           // clear flag so SPE knows we're done with it
835       return;
836     }
837
838     gci[i].job_id = job_id;
839     gci[i].client_id = jd->sys.client_id;
840   }
841
842   // sort by client_id so we only have to lock & signal once / client
843
844   if (ci->ncomplete > 1)
845     qsort(gci, ci->ncomplete, sizeof(gci[0]), compare_jci_clients);
846
847   // "wind-in" 
848
849   gc_client_thread_info *last_cti = &d_client_thread[gci[0].client_id];
850   last_cti->d_mutex.lock();
851   bv_set(last_cti->d_jobs_done, gci[0].job_id);  // mark job done
852
853   for (unsigned int i = 1; i < ci->ncomplete; i++){
854
855     gc_client_thread_info *cti = &d_client_thread[gci[i].client_id];
856
857     if (cti != last_cti){       // new client?
858
859       // yes.  signal old client, unlock old, lock new
860
861       // FIXME we could distinguish between CT_WAIT_ALL & CT_WAIT_ANY
862
863       if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
864         last_cti->d_cond.signal();      // wake client thread up
865
866       last_cti->d_mutex.unlock();
867       cti->d_mutex.lock();
868       last_cti = cti;
869     }
870
871     // mark job done
872     bv_set(cti->d_jobs_done, gci[i].job_id);
873   }
874
875   // "wind-out"
876
877   if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
878     last_cti->d_cond.signal();  // wake client thread up
879   last_cti->d_mutex.unlock();
880
881   ci->in_use = 0;               // clear flag so SPE knows we're done with it
882 }
883
884 void
885 gc_job_manager_impl::handle_event(spe_event_unit_t *evt)
886 {
887   // print_event(evt);
888
889   int spe_num = evt->data.u32;
890
891   // only a single event type can be signaled at a time
892   
893   if (evt->events == SPE_EVENT_OUT_INTR_MBOX) { // SPE sent us 1 or more msgs
894     static const int NMSGS = 32;
895     unsigned int msg[NMSGS];
896     int n = spe_out_intr_mbox_read(evt->spe, msg, NMSGS, SPE_MBOX_ANY_BLOCKING);
897     // printf("spe_out_intr_mbox_read = %d\n", n);
898     if (n < 0){
899       perror("spe_out_intr_mbox_read");
900     }
901     else {
902       for (int i = 0; i < n; i++){
903         switch(MBOX_MSG_OP(msg[i])){
904         case OP_JOBS_DONE:
905           if (debug())
906             printf("eh: job_done (0x%08x) from spu[%d]\n", msg[i], spe_num);
907           notify_clients_jobs_are_done(spe_num, MBOX_MSG_ARG(msg[i]));
908           break;
909
910         case OP_SPU_BUFSIZE:
911           set_ea_args_maxsize(MBOX_MSG_ARG(msg[i]));
912           break;
913
914         case OP_EXIT:
915         default:
916           printf("eh: Unexpected msg (0x%08x) from spu[%d]\n", msg[i], spe_num);
917           break;
918         }
919       }
920     }
921   }
922   else if (evt->events == SPE_EVENT_SPE_STOPPED){ // the SPE stopped
923     spe_stop_info_t si;
924     int r = spe_stop_info_read(evt->spe, &si);
925     if (r < 0){
926       perror("spe_stop_info_read");
927     }
928     else {
929       switch (si.stop_reason){
930       case SPE_EXIT:
931         if (debug()){
932           printf("eh: spu[%d] SPE_EXIT w/ exit_code = %d\n",
933                  spe_num, si.result.spe_exit_code);
934         }
935         break;
936       case SPE_STOP_AND_SIGNAL:
937         printf("eh: spu[%d] SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
938                spe_num, si.result.spe_signal_code);
939         break;
940       case SPE_RUNTIME_ERROR:
941         printf("eh: spu[%d] SPE_RUNTIME_ERROR w/ spe_runtime_error = 0x%x\n",
942                spe_num, si.result.spe_runtime_error);
943         break;
944       case SPE_RUNTIME_EXCEPTION:
945         printf("eh: spu[%d] SPE_RUNTIME_EXCEPTION w/ spe_runtime_exception = 0x%x\n",
946                spe_num, si.result.spe_runtime_exception);
947         break;
948       case SPE_RUNTIME_FATAL:
949         printf("eh: spu[%d] SPE_RUNTIME_FATAL w/ spe_runtime_fatal = 0x%x\n",
950                spe_num, si.result.spe_runtime_fatal);
951         break;
952       case SPE_CALLBACK_ERROR:
953         printf("eh: spu[%d] SPE_CALLBACK_ERROR w/ spe_callback_error = 0x%x\n",
954                spe_num, si.result.spe_callback_error);
955         break;
956       case SPE_ISOLATION_ERROR:
957         printf("eh: spu[%d] SPE_ISOLATION_ERROR w/ spe_isolation_error = 0x%x\n",
958                spe_num, si.result.spe_isolation_error);
959         break;
960       default:
961         printf("eh: spu[%d] UNKNOWN STOP REASON (%d) w/ spu_status = 0x%x\n",
962                spe_num, si.stop_reason, si.spu_status);
963         break;
964       }
965     }
966   }
967 #if 0 // not enabled
968   else if (evt->events == SPE_EVENT_IN_MBOX){    // there's room to write to SPE
969     // spe_in_mbox_write (ignore)
970   }
971   else if (evt->events == SPE_EVENT_TAG_GROUP){  // our DMA completed
972     // spe_mfcio_tag_status_read
973   }
974 #endif
975   else {
976     fprintf(stderr, "handle_event: unexpected evt->events = 0x%x\n", evt->events);
977     return;
978   }
979 }
980
981 //
982 // This is the "main program" of the event handling thread
983 //
984 void
985 gc_job_manager_impl::event_handler_loop()
986 {
987   static const int MAX_EVENTS = 16;
988   static const int TIMEOUT = 20;        // how long to block in milliseconds
989
990   spe_event_unit_t events[MAX_EVENTS];
991
992   if (d_debug)
993     printf("event_handler_loop: starting\n");
994
995   set_eh_state(EHS_RUNNING);
996
997   // ask the first spe for its max bufsize
998   send_spe(0, MK_MBOX_MSG(OP_GET_SPU_BUFSIZE, 0));
999
1000   while (1){
1001     switch(d_eh_state){
1002
1003     case EHS_RUNNING:      // normal stuff
1004       if (d_shutdown_requested) {
1005         set_eh_state(EHS_SHUTTING_DOWN);
1006       }
1007       break;
1008
1009     case EHS_SHUTTING_DOWN:
1010
1011       // FIXME wait until job queue is empty, then tell them to exit
1012
1013       send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
1014       set_eh_state(EHS_WAITING_FOR_WORKERS_TO_DIE);
1015       break;
1016
1017     case EHS_WAITING_FOR_WORKERS_TO_DIE:
1018       {
1019         bool all_dead = true;
1020         for (unsigned int i = 0; i < d_options.nspes; i++)
1021           all_dead &= d_worker[i].state == WS_DEAD;
1022
1023         if (all_dead){
1024           set_eh_state(EHS_DEAD);
1025           if (d_debug)
1026             printf("event_handler_loop: exiting\n");
1027           return;
1028         }
1029       }
1030       break;
1031
1032     default:
1033       set_eh_state(EHS_DEAD);
1034       printf("event_handler_loop(default): exiting\n");
1035       return;
1036     }
1037
1038     // block waiting for events...
1039     int nevents = spe_event_wait(d_spe_event_handler.ptr,
1040                                  events, MAX_EVENTS, TIMEOUT);
1041     if (nevents < 0){
1042       perror("spe_wait_event");
1043       // FIXME bail?
1044     }
1045     for (int i = 0; i < nevents; i++){
1046       handle_event(&events[i]);
1047     }
1048   }
1049 }
1050
1051 ////////////////////////////////////////////////////////////////////////
1052 // This is the top of the SPE worker threads
1053
1054 static void *
1055 start_worker(void *arg)
1056 {
1057   worker_ctx *w = (worker_ctx *) arg;
1058   spe_stop_info_t       si;
1059
1060   w->state = WS_RUNNING;
1061   if (s_worker_debug)
1062     printf("worker[%d]: WS_RUNNING\n", w->spe_idx);
1063
1064   unsigned int entry = SPE_DEFAULT_ENTRY;
1065   int r = spe_context_run(w->spe_ctx,  &entry, 0, w->spu_args, 0, &si);
1066
1067   if (r < 0){                   // error
1068     char buf[64];
1069     snprintf(buf, sizeof(buf), "worker[%d]: spe_context_run", w->spe_idx);
1070     perror(buf);
1071   }
1072   else if (r == 0){
1073     // spe program called exit.
1074     if (s_worker_debug)
1075       printf("worker[%d]: SPE_EXIT w/ exit_code = %d\n",
1076              w->spe_idx, si.result.spe_exit_code);
1077   }
1078   else {
1079     // called stop_and_signal
1080     //
1081     // I'm not sure we'll ever get here.  I think the event
1082     // handler will catch this...
1083     printf("worker[%d]: SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
1084            w->spe_idx, si.result.spe_signal_code);
1085   }
1086
1087   // in any event, we're committing suicide now ;)
1088   if (s_worker_debug)
1089     printf("worker[%d]: WS_DEAD\n", w->spe_idx);
1090
1091   w->state = WS_DEAD;
1092   return 0;
1093 }
1094
1095 ////////////////////////////////////////////////////////////////////////
1096
1097 gc_client_thread_info *
1098 gc_job_manager_impl::alloc_cti()
1099 {
1100   for (unsigned int i = 0; i < d_options.max_client_threads; i++){
1101     if (d_client_thread[i].d_free){
1102       // try to atomically grab it
1103       if (_atomic_dec_if_positive(ptr_to_ea(&d_client_thread[i].d_free)) == 0){
1104         // got it...
1105         gc_client_thread_info *cti = &d_client_thread[i];
1106         cti->d_state = CT_NOT_WAITING;
1107         bv_zero(cti->d_jobs_done);
1108         cti->d_njobs_waiting_for = 0;
1109         cti->d_jobs_waiting_for = 0;
1110         
1111         return cti;
1112       }
1113     }
1114   }
1115   return 0;
1116 }
1117
1118 void
1119 gc_job_manager_impl::free_cti(gc_client_thread_info *cti)
1120 {
1121   assert((size_t) (cti - d_client_thread.get()) < d_options.max_client_threads);
1122   cti->d_free = 1;
1123 }
1124
1125 int
1126 gc_job_manager_impl::ea_args_maxsize()
1127 {
1128   omni_mutex_lock       l(d_eh_mutex);
1129
1130   while (d_ea_args_maxsize == 0)        // wait for it to be initialized
1131     d_eh_cond.wait();
1132
1133   return d_ea_args_maxsize;
1134 }
1135
1136 void
1137 gc_job_manager_impl::set_debug(int debug)
1138 {
1139   d_debug = debug;
1140   s_worker_debug = debug;
1141 }
1142
1143 int
1144 gc_job_manager_impl::debug()
1145 {
1146   return d_debug;
1147 }
1148
1149 ////////////////////////////////////////////////////////////////////////
1150
1151 void
1152 gc_job_manager_impl::setup_logfiles()
1153 {
1154   if (!d_options.enable_logging)
1155     return;
1156
1157   if (d_options.log2_nlog_entries == 0)
1158     d_options.log2_nlog_entries = 12;
1159
1160   // must end up a multiple of the page size
1161
1162   size_t pagesize = getpagesize();
1163   size_t s = (1 << d_options.log2_nlog_entries) * sizeof(gc_log_entry_t);
1164   s = ((s + pagesize - 1) / pagesize) * pagesize;
1165   size_t nentries = s / sizeof(gc_log_entry_t);
1166   assert(is_power_of_2(nentries));
1167
1168   for (unsigned int i = 0; i < d_options.nspes; i++){
1169     char filename[100];
1170     snprintf(filename, sizeof(filename), "spu_log.%02d", i);
1171     int fd = open(filename, O_CREAT|O_TRUNC|O_RDWR, 0664);
1172     if (fd == -1){
1173       perror(filename);
1174       return;
1175     }
1176     lseek(fd, s - 1, SEEK_SET);
1177     write(fd, "\0", 1);
1178     void *p = mmap(0, s, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
1179     if (p == MAP_FAILED){
1180       perror("gc_job_manager_impl::setup_logfiles: mmap");
1181       close(fd);
1182       return;
1183     }
1184     close(fd);
1185     memset(p, 0, s);
1186     d_spu_args[i].log.base = ptr_to_ea(p);
1187     d_spu_args[i].log.nentries = nentries;
1188   }
1189 }
1190
1191 void
1192 gc_job_manager_impl::sync_logfiles()
1193 {
1194   for (unsigned int i = 0; i < d_options.nspes; i++){
1195     if (d_spu_args[i].log.base)
1196       msync(ea_to_ptr(d_spu_args[i].log.base),
1197             d_spu_args[i].log.nentries * sizeof(gc_log_entry_t),
1198             MS_ASYNC);
1199   }
1200 }
1201
1202 void
1203 gc_job_manager_impl::unmap_logfiles()
1204 {
1205   for (unsigned int i = 0; i < d_options.nspes; i++){
1206     if (d_spu_args[i].log.base)
1207       munmap(ea_to_ptr(d_spu_args[i].log.base),
1208              d_spu_args[i].log.nentries * sizeof(gc_log_entry_t));
1209   }
1210 }
1211
1212 ////////////////////////////////////////////////////////////////////////
1213 //
1214 // lookup proc names in d_proc_def table
1215
1216 gc_proc_id_t 
1217 gc_job_manager_impl::lookup_proc(const std::string &proc_name)
1218 {
1219   for (int i = 0; i < d_nproc_defs; i++)
1220     if (proc_name == d_proc_def[i].name)
1221       return i;
1222
1223   throw gc_unknown_proc(proc_name);
1224 }
1225
1226 std::vector<std::string>
1227 gc_job_manager_impl::proc_names()
1228 {
1229   std::vector<std::string> r;
1230   for (int i = 0; i < d_nproc_defs; i++)
1231     r.push_back(d_proc_def[i].name);
1232
1233   return r;
1234 }
1235
1236 ////////////////////////////////////////////////////////////////////////
1237
1238 worker_ctx::~worker_ctx()
1239 {
1240   if (spe_ctx){
1241     int r = spe_context_destroy(spe_ctx);
1242     if (r != 0){
1243       perror("spe_context_destroy");
1244     }
1245     spe_ctx = 0;
1246   }
1247   state = WS_FREE;
1248 }