Imported Upstream version 3.2.2
[debian/gnuradio] / gcell / lib / runtime / gc_job_manager_impl.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2007,2008,2009 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License along
18  * with this program; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25 #include "gc_job_manager_impl.h"
26 #include <gcell/gc_mbox.h>
27 #include <gcell/gc_aligned_alloc.h>
28 #include <gcell/memory_barrier.h>
29 #include <gc_proc_def_utils.h>
30 #include <atomic_dec_if_positive.h>
31 #include <stdio.h>
32 #include <stdexcept>
33 #include <stdlib.h>
34 #include <unistd.h>
35 #include <sys/mman.h>
36 #include <sys/types.h>
37 #include <sys/stat.h>
38 #include <fcntl.h>
39 #include <string.h>
40 #include <sched.h>
41
42
43 #define __nop() __asm__ volatile ("ori 0,0,0" : : : "memory")
44 #define __cctpl() __asm__ volatile ("or 1,1,1" : : : "memory")
45 #define __cctpm() __asm__ volatile ("or 2,2,2" : : : "memory")
46 #define __cctph() __asm__ volatile ("or 3,3,3" : : : "memory")
47 #define __db8cyc() __asm__ volatile ("or 28,28,28" : : : "memory")
48 #define __db10cyc() __asm__ volatile ("or 29,29,29" : : : "memory")
49 #define __db12cyc() __asm__ volatile ("or 30,30,30" : : : "memory")
50 #define __db16cyc() __asm__ volatile ("or 31,31,31" : : : "memory")
51
52
53 #if 1
54 #define CCTPL() __cctpl()
55 #define CCTPM() __cctpm()
56 #else
57 #define CCTPL() (void) 0
58 #define CCTPM() (void) 0
59 #endif
60
61 static const size_t CACHE_LINE_SIZE = 128;
62
63 static const unsigned int DEFAULT_MAX_JOBS = 128;
64 static const unsigned int DEFAULT_MAX_CLIENT_THREADS = 64;
65
66 // FIXME this really depends on the SPU code...
67 static const unsigned int MAX_TOTAL_INDIRECT_LENGTH = 16 * 1024;
68
69
70 static bool          s_key_initialized = false;
71 static pthread_key_t s_client_key;
72
73 static int s_worker_debug = 0;
74
75 // custom deleter of gang_contexts for use with boost::shared_ptr
76 class gang_deleter {
77 public:
78   void operator()(spe_gang_context_ptr_t ctx) {
79     if (ctx){
80       int r = spe_gang_context_destroy(ctx);
81       if (r != 0){
82         perror("spe_gang_context_destroy");
83       }
84     }
85   }
86 };
87
88
89 // custom deleter of anything that can be freed with "free"
90 class free_deleter {
91 public:
92   void operator()(void *p) {
93     free(p);
94   }
95 };
96
97
98 /*
99  * Called when client thread is destroyed.
100  * We mark our client info free.
101  */
102 static void
103 client_key_destructor(void *p)
104 {
105   ((gc_client_thread_info *) p)->d_free = 1;
106 }
107
108 static bool
109 is_power_of_2(uint32_t x)
110 {
111   return (x != 0) && !(x & (x - 1));
112 }
113
114 ////////////////////////////////////////////////////////////////////////
115
116
117 gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
118   : d_debug(0), d_spu_args(0),
119     d_eh_cond(&d_eh_mutex), d_eh_thread(0), d_eh_state(EHS_INIT),
120     d_shutdown_requested(false),
121     d_jc_cond(&d_jc_mutex), d_jc_thread(0), d_jc_state(JCS_INIT), d_jc_njobs_active(0),
122     d_ntell(0), d_tell_start(0),
123     d_client_thread(0), d_ea_args_maxsize(0),
124     d_proc_def(0), d_proc_def_ls_addr(0), d_nproc_defs(0)
125 {
126   if (!s_key_initialized){
127     int r = pthread_key_create(&s_client_key, client_key_destructor);
128     if (r != 0)
129       throw std::runtime_error("pthread_key_create");
130     s_key_initialized = true;
131   }
132
133   // ensure it's zero
134   pthread_setspecific(s_client_key, 0);
135
136   if (options != 0)
137     d_options = *options;
138
139   // provide the real default for those indicated with a zero
140   if (d_options.max_jobs == 0)
141     d_options.max_jobs = DEFAULT_MAX_JOBS;
142   if (d_options.max_client_threads == 0)
143     d_options.max_client_threads = DEFAULT_MAX_CLIENT_THREADS;
144
145   if (!d_options.program_handle){
146     fprintf(stderr, "gc_job_manager: options->program_handle must be non-zero\n");
147     throw std::runtime_error("gc_job_manager: options->program_handle must be non-zero");
148   }
149
150   int ncpu_nodes = spe_cpu_info_get(SPE_COUNT_PHYSICAL_CPU_NODES, -1);
151   int nusable_spes = spe_cpu_info_get(SPE_COUNT_USABLE_SPES, -1);
152
153   if (debug()){
154     printf("cpu_nodes = %d\n", ncpu_nodes);
155     for (int i = 0; i < ncpu_nodes; i++){
156       printf("node[%d].physical_spes = %2d\n", i,
157              spe_cpu_info_get(SPE_COUNT_PHYSICAL_SPES, i));
158       printf("node[%d].usable_spes   = %2d\n", i,
159              spe_cpu_info_get(SPE_COUNT_USABLE_SPES, i));
160     }
161   }
162
163   // clamp nspes
164   d_options.nspes = std::min(d_options.nspes, (unsigned int) MAX_SPES);
165   nusable_spes = std::min(nusable_spes, (int) MAX_SPES);
166
167   //
168   // sanity check requested number of spes.
169   //
170   if (d_options.nspes == 0)     // use all of them
171     d_options.nspes = nusable_spes;
172   else {
173     if (d_options.nspes > (unsigned int) nusable_spes){
174       fprintf(stderr,
175               "gc_job_manager: warning: caller requested %d spes.  There are only %d available.\n",
176               d_options.nspes, nusable_spes);
177       if (d_options.gang_schedule){
178         // If we're gang scheduling we'll never get scheduled if we
179         // ask for more than are available.
180         throw std::out_of_range("gang_scheduling: not enough spes available");
181       }
182       else {    // FIXME clamp to usable.  problem on PS3 when overcommited
183         fprintf(stderr, "gc_job_manager: clamping nspes to %d\n", nusable_spes);
184         d_options.nspes = nusable_spes;
185       }
186     }
187   }
188
189   if (d_options.use_affinity){
190     printf("gc_job_manager: warning: affinity request was ignored\n");
191   }
192
193   if (d_options.gang_schedule){
194     d_gang = spe_gang_context_sptr(spe_gang_context_create(0), gang_deleter());
195     if (!d_gang){
196       perror("gc_job_manager_impl[spe_gang_context_create]");
197       throw std::runtime_error("spe_gang_context_create");
198     }
199   }
200
201   d_ntell = std::min(d_options.nspes, 2U);
202
203   // ----------------------------------------------------------------
204   // initalize the job queue
205   
206   d_queue = (gc_jd_queue_t *) gc_aligned_alloc(sizeof(gc_jd_queue_t), CACHE_LINE_SIZE);
207   _d_queue_boost =
208     boost::shared_ptr<void>((void *) d_queue, free_deleter());
209   gc_jd_queue_init(d_queue);
210
211
212   // ----------------------------------------------------------------
213   // create the spe contexts
214
215   // 1 spu_arg struct for each SPE
216   assert(sizeof(gc_spu_args_t) % 16 == 0);
217   d_spu_args =
218     (gc_spu_args_t *) gc_aligned_alloc(MAX_SPES * sizeof(gc_spu_args_t), 16);
219   _d_spu_args_boost =
220     boost::shared_ptr<void>((void *) d_spu_args, free_deleter());
221
222   // 2 completion info structs for each SPE (we double buffer them)
223   assert(sizeof(gc_comp_info_t) % CACHE_LINE_SIZE == 0);
224   d_comp_info =
225     (gc_comp_info_t *) gc_aligned_alloc(2 * MAX_SPES * sizeof(gc_comp_info_t),
226                                         CACHE_LINE_SIZE);
227   _d_comp_info_boost =
228     boost::shared_ptr<void>((void *) d_comp_info, free_deleter());
229
230
231   // get a handle to the spe program
232
233   spe_program_handle_t *spe_image = d_options.program_handle.get();
234
235   // fish proc_def table out of SPE ELF file
236
237   if (!gcpd_find_table(spe_image, &d_proc_def, &d_nproc_defs, &d_proc_def_ls_addr)){
238     fprintf(stderr, "gc_job_manager_impl: couldn't find gc_proc_defs in SPE ELF file.\n");
239     throw std::runtime_error("no gc_proc_defs");
240   }
241   // fprintf(stderr, "d_proc_def_ls_addr = 0x%0x\n", d_proc_def_ls_addr);
242
243   int spe_flags = (SPE_EVENTS_ENABLE
244                    | SPE_MAP_PS
245                    | SPE_CFG_SIGNOTIFY1_OR
246                    | SPE_CFG_SIGNOTIFY2_OR);
247   
248   for (unsigned int i = 0; i < d_options.nspes; i++){
249     // FIXME affinity stuff goes here
250     d_worker[i].spe_ctx = spe_context_create(spe_flags, d_gang.get());;
251     if (d_worker[i].spe_ctx == 0){
252       perror("spe_context_create");
253       throw std::runtime_error("spe_context_create");
254     }
255
256     d_worker[i].spe_ctrl = 
257       (spe_spu_control_area_t *)spe_ps_area_get(d_worker[i].spe_ctx, SPE_CONTROL_AREA);
258     if (d_worker[i].spe_ctrl == 0){
259       perror("spe_ps_area_get(SPE_CONTROL_AREA)");
260       throw std::runtime_error("spe_ps_area_get(SPE_CONTROL_AREA)");
261     }
262
263     d_worker[i].spe_idx = i;
264     d_worker[i].spu_args = &d_spu_args[i];
265     d_worker[i].spu_args->queue = ptr_to_ea(d_queue);
266     d_worker[i].spu_args->comp_info[0] = ptr_to_ea(&d_comp_info[2*i+0]);
267     d_worker[i].spu_args->comp_info[1] = ptr_to_ea(&d_comp_info[2*i+1]);
268     d_worker[i].spu_args->spu_idx = i;
269     d_worker[i].spu_args->nspus = d_options.nspes;
270     d_worker[i].spu_args->proc_def_ls_addr = d_proc_def_ls_addr;
271     d_worker[i].spu_args->nproc_defs = d_nproc_defs;
272     d_worker[i].spu_args->log.base = 0;
273     d_worker[i].spu_args->log.nentries = 0;
274     d_worker[i].state = WS_INIT;
275
276     int r = spe_program_load(d_worker[i].spe_ctx, spe_image);
277     if (r != 0){
278       perror("spe_program_load");
279       throw std::runtime_error("spe_program_load");
280     }
281   }
282
283   setup_logfiles();
284
285   // ----------------------------------------------------------------
286   // initalize the free list of job descriptors
287   
288   d_free_list = (gc_jd_stack_t *) gc_aligned_alloc(sizeof(gc_jd_stack_t), CACHE_LINE_SIZE);
289   // This ensures that the memory associated with d_free_list is
290   // automatically freed in the destructor or if an exception occurs
291   // here in the constructor.
292   _d_free_list_boost =
293     boost::shared_ptr<void>((void *) d_free_list, free_deleter());
294   gc_jd_stack_init(d_free_list);
295
296   if (debug()){
297     printf("sizeof(d_jd[0]) = %d (0x%x)\n", sizeof(d_jd[0]), sizeof(d_jd[0]));
298     printf("max_jobs = %u\n", d_options.max_jobs);
299   }
300
301   // Initialize the array of job descriptors.
302   d_jd = (gc_job_desc_t *) gc_aligned_alloc(sizeof(d_jd[0]) * d_options.max_jobs, CACHE_LINE_SIZE);
303   _d_jd_boost = boost::shared_ptr<void>((void *) d_jd, free_deleter());
304
305
306   // set unique job_id
307   for (int i = 0; i < (int) d_options.max_jobs; i++)
308     d_jd[i].sys.job_id = i;
309
310   // push them onto the free list
311   for (int i = d_options.max_jobs - 1; i >= 0; i--)
312     free_job_desc(&d_jd[i]);
313
314   // ----------------------------------------------------------------
315   // initialize d_client_thread
316
317   {
318     gc_client_thread_info_sa cti(
319          new gc_client_thread_info[d_options.max_client_threads]);
320
321     d_client_thread.swap(cti);
322
323     for (unsigned int i = 0; i < d_options.max_client_threads; i++)
324       d_client_thread[i].d_client_id = i;
325   }
326
327   // ----------------------------------------------------------------
328   // initialize bitvectors
329
330   // initialize d_bvlen, the number of longs in job related bitvectors.
331   int bits_per_long = sizeof(unsigned long) * 8;
332   d_bvlen = (d_options.max_jobs + bits_per_long - 1) / bits_per_long;
333
334   // allocate all bitvectors in a single cache-aligned chunk
335   size_t nlongs = d_bvlen * d_options.max_client_threads;
336   void *p = gc_aligned_alloc(nlongs * sizeof(unsigned long), CACHE_LINE_SIZE);
337   _d_all_bitvectors = boost::shared_ptr<void>(p, free_deleter());
338
339   // Now point the gc_client_thread_info bitvectors into this storage
340   unsigned long *v = (unsigned long *) p;
341
342   for (unsigned int i = 0; i < d_options.max_client_threads; i++, v += d_bvlen)
343     d_client_thread[i].d_jobs_done = v;
344
345
346   // ----------------------------------------------------------------
347   // create the spe event handler & worker (SPE) threads
348
349   create_event_handler();
350 }
351
352 ////////////////////////////////////////////////////////////////////////
353
354 gc_job_manager_impl::~gc_job_manager_impl()
355 {
356   shutdown();
357
358   d_jd = 0;             // handled via _d_jd_boost
359   d_free_list = 0;      // handled via _d_free_list_boost
360   d_queue = 0;          // handled via _d_queue_boost
361
362   // clear cti, since we've deleted the underlying data
363   pthread_setspecific(s_client_key, 0);
364
365   unmap_logfiles();
366 }
367
368 bool
369 gc_job_manager_impl::shutdown()
370 {
371   omni_mutex_lock       l(d_eh_mutex);
372
373   {
374     omni_mutex_lock     l2(d_jc_mutex);
375     d_shutdown_requested = true;        // set flag for event handler thread
376     d_jc_cond.signal();                 // wake up job completer
377   }
378
379   // should only happens during early QA code
380   if (d_eh_thread == 0 && d_eh_state == EHS_INIT)
381     return false;
382
383   while (d_eh_state != EHS_DEAD)        // wait for it to finish
384     d_eh_cond.wait();
385
386   return true;
387 }
388
389 int
390 gc_job_manager_impl::nspes() const
391 {
392   return d_options.nspes;
393 }
394
395 ////////////////////////////////////////////////////////////////////////
396
397 void
398 gc_job_manager_impl::bv_zero(unsigned long *bv)
399 {
400   memset(bv, 0, sizeof(unsigned long) * d_bvlen);
401 }
402
403 inline void
404 gc_job_manager_impl::bv_clr(unsigned long *bv, unsigned int bitno)
405 {
406   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
407   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
408   bv[wi] &= ~(1UL << bi);
409 }
410
411 inline void
412 gc_job_manager_impl::bv_set(unsigned long *bv, unsigned int bitno)
413 {
414   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
415   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
416   bv[wi] |= (1UL << bi);
417 }
418
419 inline bool
420 gc_job_manager_impl::bv_isset(unsigned long *bv, unsigned int bitno)
421 {
422   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
423   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
424   return (bv[wi] & (1UL << bi)) != 0;
425 }
426
427 inline bool
428 gc_job_manager_impl::bv_isclr(unsigned long *bv, unsigned int bitno)
429 {
430   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
431   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
432   return (bv[wi] & (1UL << bi)) == 0;
433 }
434
435 ////////////////////////////////////////////////////////////////////////
436
437 gc_job_desc *
438 gc_job_manager_impl::alloc_job_desc()
439 {
440   // stack is lock free, and safe to call from any thread
441   gc_job_desc *jd = gc_jd_stack_pop(d_free_list);
442   if (jd == 0)
443     throw gc_bad_alloc("alloc_job_desc: none available");
444
445   return jd;
446 }
447
448 void
449 gc_job_manager_impl::free_job_desc(gc_job_desc *jd)
450 {
451   // stack is lock free, thus safe to call from any thread
452   if (jd != 0)
453     gc_jd_stack_push(d_free_list, jd);
454 }
455
456 ////////////////////////////////////////////////////////////////////////
457
458
459 inline bool
460 gc_job_manager_impl::incr_njobs_active()
461 {
462   omni_mutex_lock       l(d_jc_mutex);
463
464   if (d_shutdown_requested)
465     return false;
466
467   if (d_jc_njobs_active++ == 0) // signal on 0 to 1 transition
468     d_jc_cond.signal();
469
470   return true;
471 }
472
473 inline void
474 gc_job_manager_impl::decr_njobs_active(int n)
475 {
476   omni_mutex_lock       l(d_jc_mutex);
477   d_jc_njobs_active -= n;
478 }
479
480
481 /*
482  * We check as much as we can here on the PPE side, so that the SPE
483  * doesn't have to.
484  */
485 static bool
486 check_direct_args(gc_job_desc *jd, gc_job_direct_args *args)
487 {
488   if (args->nargs > MAX_ARGS_DIRECT){
489     jd->status = JS_BAD_N_DIRECT;
490     return false;
491   }
492
493   return true;
494 }
495
496 static bool
497 check_ea_args(gc_job_desc *jd, gc_job_ea_args *p)
498 {
499   if (p->nargs > MAX_ARGS_EA){
500     jd->status = JS_BAD_N_EA;
501     return false;
502   }
503
504   uint32_t dir_union = 0;
505
506   for (unsigned int i = 0; i < p->nargs; i++){
507     dir_union |= p->arg[i].direction;
508     switch(p->arg[i].direction){
509     case GCJD_DMA_GET:
510     case GCJD_DMA_PUT:
511       break;
512
513     default:
514       jd->status = JS_BAD_DIRECTION;
515       return false;
516     }
517   }
518
519   if (p->nargs > 1){
520     unsigned int common_eah = (p->arg[0].ea_addr) >> 32;
521     for (unsigned int i = 1; i < p->nargs; i++){
522       if ((p->arg[i].ea_addr >> 32) != common_eah){
523         jd->status = JS_BAD_EAH;
524         return false;
525       }
526     }
527   }
528
529   jd->sys.direction_union = dir_union;
530   return true;
531 }
532
533 bool
534 gc_job_manager_impl::submit_job(gc_job_desc *jd)
535 {
536   // Ensure it's one of our job descriptors
537
538   if (jd < d_jd || jd >= &d_jd[d_options.max_jobs]){
539     jd->status = JS_BAD_JOB_DESC;
540     return false;
541   }
542
543   // Ensure we've got a client_thread_info assigned to this thread.
544   
545   gc_client_thread_info *cti =
546     (gc_client_thread_info *) pthread_getspecific(s_client_key);
547   if (unlikely(cti == 0)){
548     if ((cti = alloc_cti()) == 0){
549       fprintf(stderr, "gc_job_manager_impl::submit_job: Too many client threads.\n");
550       jd->status = JS_TOO_MANY_CLIENTS;
551       return false;
552     }
553     int r = pthread_setspecific(s_client_key, cti);
554     if (r != 0){
555       jd->status = JS_BAD_JUJU;
556       fprintf(stderr, "pthread_setspecific failed (return = %d)\n", r);
557       return false;
558     }
559   }
560
561   if (jd->proc_id == GCP_UNKNOWN_PROC){
562     jd->status = JS_UNKNOWN_PROC;
563     return false;
564   }
565
566   if (!check_direct_args(jd, &jd->input))
567     return false;
568
569   if (!check_direct_args(jd, &jd->output))
570     return false;
571
572   if (!check_ea_args(jd, &jd->eaa))
573     return false;
574
575   jd->status = JS_OK;
576   jd->sys.client_id = cti->d_client_id;
577
578   if (!incr_njobs_active()){
579     jd->status = JS_SHUTTING_DOWN;
580     return false;
581   }
582   
583   gc_jd_queue_enqueue(d_queue, jd);
584   // tell_spes_to_check_queue();
585   return true;
586 }
587
588 bool
589 gc_job_manager_impl::wait_job(gc_job_desc *jd)
590 {
591   bool done;
592   return wait_jobs(1, &jd, &done, GC_WAIT_ANY) == 1 && jd->status == JS_OK;
593 }
594
595 int
596 gc_job_manager_impl::wait_jobs(unsigned int njobs,
597                                gc_job_desc *jd[],
598                                bool done[],
599                                gc_wait_mode mode)
600 {
601   unsigned int i;
602
603   gc_client_thread_info *cti =
604     (gc_client_thread_info *) pthread_getspecific(s_client_key);
605   if (unlikely(cti == 0))
606     return -1;
607
608   for (i = 0; i < njobs; i++){
609     done[i] = false;
610     if (unlikely(jd[i]->sys.client_id != cti->d_client_id)){
611       fprintf(stderr, "gc_job_manager_impl::wait_jobs: can't wait for a job you didn't submit\n");
612       return -1;
613     }
614   }
615
616   {
617     omni_mutex_lock     l(cti->d_mutex);
618
619     // setup info for event handler
620     cti->d_state = (mode == GC_WAIT_ANY) ? CT_WAIT_ANY : CT_WAIT_ALL;
621     cti->d_njobs_waiting_for = njobs;
622     cti->d_jobs_waiting_for = jd;
623     assert(cti->d_jobs_done != 0);
624
625     unsigned int ndone = 0;
626
627     // wait for jobs to complete
628     
629     while (1){
630       ndone = 0;
631       for (i= 0; i < njobs; i++){
632         if (done[i])
633           ndone++;
634         else if (bv_isset(cti->d_jobs_done, jd[i]->sys.job_id)){
635           bv_clr(cti->d_jobs_done, jd[i]->sys.job_id);
636           done[i] = true;
637           ndone++;
638         }
639       }
640
641       if (mode == GC_WAIT_ANY && ndone > 0)
642         break;
643
644       if (mode == GC_WAIT_ALL && ndone == njobs)
645         break;
646
647       // FIXME what happens when somebody calls shutdown?
648
649       cti->d_cond.wait();       // wait for event handler to wake us up
650     }
651
652     cti->d_state = CT_NOT_WAITING;  
653     cti->d_njobs_waiting_for = 0;       // tidy up (not reqd)
654     cti->d_jobs_waiting_for = 0;        // tidy up (not reqd)
655     return ndone;
656   }
657 }
658
659 ////////////////////////////////////////////////////////////////////////
660
661 bool
662 gc_job_manager_impl::send_all_spes(uint32_t msg)
663 {
664   bool ok = true;
665
666   for (unsigned int i = 0; i < d_options.nspes; i++)
667     ok &= send_spe(i, msg);
668
669   return ok;
670 }
671
672 bool
673 gc_job_manager_impl::send_spe(unsigned int spe, uint32_t msg)
674 {
675   if (spe >= d_options.nspes)
676     return false;
677
678   int r = spe_in_mbox_write(d_worker[spe].spe_ctx, &msg, 1,
679                             SPE_MBOX_ALL_BLOCKING);
680   if (r < 0){
681     perror("spe_in_mbox_write");
682     return false;
683   }
684
685   return r == 1;
686 }
687
688 void 
689 gc_job_manager_impl::tell_spes_to_check_queue()
690 {
691   int nspes = d_options.nspes;
692
693   for (int i = 0, ntold = 0; ntold < d_ntell && i < nspes ; ++i){
694     volatile spe_spu_control_area_t *spe_ctrl = d_worker[d_tell_start].spe_ctrl;
695     int nfree = (spe_ctrl->SPU_Mbox_Stat >> 8) & 0xFF;
696     if (nfree == 4){
697       spe_ctrl->SPU_In_Mbox = MK_MBOX_MSG(OP_CHECK_QUEUE, 0);
698       ntold++;
699     }
700
701     unsigned int t = d_tell_start + 1;
702     if (t >= d_options.nspes)
703       t = 0;
704     d_tell_start = t;
705   }
706 }
707
708
709 ////////////////////////////////////////////////////////////////////////
710
711 static void
712 pthread_create_failure_msg(int r, const char *which)
713 {
714   char buf[256];
715   const char *s = 0;
716
717   switch (r){
718   case EAGAIN: s = "EAGAIN"; break;
719   case EINVAL: s = "EINVAL"; break;
720   case EPERM:  s = "EPERM";  break;
721   default:
722     snprintf(buf, sizeof(buf), "Unknown error %d", r);
723     s = buf;
724     break;
725   }
726   fprintf(stderr, "pthread_create[%s] failed: %s\n", which, s);
727 }
728
729
730 static bool
731 start_thread(pthread_t *thread,
732              void *(*start_routine)(void *),  void *arg,
733              const char *msg)
734 {
735   pthread_attr_t attr;
736   pthread_attr_init(&attr);
737   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
738
739   // FIXME save sigprocmask
740   // FIXME set sigprocmask
741
742   int r = pthread_create(thread, &attr, start_routine, arg);
743     
744   // FIXME restore sigprocmask
745
746   if (r != 0){
747     pthread_create_failure_msg(r, msg);
748     return false;
749   }
750   return true;
751 }
752
753
754 ////////////////////////////////////////////////////////////////////////
755
756 static void *start_worker(void *arg);
757
758 static void *
759 start_event_handler(void *arg)
760 {
761   gc_job_manager_impl *p = (gc_job_manager_impl *) arg;
762   p->event_handler_loop();
763   return 0;
764 }
765
766 static void *
767 start_job_completer(void *arg)
768 {
769   gc_job_manager_impl *p = (gc_job_manager_impl *) arg;
770   p->job_completer_loop();
771   return 0;
772 }
773
774 void
775 gc_job_manager_impl::create_event_handler()
776 {
777   // create the SPE event handler and register our interest in events
778
779   d_spe_event_handler.ptr = spe_event_handler_create();
780   if (d_spe_event_handler.ptr == 0){
781     perror("spe_event_handler_create");
782     throw std::runtime_error("spe_event_handler_create");
783   }
784
785   for (unsigned int i = 0; i < d_options.nspes; i++){
786     spe_event_unit_t    eu;
787     memset(&eu, 0, sizeof(eu));
788     eu.events = SPE_EVENT_OUT_INTR_MBOX | SPE_EVENT_SPE_STOPPED;
789     eu.spe = d_worker[i].spe_ctx;
790     eu.data.u32 = i;    // set in events returned by spe_event_wait
791
792     if (spe_event_handler_register(d_spe_event_handler.ptr, &eu) != 0){
793       perror("spe_event_handler_register");
794       throw std::runtime_error("spe_event_handler_register");
795     }
796   }
797
798   // create the event handling thread
799
800   if (!start_thread(&d_eh_thread, start_event_handler, this, "event_handler")){
801     throw std::runtime_error("pthread_create");
802   }
803
804   // create the job completion thread
805
806   if (!start_thread(&d_jc_thread, start_job_completer, this, "job_completer")){
807     throw std::runtime_error("pthread_create");
808   }
809
810   // create the SPE worker threads
811
812   bool ok = true;
813   for (unsigned int i = 0; ok && i < d_options.nspes; i++){
814     char name[256];
815     snprintf(name, sizeof(name), "worker[%d]", i);
816     ok &= start_thread(&d_worker[i].thread, start_worker,
817                        &d_worker[i], name);
818   }
819
820   if (!ok){
821     //
822     // FIXME Clean up the mess.  Need to terminate event handler and all workers.
823     //
824     // this should cause the workers to exit, unless they're seriously broken
825     send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
826
827     shutdown();
828
829     throw std::runtime_error("pthread_create");
830   }
831 }
832
833 ////////////////////////////////////////////////////////////////////////
834
835 void
836 gc_job_manager_impl::set_eh_state(evt_handler_state s)
837 {
838   omni_mutex_lock       l(d_eh_mutex);
839   d_eh_state = s;
840   d_eh_cond.broadcast();
841 }
842
843 void
844 gc_job_manager_impl::set_ea_args_maxsize(int maxsize)
845 {
846   omni_mutex_lock       l(d_eh_mutex);
847   d_ea_args_maxsize = maxsize;
848   d_eh_cond.broadcast();
849 }
850
851 void
852 gc_job_manager_impl::print_event(spe_event_unit_t *evt)
853 {
854   printf("evt: spe = %d events = (0x%x)", evt->data.u32, evt->events);
855
856   if (evt->events & SPE_EVENT_OUT_INTR_MBOX)
857     printf(" OUT_INTR_MBOX");
858   
859   if (evt->events & SPE_EVENT_IN_MBOX)
860     printf(" IN_MBOX");
861   
862   if (evt->events & SPE_EVENT_TAG_GROUP)
863     printf(" TAG_GROUP");
864   
865   if (evt->events & SPE_EVENT_SPE_STOPPED)
866     printf(" SPE_STOPPED");
867
868   printf("\n");
869 }
870
871 struct job_client_info {
872   uint16_t      job_id;
873   uint16_t      client_id;
874 };
875
876 static int
877 compare_jci_clients(const void *va, const void *vb)
878 {
879   const job_client_info *a = (job_client_info *) va;
880   const job_client_info *b = (job_client_info *) vb;
881
882   return a->client_id - b->client_id;
883 }
884
885 void
886 gc_job_manager_impl::notify_clients_jobs_are_done(unsigned int spe_num,
887                                                   unsigned int completion_info_idx)
888 {
889   const char *msg = "gc_job_manager_impl::notify_client_job_is_done (INTERNAL ERROR)";
890
891   smp_rmb();  // order reads so we know that data sent from SPE is here
892
893   gc_comp_info_t *ci = &d_comp_info[2 * spe_num + (completion_info_idx & 0x1)];
894
895   if (ci->ncomplete == 0){      // never happens, but ensures code below is correct
896     ci->in_use = 0;
897     return;
898   }
899
900   decr_njobs_active(ci->ncomplete);
901
902   if (0){
903     static int total_jobs;
904     static int total_msgs;
905     total_msgs++;
906     total_jobs += ci->ncomplete;
907     printf("ppe:     tj = %6d  tm = %6d\n", total_jobs, total_msgs);
908   }
909
910   job_client_info gci[GC_CI_NJOBS];
911
912   /*
913    * Make one pass through and sanity check everything while filling in gci
914    */
915   for (unsigned int i = 0; i < ci->ncomplete; i++){
916     unsigned int job_id = ci->job_id[i];
917
918     if (job_id >= d_options.max_jobs){
919       // internal error, shouldn't happen
920       fprintf(stderr,"%s: invalid job_id = %d\n", msg, job_id);
921       ci->in_use = 0;           // clear flag so SPE knows we're done with it
922       return;
923     }
924     gc_job_desc *jd = &d_jd[job_id];
925
926     if (jd->sys.client_id >= d_options.max_client_threads){
927       // internal error, shouldn't happen
928       fprintf(stderr, "%s: invalid client_id = %d\n", msg, jd->sys.client_id);
929       ci->in_use = 0;           // clear flag so SPE knows we're done with it
930       return;
931     }
932
933     gci[i].job_id = job_id;
934     gci[i].client_id = jd->sys.client_id;
935   }
936
937   // sort by client_id so we only have to lock & signal once / client
938
939   if (ci->ncomplete > 1)
940     qsort(gci, ci->ncomplete, sizeof(gci[0]), compare_jci_clients);
941
942   // "wind-in" 
943
944   gc_client_thread_info *last_cti = &d_client_thread[gci[0].client_id];
945   last_cti->d_mutex.lock();
946   bv_set(last_cti->d_jobs_done, gci[0].job_id);  // mark job done
947
948   for (unsigned int i = 1; i < ci->ncomplete; i++){
949
950     gc_client_thread_info *cti = &d_client_thread[gci[i].client_id];
951
952     if (cti != last_cti){       // new client?
953
954       // yes.  signal old client, unlock old, lock new
955
956       // FIXME we could distinguish between CT_WAIT_ALL & CT_WAIT_ANY
957
958       if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
959         last_cti->d_cond.signal();      // wake client thread up
960
961       last_cti->d_mutex.unlock();
962       cti->d_mutex.lock();
963       last_cti = cti;
964     }
965
966     // mark job done
967     bv_set(cti->d_jobs_done, gci[i].job_id);
968   }
969
970   // "wind-out"
971
972   if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
973     last_cti->d_cond.signal();  // wake client thread up
974   last_cti->d_mutex.unlock();
975
976   ci->in_use = 0;               // clear flag so SPE knows we're done with it
977 }
978
979 void
980 gc_job_manager_impl::handle_event(spe_event_unit_t *evt)
981 {
982   // print_event(evt);
983
984   int spe_num = evt->data.u32;
985
986   // only a single event type can be signaled at a time
987   
988   if (evt->events == SPE_EVENT_OUT_INTR_MBOX) { // SPE sent us 1 or more msgs
989     static const int NMSGS = 32;
990     unsigned int msg[NMSGS];
991     int n = spe_out_intr_mbox_read(evt->spe, msg, NMSGS, SPE_MBOX_ANY_BLOCKING);
992     // printf("spe_out_intr_mbox_read = %d\n", n);
993     if (n < 0){
994       perror("spe_out_intr_mbox_read");
995     }
996     else {
997       for (int i = 0; i < n; i++){
998         switch(MBOX_MSG_OP(msg[i])){
999 #if 0
1000         case OP_JOBS_DONE:
1001           if (debug())
1002             printf("eh: job_done (0x%08x) from spu[%d]\n", msg[i], spe_num);
1003           notify_clients_jobs_are_done(spe_num, MBOX_MSG_ARG(msg[i]));
1004           break;
1005 #endif
1006         case OP_SPU_BUFSIZE:
1007           set_ea_args_maxsize(MBOX_MSG_ARG(msg[i]));
1008           break;
1009
1010         case OP_EXIT:
1011         default:
1012           printf("eh: Unexpected msg (0x%08x) from spu[%d]\n", msg[i], spe_num);
1013           break;
1014         }
1015       }
1016     }
1017   }
1018   else if (evt->events == SPE_EVENT_SPE_STOPPED){ // the SPE stopped
1019     spe_stop_info_t si;
1020     int r = spe_stop_info_read(evt->spe, &si);
1021     if (r < 0){
1022       perror("spe_stop_info_read");
1023     }
1024     else {
1025       switch (si.stop_reason){
1026       case SPE_EXIT:
1027         if (debug()){
1028           printf("eh: spu[%d] SPE_EXIT w/ exit_code = %d\n",
1029                  spe_num, si.result.spe_exit_code);
1030         }
1031         break;
1032       case SPE_STOP_AND_SIGNAL:
1033         printf("eh: spu[%d] SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
1034                spe_num, si.result.spe_signal_code);
1035         break;
1036       case SPE_RUNTIME_ERROR:
1037         printf("eh: spu[%d] SPE_RUNTIME_ERROR w/ spe_runtime_error = 0x%x\n",
1038                spe_num, si.result.spe_runtime_error);
1039         break;
1040       case SPE_RUNTIME_EXCEPTION:
1041         printf("eh: spu[%d] SPE_RUNTIME_EXCEPTION w/ spe_runtime_exception = 0x%x\n",
1042                spe_num, si.result.spe_runtime_exception);
1043         break;
1044       case SPE_RUNTIME_FATAL:
1045         printf("eh: spu[%d] SPE_RUNTIME_FATAL w/ spe_runtime_fatal = 0x%x\n",
1046                spe_num, si.result.spe_runtime_fatal);
1047         break;
1048       case SPE_CALLBACK_ERROR:
1049         printf("eh: spu[%d] SPE_CALLBACK_ERROR w/ spe_callback_error = 0x%x\n",
1050                spe_num, si.result.spe_callback_error);
1051         break;
1052       case SPE_ISOLATION_ERROR:
1053         printf("eh: spu[%d] SPE_ISOLATION_ERROR w/ spe_isolation_error = 0x%x\n",
1054                spe_num, si.result.spe_isolation_error);
1055         break;
1056       default:
1057         printf("eh: spu[%d] UNKNOWN STOP REASON (%d) w/ spu_status = 0x%x\n",
1058                spe_num, si.stop_reason, si.spu_status);
1059         break;
1060       }
1061     }
1062   }
1063 #if 0 // not enabled
1064   else if (evt->events == SPE_EVENT_IN_MBOX){    // there's room to write to SPE
1065     // spe_in_mbox_write (ignore)
1066   }
1067   else if (evt->events == SPE_EVENT_TAG_GROUP){  // our DMA completed
1068     // spe_mfcio_tag_status_read
1069   }
1070 #endif
1071   else {
1072     fprintf(stderr, "handle_event: unexpected evt->events = 0x%x\n", evt->events);
1073     return;
1074   }
1075 }
1076
1077 //
1078 // This is the "main program" of the event handling thread
1079 //
1080 void
1081 gc_job_manager_impl::event_handler_loop()
1082 {
1083   static const int MAX_EVENTS = 16;
1084   static const int TIMEOUT = 20;        // how long to block in milliseconds
1085
1086   spe_event_unit_t events[MAX_EVENTS];
1087
1088   if (d_debug)
1089     printf("event_handler_loop: starting\n");
1090
1091   set_eh_state(EHS_RUNNING);
1092
1093   // ask the first spe for its max bufsize
1094   send_spe(0, MK_MBOX_MSG(OP_GET_SPU_BUFSIZE, 0));
1095
1096   while (1){
1097     switch(d_eh_state){
1098
1099     case EHS_RUNNING:                   // normal stuff
1100       if (d_shutdown_requested) {
1101         set_eh_state(EHS_SHUTTING_DOWN);
1102       }
1103       break;
1104
1105     case EHS_SHUTTING_DOWN:
1106       if (d_jc_state == JCS_DEAD){
1107         send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
1108         set_eh_state(EHS_WAITING_FOR_WORKERS_TO_DIE);
1109       }
1110       break;
1111
1112     case EHS_WAITING_FOR_WORKERS_TO_DIE:
1113       {
1114         bool all_dead = true;
1115         for (unsigned int i = 0; i < d_options.nspes; i++)
1116           all_dead &= d_worker[i].state == WS_DEAD;
1117
1118         if (all_dead){
1119           set_eh_state(EHS_DEAD);
1120           if (d_debug)
1121             printf("event_handler_loop: exiting\n");
1122           return;
1123         }
1124       }
1125       break;
1126
1127     default:
1128       set_eh_state(EHS_DEAD);
1129       printf("event_handler_loop(default): exiting\n");
1130       return;
1131     }
1132
1133     // block waiting for events...
1134     int nevents = spe_event_wait(d_spe_event_handler.ptr,
1135                                  events, MAX_EVENTS, TIMEOUT);
1136     if (nevents < 0){
1137       perror("spe_wait_event");
1138       // FIXME bail?
1139     }
1140     for (int i = 0; i < nevents; i++){
1141       handle_event(&events[i]);
1142     }
1143   }
1144 }
1145
1146 ////////////////////////////////////////////////////////////////////////
1147
1148 void
1149 gc_job_manager_impl::poll_for_job_completion()
1150 {
1151   static const int niter = 10000;
1152
1153   CCTPL();              // change current (h/w) thread priority to low
1154
1155   for (int n = 0; n < niter; n++){
1156
1157     for (unsigned int spe_num = 0; spe_num < d_options.nspes; spe_num++){
1158       volatile spe_spu_control_area_t *spe_ctrl = d_worker[spe_num].spe_ctrl;
1159       int nentries = spe_ctrl->SPU_Mbox_Stat & 0xFF;
1160       while (nentries-- > 0){
1161         unsigned int msg = spe_ctrl->SPU_Out_Mbox;
1162         switch(MBOX_MSG_OP(msg)){
1163         case OP_JOBS_DONE:
1164           if (debug())
1165             printf("jc: job_done (0x%08x) from spu[%d]\n", msg, spe_num);
1166
1167           CCTPM();              // change current thread priority to medium
1168           notify_clients_jobs_are_done(spe_num, MBOX_MSG_ARG(msg));
1169           CCTPL();
1170           break;
1171
1172         default:
1173           printf("jc: Unexpected msg (0x%08x) from spu[%d]\n", msg, spe_num);
1174           break;
1175         }
1176       }
1177     }
1178   }
1179   CCTPM();
1180 }
1181
1182 //
1183 // This is the "main program" of the job completer thread
1184 //
1185 void
1186 gc_job_manager_impl::job_completer_loop()
1187 {
1188   d_jc_state = JCS_RUNNING;
1189
1190   while (1){
1191     {
1192       omni_mutex_lock   l(d_jc_mutex);
1193       if (d_jc_njobs_active == 0){
1194         if (d_shutdown_requested){
1195           d_jc_state = JCS_DEAD;
1196           return;
1197         }
1198         d_jc_cond.wait();
1199       }
1200     }
1201
1202     poll_for_job_completion();
1203   }
1204 }
1205
1206 ////////////////////////////////////////////////////////////////////////
1207 // this is the top of the SPE worker threads
1208
1209 static void *
1210 start_worker(void *arg)
1211 {
1212   worker_ctx *w = (worker_ctx *) arg;
1213   spe_stop_info_t       si;
1214
1215   w->state = WS_RUNNING;
1216   if (s_worker_debug)
1217     printf("worker[%d]: WS_RUNNING\n", w->spe_idx);
1218
1219   unsigned int entry = SPE_DEFAULT_ENTRY;
1220   int r = spe_context_run(w->spe_ctx,  &entry, 0, w->spu_args, 0, &si);
1221
1222   if (r < 0){                   // error
1223     char buf[64];
1224     snprintf(buf, sizeof(buf), "worker[%d]: spe_context_run", w->spe_idx);
1225     perror(buf);
1226   }
1227   else if (r == 0){
1228     // spe program called exit.
1229     if (s_worker_debug)
1230       printf("worker[%d]: SPE_EXIT w/ exit_code = %d\n",
1231              w->spe_idx, si.result.spe_exit_code);
1232   }
1233   else {
1234     // called stop_and_signal
1235     //
1236     // I'm not sure we'll ever get here.  I think the event
1237     // handler will catch this...
1238     printf("worker[%d]: SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
1239            w->spe_idx, si.result.spe_signal_code);
1240   }
1241
1242   // in any event, we're committing suicide now ;)
1243   if (s_worker_debug)
1244     printf("worker[%d]: WS_DEAD\n", w->spe_idx);
1245
1246   w->state = WS_DEAD;
1247   return 0;
1248 }
1249
1250 ////////////////////////////////////////////////////////////////////////
1251
1252 gc_client_thread_info *
1253 gc_job_manager_impl::alloc_cti()
1254 {
1255   for (unsigned int i = 0; i < d_options.max_client_threads; i++){
1256     if (d_client_thread[i].d_free){
1257       // try to atomically grab it
1258       if (_atomic_dec_if_positive(ptr_to_ea(&d_client_thread[i].d_free)) == 0){
1259         // got it...
1260         gc_client_thread_info *cti = &d_client_thread[i];
1261         cti->d_state = CT_NOT_WAITING;
1262         bv_zero(cti->d_jobs_done);
1263         cti->d_njobs_waiting_for = 0;
1264         cti->d_jobs_waiting_for = 0;
1265         
1266         return cti;
1267       }
1268     }
1269   }
1270   return 0;
1271 }
1272
1273 void
1274 gc_job_manager_impl::free_cti(gc_client_thread_info *cti)
1275 {
1276   assert((size_t) (cti - d_client_thread.get()) < d_options.max_client_threads);
1277   cti->d_free = 1;
1278 }
1279
1280 int
1281 gc_job_manager_impl::ea_args_maxsize()
1282 {
1283   omni_mutex_lock       l(d_eh_mutex);
1284
1285   while (d_ea_args_maxsize == 0)        // wait for it to be initialized
1286     d_eh_cond.wait();
1287
1288   return d_ea_args_maxsize;
1289 }
1290
1291 void
1292 gc_job_manager_impl::set_debug(int debug)
1293 {
1294   d_debug = debug;
1295   s_worker_debug = debug;
1296 }
1297
1298 int
1299 gc_job_manager_impl::debug()
1300 {
1301   return d_debug;
1302 }
1303
1304 ////////////////////////////////////////////////////////////////////////
1305
1306 void
1307 gc_job_manager_impl::setup_logfiles()
1308 {
1309   if (!d_options.enable_logging)
1310     return;
1311
1312   if (d_options.log2_nlog_entries == 0)
1313     d_options.log2_nlog_entries = 12;
1314
1315   // must end up a multiple of the page size
1316
1317   size_t pagesize = getpagesize();
1318   size_t s = (1 << d_options.log2_nlog_entries) * sizeof(gc_log_entry_t);
1319   s = ((s + pagesize - 1) / pagesize) * pagesize;
1320   size_t nentries = s / sizeof(gc_log_entry_t);
1321   assert(is_power_of_2(nentries));
1322
1323   for (unsigned int i = 0; i < d_options.nspes; i++){
1324     char filename[100];
1325     snprintf(filename, sizeof(filename), "spu_log.%02d", i);
1326     int fd = open(filename, O_CREAT|O_TRUNC|O_RDWR, 0664);
1327     if (fd == -1){
1328       perror(filename);
1329       return;
1330     }
1331     lseek(fd, s - 1, SEEK_SET);
1332     write(fd, "\0", 1);
1333     void *p = mmap(0, s, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
1334     if (p == MAP_FAILED){
1335       perror("gc_job_manager_impl::setup_logfiles: mmap");
1336       close(fd);
1337       return;
1338     }
1339     close(fd);
1340     memset(p, 0, s);
1341     d_spu_args[i].log.base = ptr_to_ea(p);
1342     d_spu_args[i].log.nentries = nentries;
1343   }
1344 }
1345
1346 void
1347 gc_job_manager_impl::sync_logfiles()
1348 {
1349   for (unsigned int i = 0; i < d_options.nspes; i++){
1350     if (d_spu_args[i].log.base)
1351       msync(ea_to_ptr(d_spu_args[i].log.base),
1352             d_spu_args[i].log.nentries * sizeof(gc_log_entry_t),
1353             MS_ASYNC);
1354   }
1355 }
1356
1357 void
1358 gc_job_manager_impl::unmap_logfiles()
1359 {
1360   for (unsigned int i = 0; i < d_options.nspes; i++){
1361     if (d_spu_args[i].log.base)
1362       munmap(ea_to_ptr(d_spu_args[i].log.base),
1363              d_spu_args[i].log.nentries * sizeof(gc_log_entry_t));
1364   }
1365 }
1366
1367 ////////////////////////////////////////////////////////////////////////
1368 //
1369 // lookup proc names in d_proc_def table
1370
1371 gc_proc_id_t 
1372 gc_job_manager_impl::lookup_proc(const std::string &proc_name)
1373 {
1374   for (int i = 0; i < d_nproc_defs; i++)
1375     if (proc_name == d_proc_def[i].name)
1376       return i;
1377
1378   throw gc_unknown_proc(proc_name);
1379 }
1380
1381 std::vector<std::string>
1382 gc_job_manager_impl::proc_names()
1383 {
1384   std::vector<std::string> r;
1385   for (int i = 0; i < d_nproc_defs; i++)
1386     r.push_back(d_proc_def[i].name);
1387
1388   return r;
1389 }
1390
1391 ////////////////////////////////////////////////////////////////////////
1392
1393 worker_ctx::~worker_ctx()
1394 {
1395   if (spe_ctx){
1396     int r = spe_context_destroy(spe_ctx);
1397     if (r != 0){
1398       perror("spe_context_destroy");
1399     }
1400     spe_ctx = 0;
1401   }
1402   state = WS_FREE;
1403 }