3 * Copyright 2007,2008,2009,2010 Free Software Foundation, Inc.
5 * This file is part of GNU Radio
7 * GNU Radio is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 3, or (at your option)
12 * GNU Radio is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License along
18 * with this program; if not, write to the Free Software Foundation, Inc.,
19 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
25 #include "gc_job_manager_impl.h"
26 #include <gcell/gc_mbox.h>
27 #include <gcell/gc_aligned_alloc.h>
28 #include <gcell/memory_barrier.h>
29 #include <gc_proc_def_utils.h>
30 #include <atomic_dec_if_positive.h>
36 #include <sys/types.h>
// NOTE(review): this listing is a truncated numbered dump — each line keeps
// its original source line number and many intervening lines (braces, blank
// lines, preprocessor conditionals) are missing.  Annotations below describe
// only what is visible.

// RAII lock used throughout this file.
42 typedef boost::unique_lock<boost::mutex> scoped_lock;

// PPC "no-op" encodings ("or Rn,Rn,Rn") that the Cell PPE interprets as
// hardware-thread priority / dispatch-throttling hints.
44 #define __nop() __asm__ volatile ("ori 0,0,0" : : : "memory")
45 #define __cctpl() __asm__ volatile ("or 1,1,1" : : : "memory")
46 #define __cctpm() __asm__ volatile ("or 2,2,2" : : : "memory")
47 #define __cctph() __asm__ volatile ("or 3,3,3" : : : "memory")
48 #define __db8cyc() __asm__ volatile ("or 28,28,28" : : : "memory")
49 #define __db10cyc() __asm__ volatile ("or 29,29,29" : : : "memory")
50 #define __db12cyc() __asm__ volatile ("or 30,30,30" : : : "memory")
51 #define __db16cyc() __asm__ volatile ("or 31,31,31" : : : "memory")

// NOTE(review): CCTPL/CCTPM are defined twice below — the original
// presumably selected one pair with an #if (real priority hints on Cell,
// no-ops elsewhere); the conditional lines are missing from this listing.
55 #define CCTPL() __cctpl()
56 #define CCTPM() __cctpm()
58 #define CCTPL() (void) 0
59 #define CCTPM() (void) 0

// Cell PPE cache-line size; shared data structures are aligned to this.
62 static const size_t CACHE_LINE_SIZE = 128;

// Defaults applied when the corresponding gc_jm_options field is zero.
64 static const unsigned int DEFAULT_MAX_JOBS = 128;
65 static const unsigned int DEFAULT_MAX_CLIENT_THREADS = 64;

67 // FIXME this really depends on the SPU code...
68 static const unsigned int MAX_TOTAL_INDIRECT_LENGTH = 16 * 1024;

// Process-wide pthread key mapping a client thread to its
// gc_client_thread_info; created lazily in the constructor.
71 static bool s_key_initialized = false;
72 static pthread_key_t s_client_key;

