Convert gcell to use boost::threads instead of omnithread.
[debian/gnuradio] / gcell / lib / runtime / gc_job_manager_impl.cc
1 /* -*- c++ -*- */
2 /*
3  * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
4  * 
5  * This file is part of GNU Radio
6  * 
7  * GNU Radio is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 3, or (at your option)
10  * any later version.
11  * 
12  * GNU Radio is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  * 
17  * You should have received a copy of the GNU General Public License along
18  * with this program; if not, write to the Free Software Foundation, Inc.,
19  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25 #include "gc_job_manager_impl.h"
26 #include <gcell/gc_mbox.h>
27 #include <gcell/gc_aligned_alloc.h>
28 #include <gcell/memory_barrier.h>
29 #include <gc_proc_def_utils.h>
30 #include <atomic_dec_if_positive.h>
31 #include <stdio.h>
32 #include <stdexcept>
33 #include <stdlib.h>
34 #include <unistd.h>
35 #include <sys/mman.h>
36 #include <sys/types.h>
37 #include <sys/stat.h>
38 #include <fcntl.h>
39 #include <string.h>
40 #include <sched.h>
41
42 typedef boost::unique_lock<boost::mutex> scoped_lock;
43
44 #define __nop() __asm__ volatile ("ori 0,0,0" : : : "memory")
45 #define __cctpl() __asm__ volatile ("or 1,1,1" : : : "memory")
46 #define __cctpm() __asm__ volatile ("or 2,2,2" : : : "memory")
47 #define __cctph() __asm__ volatile ("or 3,3,3" : : : "memory")
48 #define __db8cyc() __asm__ volatile ("or 28,28,28" : : : "memory")
49 #define __db10cyc() __asm__ volatile ("or 29,29,29" : : : "memory")
50 #define __db12cyc() __asm__ volatile ("or 30,30,30" : : : "memory")
51 #define __db16cyc() __asm__ volatile ("or 31,31,31" : : : "memory")
52
53
54 #if 1
55 #define CCTPL() __cctpl()
56 #define CCTPM() __cctpm()
57 #else
58 #define CCTPL() (void) 0
59 #define CCTPM() (void) 0
60 #endif
61
62 static const size_t CACHE_LINE_SIZE = 128;
63
64 static const unsigned int DEFAULT_MAX_JOBS = 128;
65 static const unsigned int DEFAULT_MAX_CLIENT_THREADS = 64;
66
67 // FIXME this really depends on the SPU code...
68 static const unsigned int MAX_TOTAL_INDIRECT_LENGTH = 16 * 1024;
69
70
71 static bool          s_key_initialized = false;
72 static pthread_key_t s_client_key;
73
74 static int s_worker_debug = 0;
75
76 // custom deleter of gang_contexts for use with boost::shared_ptr
77 class gang_deleter {
78 public:
79   void operator()(spe_gang_context_ptr_t ctx) {
80     if (ctx){
81       int r = spe_gang_context_destroy(ctx);
82       if (r != 0){
83         perror("spe_gang_context_destroy");
84       }
85     }
86   }
87 };
88
89
90 // custom deleter of anything that can be freed with "free"
91 class free_deleter {
92 public:
93   void operator()(void *p) {
94     free(p);
95   }
96 };
97
98
99 /*
100  * Called when client thread is destroyed.
101  * We mark our client info free.
102  */
103 static void
104 client_key_destructor(void *p)
105 {
106   ((gc_client_thread_info *) p)->d_free = 1;
107 }
108
109 static bool
110 is_power_of_2(uint32_t x)
111 {
112   return (x != 0) && !(x & (x - 1));
113 }
114
115 ////////////////////////////////////////////////////////////////////////
116
117
118 gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
119   : d_debug(0), d_spu_args(0),
120     d_eh_cond(), d_eh_thread(0), d_eh_state(EHS_INIT),
121     d_shutdown_requested(false),
122     d_jc_cond(), d_jc_thread(0), d_jc_state(JCS_INIT), d_jc_njobs_active(0),
123     d_ntell(0), d_tell_start(0),
124     d_client_thread(0), d_ea_args_maxsize(0),
125     d_proc_def(0), d_proc_def_ls_addr(0), d_nproc_defs(0)
126 {
127   if (!s_key_initialized){
128     int r = pthread_key_create(&s_client_key, client_key_destructor);
129     if (r != 0)
130       throw std::runtime_error("pthread_key_create");
131     s_key_initialized = true;
132   }
133
134   // ensure it's zero
135   pthread_setspecific(s_client_key, 0);
136
137   if (options != 0)
138     d_options = *options;
139
140   // provide the real default for those indicated with a zero
141   if (d_options.max_jobs == 0)
142     d_options.max_jobs = DEFAULT_MAX_JOBS;
143   if (d_options.max_client_threads == 0)
144     d_options.max_client_threads = DEFAULT_MAX_CLIENT_THREADS;
145
146   if (!d_options.program_handle){
147     fprintf(stderr, "gc_job_manager: options->program_handle must be non-zero\n");
148     throw std::runtime_error("gc_job_manager: options->program_handle must be non-zero");
149   }
150
151   int ncpu_nodes = spe_cpu_info_get(SPE_COUNT_PHYSICAL_CPU_NODES, -1);
152   int nusable_spes = spe_cpu_info_get(SPE_COUNT_USABLE_SPES, -1);
153
154   if (debug()){
155     printf("cpu_nodes = %d\n", ncpu_nodes);
156     for (int i = 0; i < ncpu_nodes; i++){
157       printf("node[%d].physical_spes = %2d\n", i,
158              spe_cpu_info_get(SPE_COUNT_PHYSICAL_SPES, i));
159       printf("node[%d].usable_spes   = %2d\n", i,
160              spe_cpu_info_get(SPE_COUNT_USABLE_SPES, i));
161     }
162   }
163
164   // clamp nspes
165   d_options.nspes = std::min(d_options.nspes, (unsigned int) MAX_SPES);
166   nusable_spes = std::min(nusable_spes, (int) MAX_SPES);
167
168   //
169   // sanity check requested number of spes.
170   //
171   if (d_options.nspes == 0)     // use all of them
172     d_options.nspes = nusable_spes;
173   else {
174     if (d_options.nspes > (unsigned int) nusable_spes){
175       fprintf(stderr,
176               "gc_job_manager: warning: caller requested %d spes.  There are only %d available.\n",
177               d_options.nspes, nusable_spes);
178       if (d_options.gang_schedule){
179         // If we're gang scheduling we'll never get scheduled if we
180         // ask for more than are available.
181         throw std::out_of_range("gang_scheduling: not enough spes available");
182       }
183       else {    // FIXME clamp to usable.  problem on PS3 when overcommited
184         fprintf(stderr, "gc_job_manager: clamping nspes to %d\n", nusable_spes);
185         d_options.nspes = nusable_spes;
186       }
187     }
188   }
189
190   if (d_options.use_affinity){
191     printf("gc_job_manager: warning: affinity request was ignored\n");
192   }
193
194   if (d_options.gang_schedule){
195     d_gang = spe_gang_context_sptr(spe_gang_context_create(0), gang_deleter());
196     if (!d_gang){
197       perror("gc_job_manager_impl[spe_gang_context_create]");
198       throw std::runtime_error("spe_gang_context_create");
199     }
200   }
201
202   d_ntell = std::min(d_options.nspes, 2U);
203
204   // ----------------------------------------------------------------
205   // initalize the job queue
206   
207   d_queue = (gc_jd_queue_t *) gc_aligned_alloc(sizeof(gc_jd_queue_t), CACHE_LINE_SIZE);
208   _d_queue_boost =
209     boost::shared_ptr<void>((void *) d_queue, free_deleter());
210   gc_jd_queue_init(d_queue);
211
212
213   // ----------------------------------------------------------------
214   // create the spe contexts
215
216   // 1 spu_arg struct for each SPE
217   assert(sizeof(gc_spu_args_t) % 16 == 0);
218   d_spu_args =
219     (gc_spu_args_t *) gc_aligned_alloc(MAX_SPES * sizeof(gc_spu_args_t), 16);
220   _d_spu_args_boost =
221     boost::shared_ptr<void>((void *) d_spu_args, free_deleter());
222
223   // 2 completion info structs for each SPE (we double buffer them)
224   assert(sizeof(gc_comp_info_t) % CACHE_LINE_SIZE == 0);
225   d_comp_info =
226     (gc_comp_info_t *) gc_aligned_alloc(2 * MAX_SPES * sizeof(gc_comp_info_t),
227                                         CACHE_LINE_SIZE);
228   _d_comp_info_boost =
229     boost::shared_ptr<void>((void *) d_comp_info, free_deleter());
230
231
232   // get a handle to the spe program
233
234   spe_program_handle_t *spe_image = d_options.program_handle.get();
235
236   // fish proc_def table out of SPE ELF file
237
238   if (!gcpd_find_table(spe_image, &d_proc_def, &d_nproc_defs, &d_proc_def_ls_addr)){
239     fprintf(stderr, "gc_job_manager_impl: couldn't find gc_proc_defs in SPE ELF file.\n");
240     throw std::runtime_error("no gc_proc_defs");
241   }
242   // fprintf(stderr, "d_proc_def_ls_addr = 0x%0x\n", d_proc_def_ls_addr);
243
244   int spe_flags = (SPE_EVENTS_ENABLE
245                    | SPE_MAP_PS
246                    | SPE_CFG_SIGNOTIFY1_OR
247                    | SPE_CFG_SIGNOTIFY2_OR);
248   
249   for (unsigned int i = 0; i < d_options.nspes; i++){
250     // FIXME affinity stuff goes here
251     d_worker[i].spe_ctx = spe_context_create(spe_flags, d_gang.get());;
252     if (d_worker[i].spe_ctx == 0){
253       perror("spe_context_create");
254       throw std::runtime_error("spe_context_create");
255     }
256
257     d_worker[i].spe_ctrl = 
258       (spe_spu_control_area_t *)spe_ps_area_get(d_worker[i].spe_ctx, SPE_CONTROL_AREA);
259     if (d_worker[i].spe_ctrl == 0){
260       perror("spe_ps_area_get(SPE_CONTROL_AREA)");
261       throw std::runtime_error("spe_ps_area_get(SPE_CONTROL_AREA)");
262     }
263
264     d_worker[i].spe_idx = i;
265     d_worker[i].spu_args = &d_spu_args[i];
266     d_worker[i].spu_args->queue = ptr_to_ea(d_queue);
267     d_worker[i].spu_args->comp_info[0] = ptr_to_ea(&d_comp_info[2*i+0]);
268     d_worker[i].spu_args->comp_info[1] = ptr_to_ea(&d_comp_info[2*i+1]);
269     d_worker[i].spu_args->spu_idx = i;
270     d_worker[i].spu_args->nspus = d_options.nspes;
271     d_worker[i].spu_args->proc_def_ls_addr = d_proc_def_ls_addr;
272     d_worker[i].spu_args->nproc_defs = d_nproc_defs;
273     d_worker[i].spu_args->log.base = 0;
274     d_worker[i].spu_args->log.nentries = 0;
275     d_worker[i].state = WS_INIT;
276
277     int r = spe_program_load(d_worker[i].spe_ctx, spe_image);
278     if (r != 0){
279       perror("spe_program_load");
280       throw std::runtime_error("spe_program_load");
281     }
282   }
283
284   setup_logfiles();
285
286   // ----------------------------------------------------------------
287   // initalize the free list of job descriptors
288   
289   d_free_list = (gc_jd_stack_t *) gc_aligned_alloc(sizeof(gc_jd_stack_t), CACHE_LINE_SIZE);
290   // This ensures that the memory associated with d_free_list is
291   // automatically freed in the destructor or if an exception occurs
292   // here in the constructor.
293   _d_free_list_boost =
294     boost::shared_ptr<void>((void *) d_free_list, free_deleter());
295   gc_jd_stack_init(d_free_list);
296
297   if (debug()){
298     printf("sizeof(d_jd[0]) = %d (0x%x)\n", sizeof(d_jd[0]), sizeof(d_jd[0]));
299     printf("max_jobs = %u\n", d_options.max_jobs);
300   }
301
302   // Initialize the array of job descriptors.
303   d_jd = (gc_job_desc_t *) gc_aligned_alloc(sizeof(d_jd[0]) * d_options.max_jobs, CACHE_LINE_SIZE);
304   _d_jd_boost = boost::shared_ptr<void>((void *) d_jd, free_deleter());
305
306
307   // set unique job_id
308   for (int i = 0; i < (int) d_options.max_jobs; i++)
309     d_jd[i].sys.job_id = i;
310
311   // push them onto the free list
312   for (int i = d_options.max_jobs - 1; i >= 0; i--)
313     free_job_desc(&d_jd[i]);
314
315   // ----------------------------------------------------------------
316   // initialize d_client_thread
317
318   {
319     gc_client_thread_info_sa cti(
320          new gc_client_thread_info[d_options.max_client_threads]);
321
322     d_client_thread.swap(cti);
323
324     for (unsigned int i = 0; i < d_options.max_client_threads; i++)
325       d_client_thread[i].d_client_id = i;
326   }
327
328   // ----------------------------------------------------------------
329   // initialize bitvectors
330
331   // initialize d_bvlen, the number of longs in job related bitvectors.
332   int bits_per_long = sizeof(unsigned long) * 8;
333   d_bvlen = (d_options.max_jobs + bits_per_long - 1) / bits_per_long;
334
335   // allocate all bitvectors in a single cache-aligned chunk
336   size_t nlongs = d_bvlen * d_options.max_client_threads;
337   void *p = gc_aligned_alloc(nlongs * sizeof(unsigned long), CACHE_LINE_SIZE);
338   _d_all_bitvectors = boost::shared_ptr<void>(p, free_deleter());
339
340   // Now point the gc_client_thread_info bitvectors into this storage
341   unsigned long *v = (unsigned long *) p;
342
343   for (unsigned int i = 0; i < d_options.max_client_threads; i++, v += d_bvlen)
344     d_client_thread[i].d_jobs_done = v;
345
346
347   // ----------------------------------------------------------------
348   // create the spe event handler & worker (SPE) threads
349
350   create_event_handler();
351 }
352
353 ////////////////////////////////////////////////////////////////////////
354
355 gc_job_manager_impl::~gc_job_manager_impl()
356 {
357   shutdown();
358
359   d_jd = 0;             // handled via _d_jd_boost
360   d_free_list = 0;      // handled via _d_free_list_boost
361   d_queue = 0;          // handled via _d_queue_boost
362
363   // clear cti, since we've deleted the underlying data
364   pthread_setspecific(s_client_key, 0);
365
366   unmap_logfiles();
367 }
368
369 bool
370 gc_job_manager_impl::shutdown()
371 {
372   scoped_lock   l(d_eh_mutex);
373
374   {
375     scoped_lock         l2(d_jc_mutex);
376     d_shutdown_requested = true;        // set flag for event handler thread
377     d_jc_cond.notify_one();                     // wake up job completer
378   }
379
380   // should only happens during early QA code
381   if (d_eh_thread == 0 && d_eh_state == EHS_INIT)
382     return false;
383
384   while (d_eh_state != EHS_DEAD)        // wait for it to finish
385     d_eh_cond.wait(l);
386
387   return true;
388 }
389
390 int
391 gc_job_manager_impl::nspes() const
392 {
393   return d_options.nspes;
394 }
395
396 ////////////////////////////////////////////////////////////////////////
397
398 void
399 gc_job_manager_impl::bv_zero(unsigned long *bv)
400 {
401   memset(bv, 0, sizeof(unsigned long) * d_bvlen);
402 }
403
404 inline void
405 gc_job_manager_impl::bv_clr(unsigned long *bv, unsigned int bitno)
406 {
407   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
408   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
409   bv[wi] &= ~(1UL << bi);
410 }
411
412 inline void
413 gc_job_manager_impl::bv_set(unsigned long *bv, unsigned int bitno)
414 {
415   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
416   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
417   bv[wi] |= (1UL << bi);
418 }
419
420 inline bool
421 gc_job_manager_impl::bv_isset(unsigned long *bv, unsigned int bitno)
422 {
423   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
424   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
425   return (bv[wi] & (1UL << bi)) != 0;
426 }
427
428 inline bool
429 gc_job_manager_impl::bv_isclr(unsigned long *bv, unsigned int bitno)
430 {
431   unsigned int wi = bitno / (sizeof (unsigned long) * 8);
432   unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1);
433   return (bv[wi] & (1UL << bi)) == 0;
434 }
435
436 ////////////////////////////////////////////////////////////////////////
437
438 gc_job_desc *
439 gc_job_manager_impl::alloc_job_desc()
440 {
441   // stack is lock free, and safe to call from any thread
442   gc_job_desc *jd = gc_jd_stack_pop(d_free_list);
443   if (jd == 0)
444     throw gc_bad_alloc("alloc_job_desc: none available");
445
446   return jd;
447 }
448
449 void
450 gc_job_manager_impl::free_job_desc(gc_job_desc *jd)
451 {
452   // stack is lock free, thus safe to call from any thread
453   if (jd != 0)
454     gc_jd_stack_push(d_free_list, jd);
455 }
456
457 ////////////////////////////////////////////////////////////////////////
458
459
460 inline bool
461 gc_job_manager_impl::incr_njobs_active()
462 {
463   scoped_lock   l(d_jc_mutex);
464
465   if (d_shutdown_requested)
466     return false;
467
468   if (d_jc_njobs_active++ == 0) // signal on 0 to 1 transition
469     d_jc_cond.notify_one();
470
471   return true;
472 }
473
474 inline void
475 gc_job_manager_impl::decr_njobs_active(int n)
476 {
477   scoped_lock   l(d_jc_mutex);
478   d_jc_njobs_active -= n;
479 }
480
481
482 /*
483  * We check as much as we can here on the PPE side, so that the SPE
484  * doesn't have to.
485  */
486 static bool
487 check_direct_args(gc_job_desc *jd, gc_job_direct_args *args)
488 {
489   if (args->nargs > MAX_ARGS_DIRECT){
490     jd->status = JS_BAD_N_DIRECT;
491     return false;
492   }
493
494   return true;
495 }
496
497 static bool
498 check_ea_args(gc_job_desc *jd, gc_job_ea_args *p)
499 {
500   if (p->nargs > MAX_ARGS_EA){
501     jd->status = JS_BAD_N_EA;
502     return false;
503   }
504
505   uint32_t dir_union = 0;
506
507   for (unsigned int i = 0; i < p->nargs; i++){
508     dir_union |= p->arg[i].direction;
509     switch(p->arg[i].direction){
510     case GCJD_DMA_GET:
511     case GCJD_DMA_PUT:
512       break;
513
514     default:
515       jd->status = JS_BAD_DIRECTION;
516       return false;
517     }
518   }
519
520   if (p->nargs > 1){
521     unsigned int common_eah = (p->arg[0].ea_addr) >> 32;
522     for (unsigned int i = 1; i < p->nargs; i++){
523       if ((p->arg[i].ea_addr >> 32) != common_eah){
524         jd->status = JS_BAD_EAH;
525         return false;
526       }
527     }
528   }
529
530   jd->sys.direction_union = dir_union;
531   return true;
532 }
533
534 bool
535 gc_job_manager_impl::submit_job(gc_job_desc *jd)
536 {
537   // Ensure it's one of our job descriptors
538
539   if (jd < d_jd || jd >= &d_jd[d_options.max_jobs]){
540     jd->status = JS_BAD_JOB_DESC;
541     return false;
542   }
543
544   // Ensure we've got a client_thread_info assigned to this thread.
545   
546   gc_client_thread_info *cti =
547     (gc_client_thread_info *) pthread_getspecific(s_client_key);
548   if (unlikely(cti == 0)){
549     if ((cti = alloc_cti()) == 0){
550       fprintf(stderr, "gc_job_manager_impl::submit_job: Too many client threads.\n");
551       jd->status = JS_TOO_MANY_CLIENTS;
552       return false;
553     }
554     int r = pthread_setspecific(s_client_key, cti);
555     if (r != 0){
556       jd->status = JS_BAD_JUJU;
557       fprintf(stderr, "pthread_setspecific failed (return = %d)\n", r);
558       return false;
559     }
560   }
561
562   if (jd->proc_id == GCP_UNKNOWN_PROC){
563     jd->status = JS_UNKNOWN_PROC;
564     return false;
565   }
566
567   if (!check_direct_args(jd, &jd->input))
568     return false;
569
570   if (!check_direct_args(jd, &jd->output))
571     return false;
572
573   if (!check_ea_args(jd, &jd->eaa))
574     return false;
575
576   jd->status = JS_OK;
577   jd->sys.client_id = cti->d_client_id;
578
579   if (!incr_njobs_active()){
580     jd->status = JS_SHUTTING_DOWN;
581     return false;
582   }
583   
584   gc_jd_queue_enqueue(d_queue, jd);
585   // tell_spes_to_check_queue();
586   return true;
587 }
588
589 bool
590 gc_job_manager_impl::wait_job(gc_job_desc *jd)
591 {
592   bool done;
593   return wait_jobs(1, &jd, &done, GC_WAIT_ANY) == 1 && jd->status == JS_OK;
594 }
595
596 int
597 gc_job_manager_impl::wait_jobs(unsigned int njobs,
598                                gc_job_desc *jd[],
599                                bool done[],
600                                gc_wait_mode mode)
601 {
602   unsigned int i;
603
604   gc_client_thread_info *cti =
605     (gc_client_thread_info *) pthread_getspecific(s_client_key);
606   if (unlikely(cti == 0))
607     return -1;
608
609   for (i = 0; i < njobs; i++){
610     done[i] = false;
611     if (unlikely(jd[i]->sys.client_id != cti->d_client_id)){
612       fprintf(stderr, "gc_job_manager_impl::wait_jobs: can't wait for a job you didn't submit\n");
613       return -1;
614     }
615   }
616
617   {
618     scoped_lock l(cti->d_mutex);
619
620     // setup info for event handler
621     cti->d_state = (mode == GC_WAIT_ANY) ? CT_WAIT_ANY : CT_WAIT_ALL;
622     cti->d_njobs_waiting_for = njobs;
623     cti->d_jobs_waiting_for = jd;
624     assert(cti->d_jobs_done != 0);
625
626     unsigned int ndone = 0;
627
628     // wait for jobs to complete
629     
630     while (1){
631       ndone = 0;
632       for (i= 0; i < njobs; i++){
633         if (done[i])
634           ndone++;
635         else if (bv_isset(cti->d_jobs_done, jd[i]->sys.job_id)){
636           bv_clr(cti->d_jobs_done, jd[i]->sys.job_id);
637           done[i] = true;
638           ndone++;
639         }
640       }
641
642       if (mode == GC_WAIT_ANY && ndone > 0)
643         break;
644
645       if (mode == GC_WAIT_ALL && ndone == njobs)
646         break;
647
648       // FIXME what happens when somebody calls shutdown?
649
650       cti->d_cond.wait(l);      // wait for event handler to wake us up
651     }
652
653     cti->d_state = CT_NOT_WAITING;  
654     cti->d_njobs_waiting_for = 0;       // tidy up (not reqd)
655     cti->d_jobs_waiting_for = 0;        // tidy up (not reqd)
656     return ndone;
657   }
658 }
659
660 ////////////////////////////////////////////////////////////////////////
661
662 bool
663 gc_job_manager_impl::send_all_spes(uint32_t msg)
664 {
665   bool ok = true;
666
667   for (unsigned int i = 0; i < d_options.nspes; i++)
668     ok &= send_spe(i, msg);
669
670   return ok;
671 }
672
673 bool
674 gc_job_manager_impl::send_spe(unsigned int spe, uint32_t msg)
675 {
676   if (spe >= d_options.nspes)
677     return false;
678
679   int r = spe_in_mbox_write(d_worker[spe].spe_ctx, &msg, 1,
680                             SPE_MBOX_ALL_BLOCKING);
681   if (r < 0){
682     perror("spe_in_mbox_write");
683     return false;
684   }
685
686   return r == 1;
687 }
688
689 void 
690 gc_job_manager_impl::tell_spes_to_check_queue()
691 {
692   int nspes = d_options.nspes;
693
694   for (int i = 0, ntold = 0; ntold < d_ntell && i < nspes ; ++i){
695     volatile spe_spu_control_area_t *spe_ctrl = d_worker[d_tell_start].spe_ctrl;
696     int nfree = (spe_ctrl->SPU_Mbox_Stat >> 8) & 0xFF;
697     if (nfree == 4){
698       spe_ctrl->SPU_In_Mbox = MK_MBOX_MSG(OP_CHECK_QUEUE, 0);
699       ntold++;
700     }
701
702     unsigned int t = d_tell_start + 1;
703     if (t >= d_options.nspes)
704       t = 0;
705     d_tell_start = t;
706   }
707 }
708
709
710 ////////////////////////////////////////////////////////////////////////
711
712 static void
713 pthread_create_failure_msg(int r, const char *which)
714 {
715   char buf[256];
716   const char *s = 0;
717
718   switch (r){
719   case EAGAIN: s = "EAGAIN"; break;
720   case EINVAL: s = "EINVAL"; break;
721   case EPERM:  s = "EPERM";  break;
722   default:
723     snprintf(buf, sizeof(buf), "Unknown error %d", r);
724     s = buf;
725     break;
726   }
727   fprintf(stderr, "pthread_create[%s] failed: %s\n", which, s);
728 }
729
730
731 static bool
732 start_thread(pthread_t *thread,
733              void *(*start_routine)(void *),  void *arg,
734              const char *msg)
735 {
736   pthread_attr_t attr;
737   pthread_attr_init(&attr);
738   pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
739
740   // FIXME save sigprocmask
741   // FIXME set sigprocmask
742
743   int r = pthread_create(thread, &attr, start_routine, arg);
744     
745   // FIXME restore sigprocmask
746
747   if (r != 0){
748     pthread_create_failure_msg(r, msg);
749     return false;
750   }
751   return true;
752 }
753
754
755 ////////////////////////////////////////////////////////////////////////
756
757 static void *start_worker(void *arg);
758
759 static void *
760 start_event_handler(void *arg)
761 {
762   gc_job_manager_impl *p = (gc_job_manager_impl *) arg;
763   p->event_handler_loop();
764   return 0;
765 }
766
767 static void *
768 start_job_completer(void *arg)
769 {
770   gc_job_manager_impl *p = (gc_job_manager_impl *) arg;
771   p->job_completer_loop();
772   return 0;
773 }
774
775 void
776 gc_job_manager_impl::create_event_handler()
777 {
778   // create the SPE event handler and register our interest in events
779
780   d_spe_event_handler.ptr = spe_event_handler_create();
781   if (d_spe_event_handler.ptr == 0){
782     perror("spe_event_handler_create");
783     throw std::runtime_error("spe_event_handler_create");
784   }
785
786   for (unsigned int i = 0; i < d_options.nspes; i++){
787     spe_event_unit_t    eu;
788     memset(&eu, 0, sizeof(eu));
789     eu.events = SPE_EVENT_OUT_INTR_MBOX | SPE_EVENT_SPE_STOPPED;
790     eu.spe = d_worker[i].spe_ctx;
791     eu.data.u32 = i;    // set in events returned by spe_event_wait
792
793     if (spe_event_handler_register(d_spe_event_handler.ptr, &eu) != 0){
794       perror("spe_event_handler_register");
795       throw std::runtime_error("spe_event_handler_register");
796     }
797   }
798
799   // create the event handling thread
800
801   if (!start_thread(&d_eh_thread, start_event_handler, this, "event_handler")){
802     throw std::runtime_error("pthread_create");
803   }
804
805   // create the job completion thread
806
807   if (!start_thread(&d_jc_thread, start_job_completer, this, "job_completer")){
808     throw std::runtime_error("pthread_create");
809   }
810
811   // create the SPE worker threads
812
813   bool ok = true;
814   for (unsigned int i = 0; ok && i < d_options.nspes; i++){
815     char name[256];
816     snprintf(name, sizeof(name), "worker[%d]", i);
817     ok &= start_thread(&d_worker[i].thread, start_worker,
818                        &d_worker[i], name);
819   }
820
821   if (!ok){
822     //
823     // FIXME Clean up the mess.  Need to terminate event handler and all workers.
824     //
825     // this should cause the workers to exit, unless they're seriously broken
826     send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
827
828     shutdown();
829
830     throw std::runtime_error("pthread_create");
831   }
832 }
833
834 ////////////////////////////////////////////////////////////////////////
835
836 void
837 gc_job_manager_impl::set_eh_state(evt_handler_state s)
838 {
839   scoped_lock   l(d_eh_mutex);
840   d_eh_state = s;
841   d_eh_cond.notify_all();
842 }
843
844 void
845 gc_job_manager_impl::set_ea_args_maxsize(int maxsize)
846 {
847   scoped_lock   l(d_eh_mutex);
848   d_ea_args_maxsize = maxsize;
849   d_eh_cond.notify_all();
850 }
851
852 void
853 gc_job_manager_impl::print_event(spe_event_unit_t *evt)
854 {
855   printf("evt: spe = %d events = (0x%x)", evt->data.u32, evt->events);
856
857   if (evt->events & SPE_EVENT_OUT_INTR_MBOX)
858     printf(" OUT_INTR_MBOX");
859   
860   if (evt->events & SPE_EVENT_IN_MBOX)
861     printf(" IN_MBOX");
862   
863   if (evt->events & SPE_EVENT_TAG_GROUP)
864     printf(" TAG_GROUP");
865   
866   if (evt->events & SPE_EVENT_SPE_STOPPED)
867     printf(" SPE_STOPPED");
868
869   printf("\n");
870 }
871
872 struct job_client_info {
873   uint16_t      job_id;
874   uint16_t      client_id;
875 };
876
877 static int
878 compare_jci_clients(const void *va, const void *vb)
879 {
880   const job_client_info *a = (job_client_info *) va;
881   const job_client_info *b = (job_client_info *) vb;
882
883   return a->client_id - b->client_id;
884 }
885
886 void
887 gc_job_manager_impl::notify_clients_jobs_are_done(unsigned int spe_num,
888                                                   unsigned int completion_info_idx)
889 {
890   const char *msg = "gc_job_manager_impl::notify_client_job_is_done (INTERNAL ERROR)";
891
892   smp_rmb();  // order reads so we know that data sent from SPE is here
893
894   gc_comp_info_t *ci = &d_comp_info[2 * spe_num + (completion_info_idx & 0x1)];
895
896   if (ci->ncomplete == 0){      // never happens, but ensures code below is correct
897     ci->in_use = 0;
898     return;
899   }
900
901   decr_njobs_active(ci->ncomplete);
902
903   if (0){
904     static int total_jobs;
905     static int total_msgs;
906     total_msgs++;
907     total_jobs += ci->ncomplete;
908     printf("ppe:     tj = %6d  tm = %6d\n", total_jobs, total_msgs);
909   }
910
911   job_client_info gci[GC_CI_NJOBS];
912
913   /*
914    * Make one pass through and sanity check everything while filling in gci
915    */
916   for (unsigned int i = 0; i < ci->ncomplete; i++){
917     unsigned int job_id = ci->job_id[i];
918
919     if (job_id >= d_options.max_jobs){
920       // internal error, shouldn't happen
921       fprintf(stderr,"%s: invalid job_id = %d\n", msg, job_id);
922       ci->in_use = 0;           // clear flag so SPE knows we're done with it
923       return;
924     }
925     gc_job_desc *jd = &d_jd[job_id];
926
927     if (jd->sys.client_id >= d_options.max_client_threads){
928       // internal error, shouldn't happen
929       fprintf(stderr, "%s: invalid client_id = %d\n", msg, jd->sys.client_id);
930       ci->in_use = 0;           // clear flag so SPE knows we're done with it
931       return;
932     }
933
934     gci[i].job_id = job_id;
935     gci[i].client_id = jd->sys.client_id;
936   }
937
938   // sort by client_id so we only have to lock & signal once / client
939
940   if (ci->ncomplete > 1)
941     qsort(gci, ci->ncomplete, sizeof(gci[0]), compare_jci_clients);
942
943   // "wind-in" 
944
945   gc_client_thread_info *last_cti = &d_client_thread[gci[0].client_id];
946   last_cti->d_mutex.lock();
947   bv_set(last_cti->d_jobs_done, gci[0].job_id);  // mark job done
948
949   for (unsigned int i = 1; i < ci->ncomplete; i++){
950
951     gc_client_thread_info *cti = &d_client_thread[gci[i].client_id];
952
953     if (cti != last_cti){       // new client?
954
955       // yes.  signal old client, unlock old, lock new
956
957       // FIXME we could distinguish between CT_WAIT_ALL & CT_WAIT_ANY
958
959       if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
960         last_cti->d_cond.notify_one();  // wake client thread up
961
962       last_cti->d_mutex.unlock();
963       cti->d_mutex.lock();
964       last_cti = cti;
965     }
966
967     // mark job done
968     bv_set(cti->d_jobs_done, gci[i].job_id);
969   }
970
971   // "wind-out"
972
973   if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
974     last_cti->d_cond.notify_one();      // wake client thread up
975   last_cti->d_mutex.unlock();
976
977   ci->in_use = 0;               // clear flag so SPE knows we're done with it
978 }
979
980 void
981 gc_job_manager_impl::handle_event(spe_event_unit_t *evt)
982 {
983   // print_event(evt);
984
985   int spe_num = evt->data.u32;
986
987   // only a single event type can be signaled at a time
988   
989   if (evt->events == SPE_EVENT_OUT_INTR_MBOX) { // SPE sent us 1 or more msgs
990     static const int NMSGS = 32;
991     unsigned int msg[NMSGS];
992     int n = spe_out_intr_mbox_read(evt->spe, msg, NMSGS, SPE_MBOX_ANY_BLOCKING);
993     // printf("spe_out_intr_mbox_read = %d\n", n);
994     if (n < 0){
995       perror("spe_out_intr_mbox_read");
996     }
997     else {
998       for (int i = 0; i < n; i++){
999         switch(MBOX_MSG_OP(msg[i])){
1000 #if 0
1001         case OP_JOBS_DONE:
1002           if (debug())
1003             printf("eh: job_done (0x%08x) from spu[%d]\n", msg[i], spe_num);
1004           notify_clients_jobs_are_done(spe_num, MBOX_MSG_ARG(msg[i]));
1005           break;
1006 #endif
1007         case OP_SPU_BUFSIZE:
1008           set_ea_args_maxsize(MBOX_MSG_ARG(msg[i]));
1009           break;
1010
1011         case OP_EXIT:
1012         default:
1013           printf("eh: Unexpected msg (0x%08x) from spu[%d]\n", msg[i], spe_num);
1014           break;
1015         }
1016       }
1017     }
1018   }
1019   else if (evt->events == SPE_EVENT_SPE_STOPPED){ // the SPE stopped
1020     spe_stop_info_t si;
1021     int r = spe_stop_info_read(evt->spe, &si);
1022     if (r < 0){
1023       perror("spe_stop_info_read");
1024     }
1025     else {
1026       switch (si.stop_reason){
1027       case SPE_EXIT:
1028         if (debug()){
1029           printf("eh: spu[%d] SPE_EXIT w/ exit_code = %d\n",
1030                  spe_num, si.result.spe_exit_code);
1031         }
1032         break;
1033       case SPE_STOP_AND_SIGNAL:
1034         printf("eh: spu[%d] SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
1035                spe_num, si.result.spe_signal_code);
1036         break;
1037       case SPE_RUNTIME_ERROR:
1038         printf("eh: spu[%d] SPE_RUNTIME_ERROR w/ spe_runtime_error = 0x%x\n",
1039                spe_num, si.result.spe_runtime_error);
1040         break;
1041       case SPE_RUNTIME_EXCEPTION:
1042         printf("eh: spu[%d] SPE_RUNTIME_EXCEPTION w/ spe_runtime_exception = 0x%x\n",
1043                spe_num, si.result.spe_runtime_exception);
1044         break;
1045       case SPE_RUNTIME_FATAL:
1046         printf("eh: spu[%d] SPE_RUNTIME_FATAL w/ spe_runtime_fatal = 0x%x\n",
1047                spe_num, si.result.spe_runtime_fatal);
1048         break;
1049       case SPE_CALLBACK_ERROR:
1050         printf("eh: spu[%d] SPE_CALLBACK_ERROR w/ spe_callback_error = 0x%x\n",
1051                spe_num, si.result.spe_callback_error);
1052         break;
1053       case SPE_ISOLATION_ERROR:
1054         printf("eh: spu[%d] SPE_ISOLATION_ERROR w/ spe_isolation_error = 0x%x\n",
1055                spe_num, si.result.spe_isolation_error);
1056         break;
1057       default:
1058         printf("eh: spu[%d] UNKNOWN STOP REASON (%d) w/ spu_status = 0x%x\n",
1059                spe_num, si.stop_reason, si.spu_status);
1060         break;
1061       }
1062     }
1063   }
1064 #if 0 // not enabled
1065   else if (evt->events == SPE_EVENT_IN_MBOX){    // there's room to write to SPE
1066     // spe_in_mbox_write (ignore)
1067   }
1068   else if (evt->events == SPE_EVENT_TAG_GROUP){  // our DMA completed
1069     // spe_mfcio_tag_status_read
1070   }
1071 #endif
1072   else {
1073     fprintf(stderr, "handle_event: unexpected evt->events = 0x%x\n", evt->events);
1074     return;
1075   }
1076 }
1077
1078 //
1079 // This is the "main program" of the event handling thread
1080 //
1081 void
1082 gc_job_manager_impl::event_handler_loop()
1083 {
1084   static const int MAX_EVENTS = 16;
1085   static const int TIMEOUT = 20;        // how long to block in milliseconds
1086
1087   spe_event_unit_t events[MAX_EVENTS];
1088
1089   if (d_debug)
1090     printf("event_handler_loop: starting\n");
1091
1092   set_eh_state(EHS_RUNNING);
1093
1094   // ask the first spe for its max bufsize
1095   send_spe(0, MK_MBOX_MSG(OP_GET_SPU_BUFSIZE, 0));
1096
1097   while (1){
1098     switch(d_eh_state){
1099
1100     case EHS_RUNNING:                   // normal stuff
1101       if (d_shutdown_requested) {
1102         set_eh_state(EHS_SHUTTING_DOWN);
1103       }
1104       break;
1105
1106     case EHS_SHUTTING_DOWN:
1107       if (d_jc_state == JCS_DEAD){
1108         send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
1109         set_eh_state(EHS_WAITING_FOR_WORKERS_TO_DIE);
1110       }
1111       break;
1112
1113     case EHS_WAITING_FOR_WORKERS_TO_DIE:
1114       {
1115         bool all_dead = true;
1116         for (unsigned int i = 0; i < d_options.nspes; i++)
1117           all_dead &= d_worker[i].state == WS_DEAD;
1118
1119         if (all_dead){
1120           set_eh_state(EHS_DEAD);
1121           if (d_debug)
1122             printf("event_handler_loop: exiting\n");
1123           return;
1124         }
1125       }
1126       break;
1127
1128     default:
1129       set_eh_state(EHS_DEAD);
1130       printf("event_handler_loop(default): exiting\n");
1131       return;
1132     }
1133
1134     // block waiting for events...
1135     int nevents = spe_event_wait(d_spe_event_handler.ptr,
1136                                  events, MAX_EVENTS, TIMEOUT);
1137     if (nevents < 0){
1138       perror("spe_wait_event");
1139       // FIXME bail?
1140     }
1141     for (int i = 0; i < nevents; i++){
1142       handle_event(&events[i]);
1143     }
1144   }
1145 }
1146
1147 ////////////////////////////////////////////////////////////////////////
1148
1149 void
1150 gc_job_manager_impl::poll_for_job_completion()
1151 {
1152   static const int niter = 10000;
1153
1154   CCTPL();              // change current (h/w) thread priority to low
1155
1156   for (int n = 0; n < niter; n++){
1157
1158     for (unsigned int spe_num = 0; spe_num < d_options.nspes; spe_num++){
1159       volatile spe_spu_control_area_t *spe_ctrl = d_worker[spe_num].spe_ctrl;
1160       int nentries = spe_ctrl->SPU_Mbox_Stat & 0xFF;
1161       while (nentries-- > 0){
1162         unsigned int msg = spe_ctrl->SPU_Out_Mbox;
1163         switch(MBOX_MSG_OP(msg)){
1164         case OP_JOBS_DONE:
1165           if (debug())
1166             printf("jc: job_done (0x%08x) from spu[%d]\n", msg, spe_num);
1167
1168           CCTPM();              // change current thread priority to medium
1169           notify_clients_jobs_are_done(spe_num, MBOX_MSG_ARG(msg));
1170           CCTPL();
1171           break;
1172
1173         default:
1174           printf("jc: Unexpected msg (0x%08x) from spu[%d]\n", msg, spe_num);
1175           break;
1176         }
1177       }
1178     }
1179   }
1180   CCTPM();
1181 }
1182
1183 //
1184 // This is the "main program" of the job completer thread
1185 //
1186 void
1187 gc_job_manager_impl::job_completer_loop()
1188 {
1189   d_jc_state = JCS_RUNNING;
1190
1191   while (1){
1192     {
1193       scoped_lock l(d_jc_mutex);
1194       if (d_jc_njobs_active == 0){
1195         if (d_shutdown_requested){
1196           d_jc_state = JCS_DEAD;
1197           return;
1198         }
1199         d_jc_cond.wait(l);
1200       }
1201     }
1202
1203     poll_for_job_completion();
1204   }
1205 }
1206
1207 ////////////////////////////////////////////////////////////////////////
1208 // this is the top of the SPE worker threads
1209
1210 static void *
1211 start_worker(void *arg)
1212 {
1213   worker_ctx *w = (worker_ctx *) arg;
1214   spe_stop_info_t       si;
1215
1216   w->state = WS_RUNNING;
1217   if (s_worker_debug)
1218     printf("worker[%d]: WS_RUNNING\n", w->spe_idx);
1219
1220   unsigned int entry = SPE_DEFAULT_ENTRY;
1221   int r = spe_context_run(w->spe_ctx,  &entry, 0, w->spu_args, 0, &si);
1222
1223   if (r < 0){                   // error
1224     char buf[64];
1225     snprintf(buf, sizeof(buf), "worker[%d]: spe_context_run", w->spe_idx);
1226     perror(buf);
1227   }
1228   else if (r == 0){
1229     // spe program called exit.
1230     if (s_worker_debug)
1231       printf("worker[%d]: SPE_EXIT w/ exit_code = %d\n",
1232              w->spe_idx, si.result.spe_exit_code);
1233   }
1234   else {
1235     // called stop_and_signal
1236     //
1237     // I'm not sure we'll ever get here.  I think the event
1238     // handler will catch this...
1239     printf("worker[%d]: SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
1240            w->spe_idx, si.result.spe_signal_code);
1241   }
1242
1243   // in any event, we're committing suicide now ;)
1244   if (s_worker_debug)
1245     printf("worker[%d]: WS_DEAD\n", w->spe_idx);
1246
1247   w->state = WS_DEAD;
1248   return 0;
1249 }
1250
1251 ////////////////////////////////////////////////////////////////////////
1252
1253 gc_client_thread_info *
1254 gc_job_manager_impl::alloc_cti()
1255 {
1256   for (unsigned int i = 0; i < d_options.max_client_threads; i++){
1257     if (d_client_thread[i].d_free){
1258       // try to atomically grab it
1259       if (_atomic_dec_if_positive(ptr_to_ea(&d_client_thread[i].d_free)) == 0){
1260         // got it...
1261         gc_client_thread_info *cti = &d_client_thread[i];
1262         cti->d_state = CT_NOT_WAITING;
1263         bv_zero(cti->d_jobs_done);
1264         cti->d_njobs_waiting_for = 0;
1265         cti->d_jobs_waiting_for = 0;
1266         
1267         return cti;
1268       }
1269     }
1270   }
1271   return 0;
1272 }
1273
1274 void
1275 gc_job_manager_impl::free_cti(gc_client_thread_info *cti)
1276 {
1277   assert((size_t) (cti - d_client_thread.get()) < d_options.max_client_threads);
1278   cti->d_free = 1;
1279 }
1280
1281 int
1282 gc_job_manager_impl::ea_args_maxsize()
1283 {
1284   scoped_lock   l(d_eh_mutex);
1285
1286   while (d_ea_args_maxsize == 0)        // wait for it to be initialized
1287     d_eh_cond.wait(l);
1288
1289   return d_ea_args_maxsize;
1290 }
1291
1292 void
1293 gc_job_manager_impl::set_debug(int debug)
1294 {
1295   d_debug = debug;
1296   s_worker_debug = debug;
1297 }
1298
1299 int
1300 gc_job_manager_impl::debug()
1301 {
1302   return d_debug;
1303 }
1304
1305 ////////////////////////////////////////////////////////////////////////
1306
1307 void
1308 gc_job_manager_impl::setup_logfiles()
1309 {
1310   if (!d_options.enable_logging)
1311     return;
1312
1313   if (d_options.log2_nlog_entries == 0)
1314     d_options.log2_nlog_entries = 12;
1315
1316   // must end up a multiple of the page size
1317
1318   size_t pagesize = getpagesize();
1319   size_t s = (1 << d_options.log2_nlog_entries) * sizeof(gc_log_entry_t);
1320   s = ((s + pagesize - 1) / pagesize) * pagesize;
1321   size_t nentries = s / sizeof(gc_log_entry_t);
1322   assert(is_power_of_2(nentries));
1323
1324   for (unsigned int i = 0; i < d_options.nspes; i++){
1325     char filename[100];
1326     snprintf(filename, sizeof(filename), "spu_log.%02d", i);
1327     int fd = open(filename, O_CREAT|O_TRUNC|O_RDWR, 0664);
1328     if (fd == -1){
1329       perror(filename);
1330       return;
1331     }
1332     lseek(fd, s - 1, SEEK_SET);
1333     write(fd, "\0", 1);
1334     void *p = mmap(0, s, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
1335     if (p == MAP_FAILED){
1336       perror("gc_job_manager_impl::setup_logfiles: mmap");
1337       close(fd);
1338       return;
1339     }
1340     close(fd);
1341     memset(p, 0, s);
1342     d_spu_args[i].log.base = ptr_to_ea(p);
1343     d_spu_args[i].log.nentries = nentries;
1344   }
1345 }
1346
1347 void
1348 gc_job_manager_impl::sync_logfiles()
1349 {
1350   for (unsigned int i = 0; i < d_options.nspes; i++){
1351     if (d_spu_args[i].log.base)
1352       msync(ea_to_ptr(d_spu_args[i].log.base),
1353             d_spu_args[i].log.nentries * sizeof(gc_log_entry_t),
1354             MS_ASYNC);
1355   }
1356 }
1357
1358 void
1359 gc_job_manager_impl::unmap_logfiles()
1360 {
1361   for (unsigned int i = 0; i < d_options.nspes; i++){
1362     if (d_spu_args[i].log.base)
1363       munmap(ea_to_ptr(d_spu_args[i].log.base),
1364              d_spu_args[i].log.nentries * sizeof(gc_log_entry_t));
1365   }
1366 }
1367
1368 ////////////////////////////////////////////////////////////////////////
1369 //
1370 // lookup proc names in d_proc_def table
1371
1372 gc_proc_id_t 
1373 gc_job_manager_impl::lookup_proc(const std::string &proc_name)
1374 {
1375   for (int i = 0; i < d_nproc_defs; i++)
1376     if (proc_name == d_proc_def[i].name)
1377       return i;
1378
1379   throw gc_unknown_proc(proc_name);
1380 }
1381
1382 std::vector<std::string>
1383 gc_job_manager_impl::proc_names()
1384 {
1385   std::vector<std::string> r;
1386   for (int i = 0; i < d_nproc_defs; i++)
1387     r.push_back(d_proc_def[i].name);
1388
1389   return r;
1390 }
1391
1392 ////////////////////////////////////////////////////////////////////////
1393
1394 worker_ctx::~worker_ctx()
1395 {
1396   if (spe_ctx){
1397     int r = spe_context_destroy(spe_ctx);
1398     if (r != 0){
1399       perror("spe_context_destroy");
1400     }
1401     spe_ctx = 0;
1402   }
1403   state = WS_FREE;
1404 }