Merged eb/gcell-wip -r10213:10230 into the trunk. This reduces the
[debian/gnuradio] / gcell / lib / runtime / gc_job_manager_impl.cc
index 629019f4d0a3eec1f0466cb7dde8daba90286519..4469d50239bc577baca429504d7732eabd890413 100644 (file)
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2007 Free Software Foundation, Inc.
+ * Copyright 2007,2008,2009 Free Software Foundation, Inc.
  * 
  * This file is part of GNU Radio
  * 
 #include <sys/stat.h>
 #include <fcntl.h>
 #include <string.h>
+#include <sched.h>
 
 
+#define __nop() __asm__ volatile ("ori 0,0,0" : : : "memory")
+#define __cctpl() __asm__ volatile ("or 1,1,1" : : : "memory")
+#define __cctpm() __asm__ volatile ("or 2,2,2" : : : "memory")
+#define __cctph() __asm__ volatile ("or 3,3,3" : : : "memory")
+#define __db8cyc() __asm__ volatile ("or 28,28,28" : : : "memory")
+#define __db10cyc() __asm__ volatile ("or 29,29,29" : : : "memory")
+#define __db12cyc() __asm__ volatile ("or 30,30,30" : : : "memory")
+#define __db16cyc() __asm__ volatile ("or 31,31,31" : : : "memory")
+
+
+#if 1
+#define CCTPL() __cctpl()
+#define CCTPM() __cctpm()
+#else
+#define CCTPL() (void) 0
+#define CCTPM() (void) 0
+#endif
+
 static const size_t CACHE_LINE_SIZE = 128;
 
 static const unsigned int DEFAULT_MAX_JOBS = 128;
@@ -99,6 +118,8 @@ gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
   : d_debug(0), d_spu_args(0),
     d_eh_cond(&d_eh_mutex), d_eh_thread(0), d_eh_state(EHS_INIT),
     d_shutdown_requested(false),
+    d_jc_cond(&d_jc_mutex), d_jc_thread(0), d_jc_state(JCS_INIT), d_jc_njobs_active(0),
+    d_ntell(0), d_tell_start(0),
     d_client_thread(0), d_ea_args_maxsize(0),
     d_proc_def(0), d_proc_def_ls_addr(0), d_nproc_defs(0)
 {
@@ -177,6 +198,8 @@ gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
     }
   }
 
+  d_ntell = std::min(d_options.nspes, 2U);
+
   // ----------------------------------------------------------------
   // initalize the job queue
   
@@ -218,6 +241,7 @@ gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
   // fprintf(stderr, "d_proc_def_ls_addr = 0x%0x\n", d_proc_def_ls_addr);
 
   int spe_flags = (SPE_EVENTS_ENABLE
+                  | SPE_MAP_PS
                   | SPE_CFG_SIGNOTIFY1_OR
                   | SPE_CFG_SIGNOTIFY2_OR);
   
@@ -228,6 +252,14 @@ gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
       perror("spe_context_create");
       throw std::runtime_error("spe_context_create");
     }
+
+    d_worker[i].spe_ctrl = 
+      (spe_spu_control_area_t *)spe_ps_area_get(d_worker[i].spe_ctx, SPE_CONTROL_AREA);
+    if (d_worker[i].spe_ctrl == 0){
+      perror("spe_ps_area_get(SPE_CONTROL_AREA)");
+      throw std::runtime_error("spe_ps_area_get(SPE_CONTROL_AREA)");
+    }
+
     d_worker[i].spe_idx = i;
     d_worker[i].spu_args = &d_spu_args[i];
     d_worker[i].spu_args->queue = ptr_to_ea(d_queue);
@@ -315,7 +347,6 @@ gc_job_manager_impl::gc_job_manager_impl(const gc_jm_options *options)
   // create the spe event handler & worker (SPE) threads
 
   create_event_handler();
-
 }
 
 ////////////////////////////////////////////////////////////////////////
