Fix problem with commands timing out (specifically stop_rx_streaming)
authorEric Blossom <eb@comsec.com>
Fri, 4 Sep 2009 10:32:32 +0000 (03:32 -0700)
committerEric Blossom <eb@comsec.com>
Fri, 4 Sep 2009 10:32:32 +0000 (03:32 -0700)
After fixing the race, this change uses Tom's idea to stop enqueuing
data when trying to stop, and adds a new flush_rx_samples method
to drop any samples that may have already been accumulated.

I ran Tom's test case 500 times with 0 failures ;-)

usrp2/host/lib/usrp2_impl.cc
usrp2/host/lib/usrp2_impl.h

index b81b3cd61a5d8969e80e81de5eb7f57da18213e5..d1e85805c981ae94f33fc9b64b6fea39eb2d2568 100644 (file)
@@ -133,7 +133,7 @@ namespace usrp2 {
       d_bg_running(false), d_rx_seqno(-1), d_tx_seqno(0), d_next_rid(0),
       d_num_rx_frames(0), d_num_rx_missing(0), d_num_rx_overruns(0), d_num_rx_bytes(0), 
       d_num_enqueued(0), d_enqueued_mutex(), d_bg_pending_cond(&d_enqueued_mutex),
-      d_channel_rings(NCHANS), d_tx_interp(0), d_rx_decim(0)
+      d_channel_rings(NCHANS), d_tx_interp(0), d_rx_decim(0), d_dont_enqueue(true)
   {
     if (!d_eth_buf->open(ifc, htons(U2_ETHERTYPE)))
       throw std::runtime_error("Unable to register USRP2 protocol");
@@ -375,6 +375,10 @@ namespace usrp2 {
       return handle_control_packet(base, len);
     }
     else {                             // data packets
+
+      if (d_dont_enqueue)              // toss packet
+       return data_handler::RELEASE;
+
       return handle_data_packet(base, len);
     }
 
@@ -636,6 +640,7 @@ namespace usrp2 {
       cmd.eop.opcode = OP_EOP;
       cmd.eop.len = sizeof(cmd.eop);
     
+      d_dont_enqueue = false;
       bool success = false;
       pending_reply p(cmd.op.rid, &reply, sizeof(reply));
       success = transmit_cmd_and_wait(&cmd, sizeof(cmd), &p, DEF_CMD_TIMEOUT);
@@ -643,6 +648,8 @@ namespace usrp2 {
       
       if (success)
        d_channel_rings[channel] = ring_sptr(new ring(d_eth_buf->max_frames()));
+      else
+       d_dont_enqueue = true;
 
       //fprintf(stderr, "usrp2::start_rx_streaming: success = %d\n", success);
       return success;
@@ -664,6 +671,9 @@ namespace usrp2 {
       return false;
     }
 
+    d_dont_enqueue = true;     // no new samples
+    flush_rx_samples(channel); // dump any we may already have
+
     op_stop_rx_cmd cmd;
     op_generic_t reply;
 
@@ -680,7 +690,7 @@ namespace usrp2 {
     
       bool success = false;
       pending_reply p(cmd.op.rid, &reply, sizeof(reply));
-      success = transmit_cmd_and_wait(&cmd, sizeof(cmd), &p, 10 * DEF_CMD_TIMEOUT);
+      success = transmit_cmd_and_wait(&cmd, sizeof(cmd), &p, DEF_CMD_TIMEOUT);
       success = success && (ntohx(reply.ok) == 1);
       d_channel_rings[channel].reset();
       //fprintf(stderr, "usrp2::stop_rx_streaming:  success = %d\n", success);
@@ -736,6 +746,36 @@ namespace usrp2 {
     return true;
   }
 
+  bool
+  usrp2::impl::flush_rx_samples(unsigned int channel)
+  {
+    if (channel > MAX_CHAN) {
+      std::cerr << "usrp2: invalid channel (" << channel
+                << " )" << std::endl;
+      return false;
+    }
+
+    if (channel > 0) {
+      std::cerr << "usrp2: channel " << channel
+                << " not implemented" << std::endl;
+      return false;
+    }
+
+    ring_sptr rp = d_channel_rings[channel];
+    if (!rp){
+      return false;
+    }
+
+    // Iterate through frames and drop them
+    void *p;
+    size_t frame_len_in_bytes;
+    while (rp->dequeue(&p, &frame_len_in_bytes)) {
+      d_eth_buf->release_frame(p);
+      dec_enqueued();
+    }
+    return true;
+  }
+
   // ----------------------------------------------------------------
   //                           Transmit
   // ----------------------------------------------------------------
index 434e0e9b13f865aecc7c8fef1ac8b0b6264b6741..ed71a6ba33903c09589b342ea75cdccb8c1fcf34 100644 (file)
@@ -87,6 +87,8 @@ namespace usrp2 {
     int                   d_tx_interp;         // shadow tx interp 
     int                   d_rx_decim;          // shadow rx decim
 
+    bool          d_dont_enqueue;
+
     void inc_enqueued() {
       omni_mutex_lock l(d_enqueued_mutex);
       d_num_enqueued++;
@@ -142,6 +144,7 @@ namespace usrp2 {
     bool read_gpio(int bank, uint16_t *value);
     bool start_rx_streaming(unsigned int channel, unsigned int items_per_frame);
     bool rx_samples(unsigned int channel, rx_sample_handler *handler);
+    bool flush_rx_samples(unsigned int channel);
     bool stop_rx_streaming(unsigned int channel);
     unsigned int rx_overruns() const { return d_num_rx_overruns; }
     unsigned int rx_missing() const { return d_num_rx_missing; }