From: Bdale Garbee Date: Tue, 27 May 2025 00:17:23 +0000 (-0600) Subject: working enough to run one test and get raw data X-Git-Url: https://git.gag.com/?a=commitdiff_plain;h=02ff1c889e2a136e357684ad18e033b46755fd37;p=fw%2Fquantimotor working enough to run one test and get raw data --- diff --git a/ui/app.py b/ui/app.py index 2fc5752..ef66908 100755 --- a/ui/app.py +++ b/ui/app.py @@ -4,20 +4,25 @@ # Copyright (C) 2025 Bdale Garbee . GPLv3+ # -import os import cherrypy from cherrypy.lib.static import serve_file import gpiod from gpiod.line import Direction, Value import iio import json +import os import shutil import signal import sys +import threading import time # where we'll store all our test data DATAPATH = "/var/lib/quantimotor/" +test_path = "" + +loggertask = 0 +pyrotask = 0 n = 0 pressure = 0 @@ -38,6 +43,97 @@ ctrl = ctx.find_device('ads8688') OFFSET = "0" SCALE = "0.078127104" VOLTAGES = ['voltage0', 'voltage1', 'voltage2', 'voltage3', 'voltage4', 'voltage5', 'voltage6', 'voltage7'] +BUFFER_SIZE = 256 + +class ContextBuilder: + """Class for creating the requested context.""" + + def __init__(self): + self.ctx = None + + def create(self): + try: + self.ctx = iio.LocalContext() + + except FileNotFoundError: + raise Exception("Unable to create IIO context!\n") + + return self.ctx + +class BufferBuilder: + def __init__(self, ctx): + self.ctx = ctx + self.dev = None + + def _device(self): + self.dev = self.ctx.find_device('ads8688') + if self.dev is None: + raise Exception("Device ads8688 not found!") + return self + + def _channels(self): + for channel in self.dev.channels: + channel.enabled = True + return self + + def create(self): + self._device() + self._channels() + buffer = iio.Buffer(self.dev, BUFFER_SIZE) + if buffer is None: + raise Exception("Unable to create buffer!\n") + return buffer + +class DataReader(threading.Thread): + def __init__(self, ctx): + threading.Thread.__init__(self) + self.shutdown_flag = threading.Event() + + buffer_builder = BufferBuilder(ctx) + self.buffer = buffer_builder.create() + self.device = buffer_builder.dev + + def run(self): + global test_path + cherrypy.log('DataReader thread #%s started' % self.ident) + + # open file for data logging + file_path = test_path + "/rawdata" + with open(file_path, 'wb') as file: + + # read data, writing to file + while not self.shutdown_flag.is_set(): + self.buffer.refill() + samples = self.buffer.read() + file.write(bytes(samples)) + + cherrypy.log('DataReader thread #%s stopped' % self.ident) + +class Pyro(threading.Thread): + def __init__(self): + threading.Thread.__init__(self) + self.shutdown_flag = threading.Event() + + def run(self): + cherrypy.log('Pyro thread #%s started' % self.ident) + + # make sure logging has time to start first! + time.sleep(1) + + # turn pyro output on + set_line_values("/dev/gpiochip0", {16: Value.ACTIVE}) + + # leave pyro on for 3 seconds + time.sleep(3) + + # turn pyro output off + set_line_values("/dev/gpiochip0", {16: Value.INACTIVE}) + + # keep thread alive until test is done + while not self.shutdown_flag.is_set(): + time.sleep(0.5) + + cherrypy.log('Pyro thread #%s stopping' % self.ident) # set gpio output lines def set_line_values(chip_path, line_values): @@ -169,6 +265,7 @@ class App: # end point for starting a test @cherrypy.expose def starttest(self): + global test_path if cherrypy.request.method == 'GET': # create a directory name for this test t = time.localtime() @@ -182,6 +279,7 @@ class App: shutil.copy(metadata_path, test_path) status = "testing" cherrypy.log("status " + status) + runatest() return "Test " + test_path + " started" else: return "Method not allowed", 405 @@ -192,6 +290,7 @@ class App: if cherrypy.request.method == 'GET': status = "idle" cherrypy.log("status " + status) + endatest() return "Test concluded" else: return "Method not allowed", 405 @@ -237,6 +336,42 @@ class App: 'armed' : armed } +def runatest(): + global loggertask + global pyrotask + + # turn triggered buffering on + #os.system("echo 1 > /sys/bus/iio/devices/iio\:device0/buffer0/enable") + + # create data logging thread + context_builder = ContextBuilder() + loggertask = DataReader(context_builder.create()) + + # create pyro event thread + pyrotask = Pyro() + + # start all threads + loggertask.start() + pyrotask.start() + +def endatest(): + global loggertask + global pyrotask + + cherrypy.log("setting shutdown_flag for each thread") + # stop logging data by telling the thread to exit + loggertask.shutdown_flag.set() + pyrotask.shutdown_flag.set() + + cherrypy.log("TestExit waiting for each thread to exit") + # wait for the thread to exit + loggertask.join() + pyrotask.join() + + # turn triggered buffering off so low speed iio actions work while idle + # os.system("echo 0 > /sys/bus/iio/devices/iio\:device0/buffer0/enable") + status = "idle" + if __name__ == '__main__': # initialize hardware