@@ -339,7 +370,11 @@ gc_job_manager_impl::shutdown()
 {
   omni_mutex_lock      l(d_eh_mutex);
 
-  d_shutdown_requested = true;         // set flag for event handler thread
+  {
+    omni_mutex_lock    l2(d_jc_mutex);
+    d_shutdown_requested = true;       // set flag for event handler thread
+    d_jc_cond.signal();                        // wake up job completer
+  }
 
   // should only happens during early QA code
   if (d_eh_thread == 0 && d_eh_state == EHS_INIT)
@@ -420,6 +455,29 @@ gc_job_manager_impl::free_job_desc(gc_job_desc *jd)
 
 ////////////////////////////////////////////////////////////////////////
 
+
+inline bool
+gc_job_manager_impl::incr_njobs_active()
+{
+  omni_mutex_lock      l(d_jc_mutex);
+
+  if (d_shutdown_requested)
+    return false;
+
+  if (d_jc_njobs_active++ == 0)        // signal on 0 to 1 transition
+    d_jc_cond.signal();
+
+  return true;
+}
+
+inline void
+gc_job_manager_impl::decr_njobs_active(int n)
+{
+  omni_mutex_lock      l(d_jc_mutex);
+  d_jc_njobs_active -= n;
+}
+
+
 /*
  * We check as much as we can here on the PPE side, so that the SPE
  * doesn't have to.
@@ -475,11 +533,6 @@ check_ea_args(gc_job_desc *jd, gc_job_ea_args *p)
 bool
 gc_job_manager_impl::submit_job(gc_job_desc *jd)
 {
-  if (unlikely(d_shutdown_requested)){
-    jd->status = JS_SHUTTING_DOWN;
-    return false;
-  }
-
   // Ensure it's one of our job descriptors
 
   if (jd < d_jd || jd >= &d_jd[d_options.max_jobs]){
@@ -522,9 +575,13 @@ gc_job_manager_impl::submit_job(gc_job_desc *jd)
   jd->status = JS_OK;
   jd->sys.client_id = cti->d_client_id;
 
-  // FIXME keep count of jobs in progress?
+  if (!incr_njobs_active()){
+    jd->status = JS_SHUTTING_DOWN;
+    return false;
+  }
   
   gc_jd_queue_enqueue(d_queue, jd);
+  // tell_spes_to_check_queue();
   return true;
 }
 
@@ -628,6 +685,27 @@ gc_job_manager_impl::send_spe(unsigned int spe, uint32_t msg)
   return r == 1;
 }
 
+void 
+gc_job_manager_impl::tell_spes_to_check_queue()
+{
+  int nspes = d_options.nspes;
+
+  for (int i = 0, ntold = 0; ntold < d_ntell && i < nspes ; ++i){
+    volatile spe_spu_control_area_t *spe_ctrl = d_worker[d_tell_start].spe_ctrl;
+    int nfree = (spe_ctrl->SPU_Mbox_Stat >> 8) & 0xFF;
+    if (nfree == 4){
+      spe_ctrl->SPU_In_Mbox = MK_MBOX_MSG(OP_CHECK_QUEUE, 0);
+      ntold++;
+    }
+
+    unsigned int t = d_tell_start + 1;
+    if (t >= d_options.nspes)
+      t = 0;
+    d_tell_start = t;
+  }
+}
+
+
 ////////////////////////////////////////////////////////////////////////
 
 static void
@@ -685,6 +763,14 @@ start_event_handler(void *arg)
   return 0;
 }
 
+static void *
+start_job_completer(void *arg)
+{
+  gc_job_manager_impl *p = (gc_job_manager_impl *) arg;
+  p->job_completer_loop();
+  return 0;
+}
+
 void
 gc_job_manager_impl::create_event_handler()
 {
@@ -709,12 +795,18 @@ gc_job_manager_impl::create_event_handler()
     }
   }
 
-  // create our event handling thread
+  // create the event handling thread
 
   if (!start_thread(&d_eh_thread, start_event_handler, this, "event_handler")){
     throw std::runtime_error("pthread_create");
   }
 
+  // create the job completion thread
+
+  if (!start_thread(&d_jc_thread, start_job_completer, this, "job_completer")){
+    throw std::runtime_error("pthread_create");
+  }
+
   // create the SPE worker threads
 
   bool ok = true;
@@ -805,6 +897,8 @@ gc_job_manager_impl::notify_clients_jobs_are_done(unsigned int spe_num,
     return;
   }
 
+  decr_njobs_active(ci->ncomplete);
+
   if (0){
     static int total_jobs;
     static int total_msgs;
@@ -902,12 +996,13 @@ gc_job_manager_impl::handle_event(spe_event_unit_t *evt)
     else {
       for (int i = 0; i < n; i++){
        switch(MBOX_MSG_OP(msg[i])){
+#if 0
        case OP_JOBS_DONE:
          if (debug())
            printf("eh: job_done (0x%08x) from spu[%d]\n", msg[i], spe_num);
          notify_clients_jobs_are_done(spe_num, MBOX_MSG_ARG(msg[i]));
          break;
-
+#endif
        case OP_SPU_BUFSIZE:
          set_ea_args_maxsize(MBOX_MSG_ARG(msg[i]));
          break;
@@ -1001,18 +1096,17 @@ gc_job_manager_impl::event_handler_loop()
   while (1){
     switch(d_eh_state){
 
-    case EHS_RUNNING:      // normal stuff
+    case EHS_RUNNING:                  // normal stuff
       if (d_shutdown_requested) {
        set_eh_state(EHS_SHUTTING_DOWN);
       }
       break;
 
     case EHS_SHUTTING_DOWN:
-
-      // FIXME wait until job queue is empty, then tell them to exit
-
-      send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
-      set_eh_state(EHS_WAITING_FOR_WORKERS_TO_DIE);
+      if (d_jc_state == JCS_DEAD){
+       send_all_spes(MK_MBOX_MSG(OP_EXIT, 0));
+       set_eh_state(EHS_WAITING_FOR_WORKERS_TO_DIE);
+      }
       break;
 
     case EHS_WAITING_FOR_WORKERS_TO_DIE:
@@ -1050,7 +1144,67 @@ gc_job_manager_impl::event_handler_loop()
 }
 
 ////////////////////////////////////////////////////////////////////////
-// This is the top of the SPE worker threads
+
+void
+gc_job_manager_impl::poll_for_job_completion()
+{
+  static const int niter = 10000;
+
+  CCTPL();             // change current (h/w) thread priority to low
+
+  for (int n = 0; n < niter; n++){
+
+    for (unsigned int spe_num = 0; spe_num < d_options.nspes; spe_num++){
+      volatile spe_spu_control_area_t *spe_ctrl = d_worker[spe_num].spe_ctrl;
+      int nentries = spe_ctrl->SPU_Mbox_Stat & 0xFF;
+      while (nentries-- > 0){
+       unsigned int msg = spe_ctrl->SPU_Out_Mbox;
+       switch(MBOX_MSG_OP(msg)){
+       case OP_JOBS_DONE:
+         if (debug())
+           printf("jc: job_done (0x%08x) from spu[%d]\n", msg, spe_num);
+
+         CCTPM();              // change current thread priority to medium
+         notify_clients_jobs_are_done(spe_num, MBOX_MSG_ARG(msg));
+         CCTPL();
+         break;
+
+       default:
+         printf("jc: Unexpected msg (0x%08x) from spu[%d]\n", msg, spe_num);
+         break;
+       }
+      }
+    }
+  }
+  CCTPM();
+}
+
+//
+// This is the "main program" of the job completer thread
+//
+void
+gc_job_manager_impl::job_completer_loop()
+{
+  d_jc_state = JCS_RUNNING;
+
+  while (1){
+    {
+      omni_mutex_lock  l(d_jc_mutex);
+      if (d_jc_njobs_active == 0){
+       if (d_shutdown_requested){
+         d_jc_state = JCS_DEAD;
+         return;
+       }
+       d_jc_cond.wait();
+      }
+    }
+
+    poll_for_job_completion();
+  }
+}
+
+////////////////////////////////////////////////////////////////////////
+// this is the top of the SPE worker threads
 
 static void *
 start_worker(void *arg)