/* -*- c++ -*- */
/*
- * Copyright 2008,2009 Free Software Foundation, Inc.
+ * Copyright 2008,2009,2010 Free Software Foundation, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
#include <usrp2/tune_result.h>
#include <usrp2/copiers.h>
#include <gruel/inet.h>
+#include <gruel/realtime.h>
#include <usrp2_types.h>
#include "usrp2_impl.h"
-#include "usrp2_thread.h"
#include "eth_buffer.h"
#include "pktfilter.h"
#include "control.h"
usrp2::impl::impl(const std::string &ifc, props *p, size_t rx_bufsize)
- : d_eth_buf(new eth_buffer(rx_bufsize)), d_interface_name(ifc), d_pf(0), d_bg_thread(0),
+ : d_eth_buf(new eth_buffer(rx_bufsize)), d_interface_name(ifc), d_pf(0),
d_bg_running(false), d_rx_seqno(-1), d_tx_seqno(0), d_next_rid(0),
d_num_rx_frames(0), d_num_rx_missing(0), d_num_rx_overruns(0), d_num_rx_bytes(0),
- d_num_enqueued(0), d_enqueued_mutex(), d_bg_pending_cond(&d_enqueued_mutex),
+ d_num_enqueued(0), d_enqueued_mutex(), d_bg_pending_cond(),
d_channel_rings(NCHANS), d_tx_interp(0), d_rx_decim(0), d_dont_enqueue(true)
{
if (!d_eth_buf->open(ifc, htons(U2_ETHERTYPE)))
memset(d_pending_replies, 0, sizeof(d_pending_replies));
- d_bg_thread = new usrp2_thread(this);
- d_bg_thread->start();
+ // Kick off receive thread
+ start_bg();
// In case the USRP2 was left streaming RX
// FIXME: only one channel right now
usrp2::impl::~impl()
{
stop_bg();
- d_bg_thread = 0; // thread class deletes itself
delete d_pf;
d_eth_buf->close();
delete d_eth_buf;
// Background loop: received packet demuxing
// ----------------------------------------------------------------
+ void
+ usrp2::impl::start_bg()
+ {
+ d_rx_tg.create_thread(boost::bind(&usrp2::impl::bg_loop, this));
+ }
+
void
usrp2::impl::stop_bg()
{
d_bg_running = false;
- d_bg_pending_cond.signal();
-
- void *dummy_status;
- d_bg_thread->join(&dummy_status);
+ d_bg_pending_cond.notify_one(); // FIXME: check if needed
+ d_rx_tg.join_all();
}
void
usrp2::impl::bg_loop()
{
+ gruel::enable_realtime_scheduling();
+
d_bg_running = true;
while(d_bg_running) {
DEBUG_LOG(":");
// The channel ring thread that decrements d_num_enqueued to zero
// will signal this thread to continue.
{
- omni_mutex_lock l(d_enqueued_mutex);
+ gruel::scoped_lock l(d_enqueued_mutex);
while(d_num_enqueued > 0 && d_bg_running)
- d_bg_pending_cond.wait();
+ d_bg_pending_cond.wait(l);
}
}
d_bg_running = false;
unsigned int chan = u2p_chan(&pkt->hdrs.fixed);
{
- omni_mutex_lock l(d_channel_rings_mutex);
+ gruel::scoped_lock l(d_channel_rings_mutex);
if (!d_channel_rings[chan]) {
DEBUG_LOG("!");
}
{
- omni_mutex_lock l(d_channel_rings_mutex);
+ gruel::scoped_lock l(d_channel_rings_mutex);
if (d_channel_rings[channel]) {
std::cerr << "usrp2: channel " << channel
<< " already streaming" << std::endl;
}
{
- omni_mutex_lock l(d_channel_rings_mutex);
+ gruel::scoped_lock guard(d_channel_rings_mutex);
if (d_channel_rings[channel]) {
std::cerr << "usrp2: channel " << channel
<< " already streaming" << std::endl;
}
{
- omni_mutex_lock l(d_channel_rings_mutex);
+ gruel::scoped_lock guard(d_channel_rings_mutex);
if (d_channel_rings[channel]) {
std::cerr << "usrp2: channel " << channel
<< " already streaming" << std::endl;
op_generic_t reply;
{
- omni_mutex_lock l(d_channel_rings_mutex);
+ gruel::scoped_lock l(d_channel_rings_mutex);
memset(&cmd, 0, sizeof(cmd));
init_etf_hdrs(&cmd.h, d_addr, 0, CONTROL_CHAN, -1);