DATAPATH = "/var/lib/quantimotor/"
test_path = ""
-loggertask = 0
-pyrotask = 0
-
n = 0
pressure = 0
thrust = 0
VOLTAGES = ['voltage0', 'voltage1', 'voltage2', 'voltage3', 'voltage4', 'voltage5', 'voltage6', 'voltage7']
BUFFER_SIZE = 256
-class ContextBuilder:
- """Class for creating the requested context."""
-
- def __init__(self):
- self.ctx = None
-
- def create(self):
- try:
- self.ctx = iio.LocalContext()
-
- except FileNotFoundError:
- raise Exception("Unable to create IIO context!\n")
-
- return self.ctx
-
-class BufferBuilder:
- def __init__(self, ctx):
- self.ctx = ctx
- self.dev = None
-
- def _device(self):
- self.dev = self.ctx.find_device('ads8688')
- if self.dev is None:
- raise Exception("Device ads8688 not found!")
- return self
-
- def _channels(self):
- for channel in self.dev.channels:
- channel.enabled = True
- return self
-
- def create(self):
- self._device()
- self._channels()
- buffer = iio.Buffer(self.dev, BUFFER_SIZE)
- if buffer is None:
- raise Exception("Unable to create buffer!\n")
- return buffer
-
class DataReader(threading.Thread):
+ global buffer
+
def __init__(self, ctx):
threading.Thread.__init__(self)
self.shutdown_flag = threading.Event()
- buffer_builder = BufferBuilder(ctx)
- self.buffer = buffer_builder.create()
- self.device = buffer_builder.dev
-
def run(self):
global test_path
cherrypy.log('DataReader thread #%s started' % self.ident)
+ # set up IIO triggered buffer
+ buffer = iio.Buffer(ctrl, BUFFER_SIZE)
+ if buffer is None:
+ raise Exception("Unable to create buffer!\n")
+
# open file for data logging
file_path = test_path + "/rawdata"
with open(file_path, 'wb') as file:
# read data, writing to file
while not self.shutdown_flag.is_set():
- self.buffer.refill()
- samples = self.buffer.read()
+ buffer.refill()
+ samples = buffer.read()
file.write(bytes(samples))
+ buffer.cancel()
+ # if the cancel isn't enough...
+ # if 'buffer' in locals():
+ # iio.triggered_buffer_cleanup(buffer)
cherrypy.log('DataReader thread #%s stopped' % self.ident)
class Pyro(threading.Thread):
# end point for accepting and saving test metadata
@cherrypy.expose
@cherrypy.tools.json_in()
+ @cherrypy.tools.json_out()
def metadata(self):
if cherrypy.request.method == 'POST':
jsondata = cherrypy.request.json
with open(metadata_path, 'w') as json_file:
json.dump(jsondata, json_file, indent=2)
return "Data received and processed"
+ elif cherrypy.request.method == 'GET':
+ metadata_path = DATAPATH + "metadata.json"
+ with open(metadata_path, 'r') as json_file:
+ jsondata = json.load(json_file)
+ return jsondata
else:
return "Method not allowed", 405
if cherrypy.request.method == 'GET':
# create a directory name for this test
t = time.localtime()
- test_time = time.strftime("%Y:%m:%d_%H:%M:%S", t)
+ test_time = time.strftime("%Y.%m.%d_%H%M%S", t)
test_path = DATAPATH + test_time
# create a directory for this test
if not os.path.isdir(test_path):
def runatest():
global loggertask
global pyrotask
+ global ctx
# turn triggered buffering on
- #os.system("echo 1 > /sys/bus/iio/devices/iio\:device0/buffer0/enable")
+ os.system("echo 1 > /sys/bus/iio/devices/iio\:device0/buffer0/enable")
# create data logging thread
- context_builder = ContextBuilder()
- loggertask = DataReader(context_builder.create())
+ loggertask = DataReader(ctx)
# create pyro event thread
pyrotask = Pyro()
loggertask.shutdown_flag.set()
pyrotask.shutdown_flag.set()
- cherrypy.log("TestExit waiting for each thread to exit")
+ cherrypy.log("waiting for each thread to exit")
# wait for the thread to exit
loggertask.join()
pyrotask.join()
# turn triggered buffering off so low speed iio actions work while idle
- # os.system("echo 0 > /sys/bus/iio/devices/iio\:device0/buffer0/enable")
+ os.system("echo 0 > /sys/bus/iio/devices/iio\:device0/buffer0/enable")
status = "idle"
chan = ctrl.find_channel(id)
chan.attrs['scale'].value = SCALE
chan.attrs['offset'].value = OFFSET
+ chan.enabled = True
# subscribe our LED management function to the main loop
cherrypy.engine.subscribe('main', manageLEDs)