// Debug flag shared with the worker threads (see set_debug()).
74 static int s_worker_debug = 0;
76 // custom deleter of gang_contexts for use with boost::shared_ptr
// NOTE(review): the "struct gang_deleter {" declaration line is missing
// from this truncated listing; only the call operator is visible.
79 void operator()(spe_gang_context_ptr_t ctx) {
// Destroy the gang context; a failure is reported via perror but
// otherwise ignored (deleters must not throw).
81 int r = spe_gang_context_destroy(ctx);
83 perror("spe_gang_context_destroy");
90 // custom deleter of anything that can be freed with "free"
// NOTE(review): struct declaration line and the body (presumably a
// free(p) call) are missing from this truncated listing.
93 void operator()(void *p) {
100 * Called when client thread is destroyed.
101 * We mark our client info free.
// pthread-key destructor: flags this thread's client slot as reusable so
// alloc_cti() can hand it to another thread.
104 client_key_destructor(void *p)
106 ((gc_client_thread_info *) p)->d_free = 1;
/*!
 * \brief Return true iff \p x is a nonzero power of two.
 *
 * Classic bit trick: a power of two has exactly one bit set, so
 * x & (x - 1) clears that bit and yields zero; the x != 0 test rejects
 * zero itself (zero is not a power of two).
 *
 * NOTE(review): the truncated listing dropped the return type and the
 * braces; they are restored here around the visible (complete) body.
 */
static bool
is_power_of_2(uint32_t x)
{
  return (x != 0) && !(x & (x - 1));
}
115 ////////////////////////////////////////////////////////////////////////
/*!
 * \brief Construct the job manager: validate options, create the SPE
 * contexts, allocate the shared job queue / descriptor free list /
 * completion buffers, and finally spawn the event-handler, job-completer
 * and per-SPE worker threads.
 *
 * NOTE(review): this listing is truncated (original line numbers are
 * embedded and many lines — braces, some fprintf arguments, some
 * assignments — are missing), so the code is annotated as-is.
 */
118 gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
119 : d_debug(0), d_spu_args(0),
120 d_eh_cond(), d_eh_thread(0), d_eh_state(EHS_INIT),
121 d_shutdown_requested(false),
122 d_jc_cond(), d_jc_thread(0), d_jc_state(JCS_INIT), d_jc_njobs_active(0),
123 d_ntell(0), d_tell_start(0),
124 d_client_thread(0), d_ea_args_maxsize(0),
125 d_proc_def(0), d_proc_def_ls_addr(0), d_nproc_defs(0)
// Lazily create the process-wide pthread key used to find the calling
// thread's gc_client_thread_info.
127 if (!s_key_initialized){
128 int r = pthread_key_create(&s_client_key, client_key_destructor);
130 throw std::runtime_error("pthread_key_create");
131 s_key_initialized = true;
// The constructing thread has no client slot yet.
135 pthread_setspecific(s_client_key, 0);
138 d_options = *options;
140 // provide the real default for those indicated with a zero
141 if (d_options.max_jobs == 0)
142 d_options.max_jobs = DEFAULT_MAX_JOBS;
143 if (d_options.max_client_threads == 0)
144 d_options.max_client_threads = DEFAULT_MAX_CLIENT_THREADS;
146 if (!d_options.program_handle){
147 fprintf(stderr, "gc_job_manager: options->program_handle must be non-zero\n");
148 throw std::runtime_error("gc_job_manager: options->program_handle must be non-zero");
// Discover how many SPEs the platform has / will let us use.
151 int ncpu_nodes = spe_cpu_info_get(SPE_COUNT_PHYSICAL_CPU_NODES, -1);
152 int nusable_spes = spe_cpu_info_get(SPE_COUNT_USABLE_SPES, -1);
155 printf("cpu_nodes = %d\n", ncpu_nodes);
156 for (int i = 0; i < ncpu_nodes; i++){
157 printf("node[%d].physical_spes = %2d\n", i,
158 spe_cpu_info_get(SPE_COUNT_PHYSICAL_SPES, i));
159 printf("node[%d].usable_spes = %2d\n", i,
160 spe_cpu_info_get(SPE_COUNT_USABLE_SPES, i));
// Clamp both the request and the availability to compile-time MAX_SPES.
165 d_options.nspes = std::min(d_options.nspes, (unsigned int) MAX_SPES);
166 nusable_spes = std::min(nusable_spes, (int) MAX_SPES);
169 // sanity check requested number of spes.
171 if (d_options.nspes == 0) // use all of them
172 d_options.nspes = nusable_spes;
174 if (d_options.nspes > (unsigned int) nusable_spes){
176 "gc_job_manager: warning: caller requested %d spes. There are only %d available.\n",
177 d_options.nspes, nusable_spes);
178 if (d_options.gang_schedule){
179 // If we're gang scheduling we'll never get scheduled if we
180 // ask for more than are available.
181 throw std::out_of_range("gang_scheduling: not enough spes available");
183 else { // FIXME clamp to usable. problem on PS3 when overcommited
184 fprintf(stderr, "gc_job_manager: clamping nspes to %d\n", nusable_spes);
185 d_options.nspes = nusable_spes;
// Affinity is accepted but not implemented.
190 if (d_options.use_affinity){
191 printf("gc_job_manager: warning: affinity request was ignored\n");
// Optionally create a gang context so all SPE contexts get scheduled
// together; gang_deleter destroys it when the last reference drops.
194 if (d_options.gang_schedule){
195 d_gang = spe_gang_context_sptr(spe_gang_context_create(0), gang_deleter());
197 perror("gc_job_manager_impl[spe_gang_context_create]");
198 throw std::runtime_error("spe_gang_context_create");
// Number of SPEs we poke when telling them to check the queue (<= 2).
202 d_ntell = std::min(d_options.nspes, 2U);
204 // ----------------------------------------------------------------
205 // initalize the job queue
207 d_queue = (gc_jd_queue_t *) gc_aligned_alloc(sizeof(gc_jd_queue_t), CACHE_LINE_SIZE);
// The shared_ptr keep-alive (presumably assigned to a _d_queue_boost
// member on a missing line) frees the queue on destruction/exception.
209 boost::shared_ptr<void>((void *) d_queue, free_deleter());
210 gc_jd_queue_init(d_queue);
213 // ----------------------------------------------------------------
214 // create the spe contexts
216 // 1 spu_arg struct for each SPE
217 assert(sizeof(gc_spu_args_t) % 16 == 0);
219 (gc_spu_args_t *) gc_aligned_alloc(MAX_SPES * sizeof(gc_spu_args_t), 16);
221 boost::shared_ptr<void>((void *) d_spu_args, free_deleter());
223 // 2 completion info structs for each SPE (we double buffer them)
224 assert(sizeof(gc_comp_info_t) % CACHE_LINE_SIZE == 0);
226 (gc_comp_info_t *) gc_aligned_alloc(2 * MAX_SPES * sizeof(gc_comp_info_t),
229 boost::shared_ptr<void>((void *) d_comp_info, free_deleter());
232 // get a handle to the spe program
234 spe_program_handle_t *spe_image = d_options.program_handle.get();
236 // fish proc_def table out of SPE ELF file
238 if (!gcpd_find_table(spe_image, &d_proc_def, &d_nproc_defs, &d_proc_def_ls_addr)){
239 fprintf(stderr, "gc_job_manager_impl: couldn't find gc_proc_defs in SPE ELF file.\n");
240 throw std::runtime_error("no gc_proc_defs");
242 // fprintf(stderr, "d_proc_def_ls_addr = 0x%0x\n", d_proc_def_ls_addr);
244 int spe_flags = (SPE_EVENTS_ENABLE
246 | SPE_CFG_SIGNOTIFY1_OR
247 | SPE_CFG_SIGNOTIFY2_OR);
// Create one SPE context per requested SPE, map its problem-state
// control area, and fill in the argument block the SPU code will read.
249 for (unsigned int i = 0; i < d_options.nspes; i++){
250 // FIXME affinity stuff goes here
251 d_worker[i].spe_ctx = spe_context_create(spe_flags, d_gang.get());;
252 if (d_worker[i].spe_ctx == 0){
253 perror("spe_context_create");
254 throw std::runtime_error("spe_context_create");
257 d_worker[i].spe_ctrl =
258 (spe_spu_control_area_t *)spe_ps_area_get(d_worker[i].spe_ctx, SPE_CONTROL_AREA);
259 if (d_worker[i].spe_ctrl == 0){
260 perror("spe_ps_area_get(SPE_CONTROL_AREA)");
261 throw std::runtime_error("spe_ps_area_get(SPE_CONTROL_AREA)");
// Arguments handed to the SPU: queue address, double-buffered
// completion-info addresses, identity, and proc_def table location.
264 d_worker[i].spe_idx = i;
265 d_worker[i].spu_args = &d_spu_args[i];
266 d_worker[i].spu_args->queue = ptr_to_ea(d_queue);
267 d_worker[i].spu_args->comp_info[0] = ptr_to_ea(&d_comp_info[2*i+0]);
268 d_worker[i].spu_args->comp_info[1] = ptr_to_ea(&d_comp_info[2*i+1]);
269 d_worker[i].spu_args->spu_idx = i;
270 d_worker[i].spu_args->nspus = d_options.nspes;
271 d_worker[i].spu_args->proc_def_ls_addr = d_proc_def_ls_addr;
272 d_worker[i].spu_args->nproc_defs = d_nproc_defs;
273 d_worker[i].spu_args->log.base = 0;
274 d_worker[i].spu_args->log.nentries = 0;
275 d_worker[i].state = WS_INIT;
277 int r = spe_program_load(d_worker[i].spe_ctx, spe_image);
279 perror("spe_program_load");
280 throw std::runtime_error("spe_program_load");
286 // ----------------------------------------------------------------
287 // initalize the free list of job descriptors
289 d_free_list = (gc_jd_stack_t *) gc_aligned_alloc(sizeof(gc_jd_stack_t), CACHE_LINE_SIZE);
290 // This ensures that the memory associated with d_free_list is
291 // automatically freed in the destructor or if an exception occurs
292 // here in the constructor.
294 boost::shared_ptr<void>((void *) d_free_list, free_deleter());
295 gc_jd_stack_init(d_free_list);
298 printf("sizeof(d_jd[0]) = %d (0x%x)\n", sizeof(d_jd[0]), sizeof(d_jd[0]));
299 printf("max_jobs = %u\n", d_options.max_jobs);
302 // Initialize the array of job descriptors.
303 d_jd = (gc_job_desc_t *) gc_aligned_alloc(sizeof(d_jd[0]) * d_options.max_jobs, CACHE_LINE_SIZE);
304 _d_jd_boost = boost::shared_ptr<void>((void *) d_jd, free_deleter());
// Each descriptor's job_id is its array index; used to index the
// per-client "jobs done" bitvectors later.
308 for (int i = 0; i < (int) d_options.max_jobs; i++)
309 d_jd[i].sys.job_id = i;
311 // push them onto the free list
// (pushed in reverse so they pop off in ascending order)
312 for (int i = d_options.max_jobs - 1; i >= 0; i--)
313 free_job_desc(&d_jd[i]);
315 // ----------------------------------------------------------------
316 // initialize d_client_thread
319 gc_client_thread_info_sa cti(
320 new gc_client_thread_info[d_options.max_client_threads]);
322 d_client_thread.swap(cti);
324 for (unsigned int i = 0; i < d_options.max_client_threads; i++)
325 d_client_thread[i].d_client_id = i;
328 // ----------------------------------------------------------------
329 // initialize bitvectors
331 // initialize d_bvlen, the number of longs in job related bitvectors.
332 int bits_per_long = sizeof(unsigned long) * 8;
333 d_bvlen = (d_options.max_jobs + bits_per_long - 1) / bits_per_long;
335 // allocate all bitvectors in a single cache-aligned chunk
336 size_t nlongs = d_bvlen * d_options.max_client_threads;
337 void *p = gc_aligned_alloc(nlongs * sizeof(unsigned long), CACHE_LINE_SIZE);
338 _d_all_bitvectors = boost::shared_ptr<void>(p, free_deleter());
340 // Now point the gc_client_thread_info bitvectors into this storage
341 unsigned long *v = (unsigned long *) p;
343 for (unsigned int i = 0; i < d_options.max_client_threads; i++, v += d_bvlen)
344 d_client_thread[i].d_jobs_done = v;
347 // ----------------------------------------------------------------
348 // create the spe event handler & worker (SPE) threads
350 create_event_handler();
353 ////////////////////////////////////////////////////////////////////////
/*!
 * \brief Tear down the job manager.  Raw pointers are only zeroed here;
 * the storage itself is released by the boost::shared_ptr keep-alive
 * members named in the trailing comments.
 */
355 gc_job_manager_impl::~gc_job_manager_impl()
359 d_jd = 0; // handled via _d_jd_boost
360 d_free_list = 0; // handled via _d_free_list_boost
361 d_queue = 0; // handled via _d_queue_boost
363 // clear cti, since we've deleted the underlying data
364 pthread_setspecific(s_client_key, 0);
// Request an orderly shutdown: set the flag while holding both the
// event-handler and job-completer mutexes, wake the job completer, then
// wait until the event-handler thread reaches EHS_DEAD.
370 gc_job_manager_impl::shutdown()
372 scoped_lock l(d_eh_mutex);
375 scoped_lock l2(d_jc_mutex);
376 d_shutdown_requested = true; // set flag for event handler thread
377 d_jc_cond.notify_one(); // wake up job completer
380 // should only happens during early QA code
381 if (d_eh_thread == 0 && d_eh_state == EHS_INIT)
// NOTE(review): the wait body (presumably a d_eh_cond.wait) is on a line
// missing from this truncated listing.
384 while (d_eh_state != EHS_DEAD) // wait for it to finish
// Number of SPEs this manager is configured to use.
391 gc_job_manager_impl::nspes() const
393 return d_options.nspes;
396 ////////////////////////////////////////////////////////////////////////
// Clear an entire job bitvector (d_bvlen longs).
399 gc_job_manager_impl::bv_zero(unsigned long *bv)
401 memset(bv, 0, sizeof(unsigned long) * d_bvlen);
// Clear bit `bitno` in bitvector `bv`.
405 gc_job_manager_impl::bv_clr(unsigned long *bv, unsigned int bitno)
407 unsigned int wi = bitno / (sizeof (unsigned long) * 8); // word index
408 unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1); // bit within word
409 bv[wi] &= ~(1UL << bi);
// Set bit `bitno` in bitvector `bv`.
413 gc_job_manager_impl::bv_set(unsigned long *bv, unsigned int bitno)
415 unsigned int wi = bitno / (sizeof (unsigned long) * 8); // word index
416 unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1); // bit within word
417 bv[wi] |= (1UL << bi);
// Return true iff bit `bitno` of `bv` is set.
421 gc_job_manager_impl::bv_isset(unsigned long *bv, unsigned int bitno)
423 unsigned int wi = bitno / (sizeof (unsigned long) * 8); // word index
424 unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1); // bit within word
425 return (bv[wi] & (1UL << bi)) != 0;
// Return true iff bit `bitno` of `bv` is clear.
429 gc_job_manager_impl::bv_isclr(unsigned long *bv, unsigned int bitno)
431 unsigned int wi = bitno / (sizeof (unsigned long) * 8); // word index
432 unsigned int bi = bitno & ((sizeof (unsigned long) * 8) - 1); // bit within word
433 return (bv[wi] & (1UL << bi)) == 0;
436 ////////////////////////////////////////////////////////////////////////
// Pop a free job descriptor off the lock-free stack; throws
// gc_bad_alloc when none are available.
439 gc_job_manager_impl::alloc_job_desc()
441 // stack is lock free, and safe to call from any thread
442 gc_job_desc *jd = gc_jd_stack_pop(d_free_list);
// NOTE(review): the null test guarding this throw is on a line missing
// from this truncated listing.
444 throw gc_bad_alloc("alloc_job_desc: none available");
// Return a job descriptor to the lock-free free list.
450 gc_job_manager_impl::free_job_desc(gc_job_desc *jd)
452 // stack is lock free, thus safe to call from any thread
454 gc_jd_stack_push(d_free_list, jd);
457 ////////////////////////////////////////////////////////////////////////
// Bump the count of in-flight jobs under the job-completer lock.
// Refuses once shutdown has been requested (the return statements are on
// lines missing from this truncated listing) and wakes the job completer
// on the 0 -> 1 transition.
461 gc_job_manager_impl::incr_njobs_active()
463 scoped_lock l(d_jc_mutex);
465 if (d_shutdown_requested)
468 if (d_jc_njobs_active++ == 0) // signal on 0 to 1 transition
469 d_jc_cond.notify_one();
// Subtract `n` completed jobs from the in-flight count (called by the
// completion path with the number of jobs in a completion batch).
475 gc_job_manager_impl::decr_njobs_active(int n)
477 scoped_lock l(d_jc_mutex);
478 d_jc_njobs_active -= n;
483 * We check as much as we can here on the PPE side, so that the SPE
// Validate a job's direct-argument block: too many args marks the job
// JS_BAD_N_DIRECT.  (Return statements are on lines missing from this
// truncated listing.)
487 check_direct_args(gc_job_desc *jd, gc_job_direct_args *args)
489 if (args->nargs > MAX_ARGS_DIRECT){
490 jd->status = JS_BAD_N_DIRECT;
// Validate a job's EA (effective-address) argument block: arg count,
// each arg's direction, and that all EAs share the same high 32 bits
// (the SPU side presumably assumes a common upper word — see JS_BAD_EAH).
// On failure the job's status is set to the matching JS_* code.
498 check_ea_args(gc_job_desc *jd, gc_job_ea_args *p)
500 if (p->nargs > MAX_ARGS_EA){
501 jd->status = JS_BAD_N_EA;
505 uint32_t dir_union = 0;
507 for (unsigned int i = 0; i < p->nargs; i++){
508 dir_union |= p->arg[i].direction;
509 switch(p->arg[i].direction){
// NOTE(review): the valid-direction case labels are on lines missing
// from this truncated listing; only the default/error path is visible.
515 jd->status = JS_BAD_DIRECTION;
// All EA args must live in the same 4 GiB region (same high word).
521 unsigned int common_eah = (p->arg[0].ea_addr) >> 32;
522 for (unsigned int i = 1; i < p->nargs; i++){
523 if ((p->arg[i].ea_addr >> 32) != common_eah){
524 jd->status = JS_BAD_EAH;
// record the OR of all directions for the SPU side
530 jd->sys.direction_union = dir_union;
/*!
 * \brief Validate a job descriptor and enqueue it for the SPEs.
 *
 * Checks (in order): the descriptor belongs to this manager; the calling
 * thread has (or can allocate) a client slot; the proc_id is known; the
 * direct/EA argument blocks are well formed; we are not shutting down.
 * Each failure sets jd->status to the matching JS_* code (the early
 * return statements are on lines missing from this truncated listing).
 */
535 gc_job_manager_impl::submit_job(gc_job_desc *jd)
537 // Ensure it's one of our job descriptors
539 if (jd < d_jd || jd >= &d_jd[d_options.max_jobs]){
540 jd->status = JS_BAD_JOB_DESC;
544 // Ensure we've got a client_thread_info assigned to this thread.
546 gc_client_thread_info *cti =
547 (gc_client_thread_info *) pthread_getspecific(s_client_key);
548 if (unlikely(cti == 0)){
549 if ((cti = alloc_cti()) == 0){
550 fprintf(stderr, "gc_job_manager_impl::submit_job: Too many client threads.\n");
551 jd->status = JS_TOO_MANY_CLIENTS;
554 int r = pthread_setspecific(s_client_key, cti);
556 jd->status = JS_BAD_JUJU;
557 fprintf(stderr, "pthread_setspecific failed (return = %d)\n", r);
562 if (jd->proc_id == GCP_UNKNOWN_PROC){
563 jd->status = JS_UNKNOWN_PROC;
567 if (!check_direct_args(jd, &jd->input))
570 if (!check_direct_args(jd, &jd->output))
573 if (!check_ea_args(jd, &jd->eaa))
// Stamp the job with the submitting client so wait_jobs() can verify
// ownership and the completion path can find the right bitvector.
577 jd->sys.client_id = cti->d_client_id;
579 if (!incr_njobs_active()){
580 jd->status = JS_SHUTTING_DOWN;
584 gc_jd_queue_enqueue(d_queue, jd);
585 // tell_spes_to_check_queue();
// Convenience wrapper: wait for a single job and report success only if
// the wait succeeded AND the job itself completed with JS_OK.
590 gc_job_manager_impl::wait_job(gc_job_desc *jd)
593 return wait_jobs(1, &jd, &done, GC_WAIT_ANY) == 1 && jd->status == JS_OK;
/*!
 * \brief Block the calling client thread until its submitted jobs
 * complete.  GC_WAIT_ANY returns once at least one job is done;
 * GC_WAIT_ALL waits for all of them.  Completion is communicated via the
 * per-client d_jobs_done bitvector, set by the completion path and
 * consumed (cleared) here.
 *
 * NOTE(review): parts of the signature and several statements are on
 * lines missing from this truncated listing.
 */
597 gc_job_manager_impl::wait_jobs(unsigned int njobs,
604 gc_client_thread_info *cti =
605 (gc_client_thread_info *) pthread_getspecific(s_client_key);
606 if (unlikely(cti == 0))
// A thread may only wait on jobs it submitted itself.
609 for (i = 0; i < njobs; i++){
611 if (unlikely(jd[i]->sys.client_id != cti->d_client_id)){
612 fprintf(stderr, "gc_job_manager_impl::wait_jobs: can't wait for a job you didn't submit\n");
618 scoped_lock l(cti->d_mutex);
620 // setup info for event handler
621 cti->d_state = (mode == GC_WAIT_ANY) ? CT_WAIT_ANY : CT_WAIT_ALL;
622 cti->d_njobs_waiting_for = njobs;
623 cti->d_jobs_waiting_for = jd;
624 assert(cti->d_jobs_done != 0);
626 unsigned int ndone = 0;
628 // wait for jobs to complete
// Scan the done-bitvector, claiming (clearing) each completed job.
632 for (i= 0; i < njobs; i++){
635 else if (bv_isset(cti->d_jobs_done, jd[i]->sys.job_id)){
636 bv_clr(cti->d_jobs_done, jd[i]->sys.job_id);
642 if (mode == GC_WAIT_ANY && ndone > 0)
645 if (mode == GC_WAIT_ALL && ndone == njobs)
648 // FIXME what happens when somebody calls shutdown?
650 cti->d_cond.wait(l); // wait for event handler to wake us up
// back to idle; the completion path checks d_state before signalling
653 cti->d_state = CT_NOT_WAITING;
654 cti->d_njobs_waiting_for = 0; // tidy up (not reqd)
655 cti->d_jobs_waiting_for = 0; // tidy up (not reqd)
660 ////////////////////////////////////////////////////////////////////////
// Send the same mailbox message to every SPE; accumulates per-SPE
// success into `ok` (declared on a line missing from this listing).
663 gc_job_manager_impl::send_all_spes(uint32_t msg)
667 for (unsigned int i = 0; i < d_options.nspes; i++)
668 ok &= send_spe(i, msg);
// Write one message into an SPE's inbound mailbox, blocking until there
// is room (SPE_MBOX_ALL_BLOCKING).  Rejects an out-of-range SPE index.
674 gc_job_manager_impl::send_spe(unsigned int spe, uint32_t msg)
676 if (spe >= d_options.nspes)
679 int r = spe_in_mbox_write(d_worker[spe].spe_ctx, &msg, 1,
680 SPE_MBOX_ALL_BLOCKING);
682 perror("spe_in_mbox_write");
// Nudge up to d_ntell SPEs (round-robin starting at d_tell_start) to
// check the job queue, writing OP_CHECK_QUEUE directly into the mapped
// SPU_In_Mbox when the mailbox has free space (bits 8..15 of
// SPU_Mbox_Stat hold the free-entry count).
690 gc_job_manager_impl::tell_spes_to_check_queue()
692 int nspes = d_options.nspes;
694 for (int i = 0, ntold = 0; ntold < d_ntell && i < nspes ; ++i){
695 volatile spe_spu_control_area_t *spe_ctrl = d_worker[d_tell_start].spe_ctrl;
696 int nfree = (spe_ctrl->SPU_Mbox_Stat >> 8) & 0xFF;
// NOTE(review): the nfree test guarding this write is on a line missing
// from this truncated listing.
698 spe_ctrl->SPU_In_Mbox = MK_MBOX_MSG(OP_CHECK_QUEUE, 0);
// advance the round-robin start position, wrapping at nspes
702 unsigned int t = d_tell_start + 1;
703 if (t >= d_options.nspes)
710 ////////////////////////////////////////////////////////////////////////
// Print a human-readable diagnostic for a failed pthread_create, naming
// the errno-style code (EAGAIN/EINVAL/EPERM) or formatting the raw value.
// NOTE(review): the switch header, buffer declaration and default-case
// assignment are on lines missing from this truncated listing.
713 pthread_create_failure_msg(int r, const char *which)
719 case EAGAIN: s = "EAGAIN"; break;
720 case EINVAL: s = "EINVAL"; break;
721 case EPERM: s = "EPERM"; break;
723 snprintf(buf, sizeof(buf), "Unknown error %d", r);
727 fprintf(stderr, "pthread_create[%s] failed: %s\n", which, s);
// Create a detached pthread running `start_routine(arg)`; on failure,
// report via pthread_create_failure_msg using `msg` as the thread name.
// (The success/failure return statements are on lines missing from this
// truncated listing.)
732 start_thread(pthread_t *thread,
733 void *(*start_routine)(void *), void *arg,
737 pthread_attr_init(&attr);
738 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
740 // FIXME save sigprocmask
741 // FIXME set sigprocmask
743 int r = pthread_create(thread, &attr, start_routine, arg);
745 // FIXME restore sigprocmask
748 pthread_create_failure_msg(r, msg);
755 ////////////////////////////////////////////////////////////////////////
// Forward declaration: entry point of the per-SPE worker threads
// (defined near the bottom of this file).
757 static void *start_worker(void *arg);
// Thread entry point: run the manager's event-handler loop.
760 start_event_handler(void *arg)
762 gc_job_manager_impl *p = (gc_job_manager_impl *) arg;
763 p->event_handler_loop();
// Thread entry point: run the manager's job-completer loop.
768 start_job_completer(void *arg)
770 gc_job_manager_impl *p = (gc_job_manager_impl *) arg;
771 p->job_completer_loop();
/*!
 * \brief Create the SPE event handler, register interest in mailbox and
 * SPE-stopped events for every SPE, then start the event-handler,
 * job-completer and per-SPE worker threads.  If any worker thread fails
 * to start, tells all SPEs to exit and throws.
 */
776 gc_job_manager_impl::create_event_handler()
778 // create the SPE event handler and register our interest in events
780 d_spe_event_handler.ptr = spe_event_handler_create();
781 if (d_spe_event_handler.ptr == 0){
782 perror("spe_event_handler_create");
783 throw std::runtime_error("spe_event_handler_create");
786 for (unsigned int i = 0; i < d_options.nspes; i++){
788 memset(&eu, 0, sizeof(eu));
789 eu.events = SPE_EVENT_OUT_INTR_MBOX | SPE_EVENT_SPE_STOPPED;
790 eu.spe = d_worker[i].spe_ctx;
791 eu.data.u32 = i; // set in events returned by spe_event_wait
793 if (spe_event_handler_register(d_spe_event_handler.ptr, &eu) != 0){
794 perror("spe_event_handler_register");
795 throw std::runtime_error("spe_event_handler_register");
799 // create the event handling thread
801 if (!start_thread(&d_eh_thread, start_event_handler, this, "event_handler")){
802 throw std::runtime_error("pthread_create");
805 // create the job completion thread
807 if (!start_thread(&d_jc_thread, start_job_completer, this, "job_completer")){
808 throw std::runtime_error("pthread_create");
811 // create the SPE worker threads
// accumulate per-thread success in `ok` (declared on a missing line)
814 for (unsigned int i = 0; ok && i < d_options.nspes; i++){
816 snprintf(name, sizeof(name), "worker[%d]", i);
817 ok &= start_thread(&d_worker[i].thread, start_worker,
823 // FIXME Clean up the mess. Need to terminate event handler and all workers.
825 // this should cause the workers to exit, unless they're seriously broken
826 send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
830 throw std::runtime_error("pthread_create");
834 ////////////////////////////////////////////////////////////////////////
// Update the event-handler state under its mutex and wake everyone
// waiting on the state (e.g. shutdown()).  The assignment itself is on
// a line missing from this truncated listing.
837 gc_job_manager_impl::set_eh_state(evt_handler_state s)
839 scoped_lock l(d_eh_mutex);
841 d_eh_cond.notify_all();
// Record the SPU-reported maximum EA-args buffer size and wake any
// thread blocked in ea_args_maxsize() waiting for it.
845 gc_job_manager_impl::set_ea_args_maxsize(int maxsize)
847 scoped_lock l(d_eh_mutex);
848 d_ea_args_maxsize = maxsize;
849 d_eh_cond.notify_all();
// Debug helper: print which SPE an event came from and decode its
// event-mask bits into readable names.
853 gc_job_manager_impl::print_event(spe_event_unit_t *evt)
855 printf("evt: spe = %d events = (0x%x)", evt->data.u32, evt->events);
857 if (evt->events & SPE_EVENT_OUT_INTR_MBOX)
858 printf(" OUT_INTR_MBOX");
860 if (evt->events & SPE_EVENT_IN_MBOX)
863 if (evt->events & SPE_EVENT_TAG_GROUP)
864 printf(" TAG_GROUP");
866 if (evt->events & SPE_EVENT_SPE_STOPPED)
867 printf(" SPE_STOPPED");
// (job_id, client_id) pair used to group completed jobs by client.
// NOTE(review): the member declarations are on lines missing from this
// truncated listing.
872 struct job_client_info {
// qsort comparator: order job_client_info records by client_id so the
// completion path can lock/signal each client only once per batch.
878 compare_jci_clients(const void *va, const void *vb)
880 const job_client_info *a = (job_client_info *) va;
881 const job_client_info *b = (job_client_info *) vb;
883 return a->client_id - b->client_id;
/*!
 * \brief Process one completion-info batch from an SPE: sanity-check each
 * completed job, mark it done in the owning client's bitvector, and wake
 * any waiting client threads.  Jobs are sorted by client_id first so each
 * client's mutex is taken and its condvar signalled at most once.
 * Finally clears ci->in_use so the SPE can reuse the (double-buffered)
 * completion struct.
 */
887 gc_job_manager_impl::notify_clients_jobs_are_done(unsigned int spe_num,
888 unsigned int completion_info_idx)
890 const char *msg = "gc_job_manager_impl::notify_client_job_is_done (INTERNAL ERROR)";
892 smp_rmb(); // order reads so we know that data sent from SPE is here
// select the half of the double buffer the SPE said it filled
894 gc_comp_info_t *ci = &d_comp_info[2 * spe_num + (completion_info_idx & 0x1)];
896 if (ci->ncomplete == 0){ // never happens, but ensures code below is correct
901 decr_njobs_active(ci->ncomplete);
904 static int total_jobs;
905 static int total_msgs;
907 total_jobs += ci->ncomplete;
908 printf("ppe: tj = %6d tm = %6d\n", total_jobs, total_msgs);
911 job_client_info gci[GC_CI_NJOBS];
914 * Make one pass through and sanity check everything while filling in gci
916 for (unsigned int i = 0; i < ci->ncomplete; i++){
917 unsigned int job_id = ci->job_id[i];
919 if (job_id >= d_options.max_jobs){
920 // internal error, shouldn't happen
921 fprintf(stderr,"%s: invalid job_id = %d\n", msg, job_id);
922 ci->in_use = 0; // clear flag so SPE knows we're done with it
925 gc_job_desc *jd = &d_jd[job_id];
927 if (jd->sys.client_id >= d_options.max_client_threads){
928 // internal error, shouldn't happen
929 fprintf(stderr, "%s: invalid client_id = %d\n", msg, jd->sys.client_id);
930 ci->in_use = 0; // clear flag so SPE knows we're done with it
934 gci[i].job_id = job_id;
935 gci[i].client_id = jd->sys.client_id;
938 // sort by client_id so we only have to lock & signal once / client
940 if (ci->ncomplete > 1)
941 qsort(gci, ci->ncomplete, sizeof(gci[0]), compare_jci_clients);
// Walk the sorted batch holding one client mutex at a time.
945 gc_client_thread_info *last_cti = &d_client_thread[gci[0].client_id];
946 last_cti->d_mutex.lock();
947 bv_set(last_cti->d_jobs_done, gci[0].job_id); // mark job done
949 for (unsigned int i = 1; i < ci->ncomplete; i++){
951 gc_client_thread_info *cti = &d_client_thread[gci[i].client_id];
953 if (cti != last_cti){ // new client?
955 // yes. signal old client, unlock old, lock new
957 // FIXME we could distinguish between CT_WAIT_ALL & CT_WAIT_ANY
959 if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
960 last_cti->d_cond.notify_one(); // wake client thread up
962 last_cti->d_mutex.unlock();
// NOTE(review): the lock of the new client's mutex and the update of
// last_cti are on lines missing from this truncated listing.
968 bv_set(cti->d_jobs_done, gci[i].job_id);
// signal & release the final client of the batch
973 if (last_cti->d_state == CT_WAIT_ANY || last_cti->d_state == CT_WAIT_ALL)
974 last_cti->d_cond.notify_one(); // wake client thread up
975 last_cti->d_mutex.unlock();
977 ci->in_use = 0; // clear flag so SPE knows we're done with it
/*!
 * \brief Dispatch a single SPE event.  Two event types are expected:
 * interrupt-mailbox messages from the SPU (job-done / bufsize / unknown)
 * and SPE-stopped notifications, which are decoded and logged per stop
 * reason.
 */
981 gc_job_manager_impl::handle_event(spe_event_unit_t *evt)
985 int spe_num = evt->data.u32;
987 // only a single event type can be signaled at a time
989 if (evt->events == SPE_EVENT_OUT_INTR_MBOX) { // SPE sent us 1 or more msgs
990 static const int NMSGS = 32;
991 unsigned int msg[NMSGS];
992 int n = spe_out_intr_mbox_read(evt->spe, msg, NMSGS, SPE_MBOX_ANY_BLOCKING);
993 // printf("spe_out_intr_mbox_read = %d\n", n);
995 perror("spe_out_intr_mbox_read");
998 for (int i = 0; i < n; i++){
999 switch(MBOX_MSG_OP(msg[i])){
// NOTE(review): the case label for the job-done message is on a line
// missing from this truncated listing.
1003 printf("eh: job_done (0x%08x) from spu[%d]\n", msg[i], spe_num);
1004 notify_clients_jobs_are_done(spe_num, MBOX_MSG_ARG(msg[i]));
1007 case OP_SPU_BUFSIZE:
1008 set_ea_args_maxsize(MBOX_MSG_ARG(msg[i]));
1013 printf("eh: Unexpected msg (0x%08x) from spu[%d]\n", msg[i], spe_num);
1019 else if (evt->events == SPE_EVENT_SPE_STOPPED){ // the SPE stopped
1021 int r = spe_stop_info_read(evt->spe, &si);
1023 perror("spe_stop_info_read");
// Decode and log the stop reason; each case prints the matching field
// of the spe_stop_info result union.
1026 switch (si.stop_reason){
1029 printf("eh: spu[%d] SPE_EXIT w/ exit_code = %d\n",
1030 spe_num, si.result.spe_exit_code);
1033 case SPE_STOP_AND_SIGNAL:
1034 printf("eh: spu[%d] SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
1035 spe_num, si.result.spe_signal_code);
1037 case SPE_RUNTIME_ERROR:
1038 printf("eh: spu[%d] SPE_RUNTIME_ERROR w/ spe_runtime_error = 0x%x\n",
1039 spe_num, si.result.spe_runtime_error);
1041 case SPE_RUNTIME_EXCEPTION:
1042 printf("eh: spu[%d] SPE_RUNTIME_EXCEPTION w/ spe_runtime_exception = 0x%x\n",
1043 spe_num, si.result.spe_runtime_exception);
1045 case SPE_RUNTIME_FATAL:
1046 printf("eh: spu[%d] SPE_RUNTIME_FATAL w/ spe_runtime_fatal = 0x%x\n",
1047 spe_num, si.result.spe_runtime_fatal);
1049 case SPE_CALLBACK_ERROR:
1050 printf("eh: spu[%d] SPE_CALLBACK_ERROR w/ spe_callback_error = 0x%x\n",
1051 spe_num, si.result.spe_callback_error);
1053 case SPE_ISOLATION_ERROR:
1054 printf("eh: spu[%d] SPE_ISOLATION_ERROR w/ spe_isolation_error = 0x%x\n",
1055 spe_num, si.result.spe_isolation_error);
1058 printf("eh: spu[%d] UNKNOWN STOP REASON (%d) w/ spu_status = 0x%x\n",
1059 spe_num, si.stop_reason, si.spu_status);
1064 #if 0 // not enabled
1065 else if (evt->events == SPE_EVENT_IN_MBOX){ // there's room to write to SPE
1066 // spe_in_mbox_write (ignore)
1068 else if (evt->events == SPE_EVENT_TAG_GROUP){ // our DMA completed
1069 // spe_mfcio_tag_status_read
1073 fprintf(stderr, "handle_event: unexpected evt->events = 0x%x\n", evt->events);
1079 // This is the "main program" of the event handling thread
// State machine: RUNNING -> SHUTTING_DOWN (once shutdown is requested)
// -> WAITING_FOR_WORKERS_TO_DIE (after telling SPEs to exit, once the
// job completer is dead) -> DEAD.  Between state checks it blocks up to
// TIMEOUT ms in spe_event_wait and dispatches each event to
// handle_event().
1082 gc_job_manager_impl::event_handler_loop()
1084 static const int MAX_EVENTS = 16;
1085 static const int TIMEOUT = 20; // how long to block in milliseconds
1087 spe_event_unit_t events[MAX_EVENTS];
1090 printf("event_handler_loop: starting\n");
1092 set_eh_state(EHS_RUNNING);
1094 // ask the first spe for its max bufsize
1095 send_spe(0, MK_MBOX_MSG(OP_GET_SPU_BUFSIZE, 0));
1100 case EHS_RUNNING: // normal stuff
1101 if (d_shutdown_requested) {
1102 set_eh_state(EHS_SHUTTING_DOWN);
1106 case EHS_SHUTTING_DOWN:
// wait for the job completer to finish before telling SPEs to exit
1107 if (d_jc_state == JCS_DEAD){
1108 send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
1109 set_eh_state(EHS_WAITING_FOR_WORKERS_TO_DIE);
1113 case EHS_WAITING_FOR_WORKERS_TO_DIE:
1115 bool all_dead = true;
1116 for (unsigned int i = 0; i < d_options.nspes; i++)
1117 all_dead &= d_worker[i].state == WS_DEAD;
1120 set_eh_state(EHS_DEAD);
1122 printf("event_handler_loop: exiting\n");
// unexpected state: declare ourselves dead and bail out
1129 set_eh_state(EHS_DEAD);
1130 printf("event_handler_loop(default): exiting\n");
1134 // block waiting for events...
1135 int nevents = spe_event_wait(d_spe_event_handler.ptr,
1136 events, MAX_EVENTS, TIMEOUT);
1138 perror("spe_wait_event");
1141 for (int i = 0; i < nevents; i++){
1142 handle_event(&events[i]);
1147 ////////////////////////////////////////////////////////////////////////
/*!
 * \brief Busy-poll each SPE's outbound mailbox (via the mapped control
 * area) for up to `niter` rounds, processing job-done messages directly.
 * Runs at low hardware-thread priority (CCTPL) while spinning and bumps
 * to medium (CCTPM) when real completion work arrives, to avoid starving
 * the other PPE hardware thread.
 */
1150 gc_job_manager_impl::poll_for_job_completion()
1152 static const int niter = 10000;
1154 CCTPL(); // change current (h/w) thread priority to low
1156 for (int n = 0; n < niter; n++){
1158 for (unsigned int spe_num = 0; spe_num < d_options.nspes; spe_num++){
1159 volatile spe_spu_control_area_t *spe_ctrl = d_worker[spe_num].spe_ctrl;
// low byte of SPU_Mbox_Stat = number of pending outbound entries
1160 int nentries = spe_ctrl->SPU_Mbox_Stat & 0xFF;
1161 while (nentries-- > 0){
1162 unsigned int msg = spe_ctrl->SPU_Out_Mbox;
1163 switch(MBOX_MSG_OP(msg)){
// NOTE(review): the job-done case label is on a line missing from this
// truncated listing.
1166 printf("jc: job_done (0x%08x) from spu[%d]\n", msg, spe_num);
1168 CCTPM(); // change current thread priority to medium
1169 notify_clients_jobs_are_done(spe_num, MBOX_MSG_ARG(msg));
1174 printf("jc: Unexpected msg (0x%08x) from spu[%d]\n", msg, spe_num);
1184 // This is the "main program" of the job completer thread
// Sleeps on d_jc_cond while no jobs are active; exits (JCS_DEAD) when a
// shutdown is requested with nothing in flight; otherwise polls the SPE
// mailboxes for completions.
1187 gc_job_manager_impl::job_completer_loop()
1189 d_jc_state = JCS_RUNNING;
1193 scoped_lock l(d_jc_mutex);
1194 if (d_jc_njobs_active == 0){
1195 if (d_shutdown_requested){
1196 d_jc_state = JCS_DEAD;
// NOTE(review): the condvar wait for the idle case is on a line missing
// from this truncated listing.
1203 poll_for_job_completion();
1207 ////////////////////////////////////////////////////////////////////////
1208 // this is the top of the SPE worker threads
// Each worker thread runs its SPE context to completion via
// spe_context_run, logs how the SPU program ended, then marks its slot
// WS_DEAD so the event handler's shutdown state machine can finish.
1211 start_worker(void *arg)
1213 worker_ctx *w = (worker_ctx *) arg;
1216 w->state = WS_RUNNING;
1218 printf("worker[%d]: WS_RUNNING\n", w->spe_idx);
1220 unsigned int entry = SPE_DEFAULT_ENTRY;
// blocks until the SPU program stops; si receives the stop info
1221 int r = spe_context_run(w->spe_ctx, &entry, 0, w->spu_args, 0, &si);
1223 if (r < 0){ // error
1225 snprintf(buf, sizeof(buf), "worker[%d]: spe_context_run", w->spe_idx);
1229 // spe program called exit.
1231 printf("worker[%d]: SPE_EXIT w/ exit_code = %d\n",
1232 w->spe_idx, si.result.spe_exit_code);
1235 // called stop_and_signal
1237 // I'm not sure we'll ever get here. I think the event
1238 // handler will catch this...
1239 printf("worker[%d]: SPE_STOP_AND_SIGNAL w/ spe_signal_code = 0x%x\n",
1240 w->spe_idx, si.result.spe_signal_code);
1243 // in any event, we're committing suicide now ;)
1245 printf("worker[%d]: WS_DEAD\n", w->spe_idx);
1251 ////////////////////////////////////////////////////////////////////////
1253 gc_client_thread_info *
// Find and claim a free client-thread slot.  The claim uses an atomic
// decrement-if-positive on d_free so two racing threads cannot grab the
// same slot; the winner resets the slot's wait state and bitvector.
// (The not-found return path is on lines missing from this listing.)
1254 gc_job_manager_impl::alloc_cti()
1256 for (unsigned int i = 0; i < d_options.max_client_threads; i++){
1257 if (d_client_thread[i].d_free){
1258 // try to atomically grab it
1259 if (_atomic_dec_if_positive(ptr_to_ea(&d_client_thread[i].d_free)) == 0){
1261 gc_client_thread_info *cti = &d_client_thread[i];
1262 cti->d_state = CT_NOT_WAITING;
1263 bv_zero(cti->d_jobs_done);
1264 cti->d_njobs_waiting_for = 0;
1265 cti->d_jobs_waiting_for = 0;
// Return a client-thread slot to the pool after a bounds sanity check.
// (The statement marking the slot free again is on a line missing from
// this truncated listing.)
1275 gc_job_manager_impl::free_cti(gc_client_thread_info *cti)
1277 assert((size_t) (cti - d_client_thread.get()) < d_options.max_client_threads);
// Return the SPU-reported maximum EA-args buffer size, blocking until
// the event handler has received OP_SPU_BUFSIZE and set it (the condvar
// wait body is on a line missing from this truncated listing).
1282 gc_job_manager_impl::ea_args_maxsize()
1284 scoped_lock l(d_eh_mutex);
1286 while (d_ea_args_maxsize == 0) // wait for it to be initialized
1289 return d_ea_args_maxsize;
// Set the debug level; also mirrored into the file-scope flag the
// worker threads consult.
1293 gc_job_manager_impl::set_debug(int debug)
1296 s_worker_debug = debug;
// Accessor for the current debug level (body on lines missing from this
// truncated listing; presumably returns d_debug).
1300 gc_job_manager_impl::debug()
1305 ////////////////////////////////////////////////////////////////////////
/*!
 * \brief When logging is enabled, create and mmap one log file per SPE
 * ("spu_log.NN") and hand its page-aligned address/size to the SPU via
 * the spu_args block, so the SPU code can write log entries directly.
 */
1308 gc_job_manager_impl::setup_logfiles()
1310 if (!d_options.enable_logging)
1313 if (d_options.log2_nlog_entries == 0)
1314 d_options.log2_nlog_entries = 12;
1316 // must end up a multiple of the page size
1318 size_t pagesize = getpagesize();
1319 size_t s = (1 << d_options.log2_nlog_entries) * sizeof(gc_log_entry_t);
1320 s = ((s + pagesize - 1) / pagesize) * pagesize;
1321 size_t nentries = s / sizeof(gc_log_entry_t);
1322 assert(is_power_of_2(nentries));
1324 for (unsigned int i = 0; i < d_options.nspes; i++){
1326 snprintf(filename, sizeof(filename), "spu_log.%02d", i);
1327 int fd = open(filename, O_CREAT|O_TRUNC|O_RDWR, 0664);
// extend the file to length s (the write of the final byte is on a
// line missing from this truncated listing)
1332 lseek(fd, s - 1, SEEK_SET);
1334 void *p = mmap(0, s, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
1335 if (p == MAP_FAILED){
1336 perror("gc_job_manager_impl::setup_logfiles: mmap");
// publish the mapping to the SPU via its argument block
1342 d_spu_args[i].log.base = ptr_to_ea(p);
1343 d_spu_args[i].log.nentries = nentries;
// Flush each SPE's mmap'd log region back to its file with msync.
1348 gc_job_manager_impl::sync_logfiles()
1350 for (unsigned int i = 0; i < d_options.nspes; i++){
1351 if (d_spu_args[i].log.base)
1352 msync(ea_to_ptr(d_spu_args[i].log.base),
1353 d_spu_args[i].log.nentries * sizeof(gc_log_entry_t),
// Unmap each SPE's log region (counterpart of setup_logfiles).
1359 gc_job_manager_impl::unmap_logfiles()
1361 for (unsigned int i = 0; i < d_options.nspes; i++){
1362 if (d_spu_args[i].log.base)
1363 munmap(ea_to_ptr(d_spu_args[i].log.base),
1364 d_spu_args[i].log.nentries * sizeof(gc_log_entry_t));
1368 ////////////////////////////////////////////////////////////////////////
1370 // lookup proc names in d_proc_def table
// Linear search of the proc_def table fished out of the SPE ELF image;
// throws gc_unknown_proc if the name is absent (the successful-match
// return is on a line missing from this truncated listing).
1373 gc_job_manager_impl::lookup_proc(const std::string &proc_name)
1375 for (int i = 0; i < d_nproc_defs; i++)
1376 if (proc_name == d_proc_def[i].name)
1379 throw gc_unknown_proc(proc_name);
1382 std::vector<std::string>
// Return the names of all procedures found in the SPE program's
// proc_def table, in table order.
1383 gc_job_manager_impl::proc_names()
1385 std::vector<std::string> r;
1386 for (int i = 0; i < d_nproc_defs; i++)
1387 r.push_back(d_proc_def[i].name);
1392 ////////////////////////////////////////////////////////////////////////
// Destroy this worker's SPE context; a failure is reported via perror
// but otherwise ignored (destructors must not throw).
1394 worker_ctx::~worker_ctx()
1397 int r = spe_context_destroy(spe_ctx);
1399 perror("spe_context_destroy");