]> git.gag.com Git - fw/quantimotor/commitdiff
working enough to run one test and get raw data
authorBdale Garbee <bdale@gag.com>
Tue, 27 May 2025 00:17:23 +0000 (18:17 -0600)
committerBdale Garbee <bdale@gag.com>
Tue, 27 May 2025 00:17:23 +0000 (18:17 -0600)
ui/app.py

index 2fc57521cdbf0c23c0c2baf85c0c64b689031f1d..ef66908b78392e8a7c1b2c1919acb376754d553f 100755 (executable)
--- a/ui/app.py
+++ b/ui/app.py
@@ -4,20 +4,25 @@
 # Copyright (C) 2025 Bdale Garbee <bdale@gag.com>.  GPLv3+
 #
 
-import os
 import cherrypy
 from cherrypy.lib.static import serve_file
 import gpiod
 from gpiod.line import Direction, Value
 import iio
 import json
+import os
 import shutil
 import signal
 import sys
+import threading
 import time
 
 # where we'll store all our test data
 DATAPATH = "/var/lib/quantimotor/"
+test_path = ""
+
+loggertask = 0
+pyrotask = 0
 
 n = 0
 pressure = 0
@@ -38,6 +43,97 @@ ctrl = ctx.find_device('ads8688')
 OFFSET = "0"
 SCALE = "0.078127104"
 VOLTAGES = ['voltage0', 'voltage1', 'voltage2', 'voltage3', 'voltage4', 'voltage5', 'voltage6', 'voltage7']
+BUFFER_SIZE = 256
+
+class ContextBuilder:
+    """Class for creating the requested context."""
+
+    def __init__(self):
+        self.ctx = None
+
+    def create(self):
+        try:
+            self.ctx = iio.LocalContext()
+
+        except FileNotFoundError:
+            raise Exception("Unable to create IIO context!\n")
+
+        return self.ctx
+
+class BufferBuilder:
+    def __init__(self, ctx):
+        self.ctx = ctx
+        self.dev = None
+
+    def _device(self):
+        self.dev = self.ctx.find_device('ads8688')
+        if self.dev is None:
+            raise Exception("Device ads8688 not found!")
+        return self
+
+    def _channels(self):
+        for channel in self.dev.channels:
+            channel.enabled = True
+        return self
+
+    def create(self):
+        self._device()
+        self._channels()
+        buffer = iio.Buffer(self.dev, BUFFER_SIZE)
+        if buffer is None:
+            raise Exception("Unable to create buffer!\n")
+        return buffer
+
+class DataReader(threading.Thread):
+    def __init__(self, ctx):
+        threading.Thread.__init__(self)
+        self.shutdown_flag = threading.Event()
+
+        buffer_builder = BufferBuilder(ctx)
+        self.buffer = buffer_builder.create()
+        self.device = buffer_builder.dev
+
+    def run(self):
+        global test_path
+        cherrypy.log('DataReader thread #%s started' % self.ident)
+
+        # open file for data logging
+        file_path = test_path + "/rawdata"
+        with open(file_path, 'wb') as file:
+
+            # read data, writing to file
+            while not self.shutdown_flag.is_set():
+                self.buffer.refill()
+                samples = self.buffer.read()
+                file.write(bytes(samples))
+
+        cherrypy.log('DataReader thread #%s stopped' % self.ident)
+
+class Pyro(threading.Thread):
+    def __init__(self):
+        threading.Thread.__init__(self)
+        self.shutdown_flag = threading.Event()
+
+    def run(self):
+        cherrypy.log('Pyro thread #%s started' % self.ident)
+
+        # make sure logging has time to start first!
+        time.sleep(1)
+
+        # turn pyro output on
+        set_line_values("/dev/gpiochip0", {16: Value.ACTIVE})
+    
+        # leave pyro on for 3 seconds
+        time.sleep(3)
+    
+        # turn pyro output off
+        set_line_values("/dev/gpiochip0", {16: Value.INACTIVE})
+       
+        # keep thread alive until test is done 
+        while not self.shutdown_flag.is_set():
+            time.sleep(0.5)
+
+        cherrypy.log('Pyro thread #%s stopping' % self.ident)
 
 # set gpio output lines
 def set_line_values(chip_path, line_values):
@@ -169,6 +265,7 @@ class App:
   # end point for starting a test
   @cherrypy.expose
   def starttest(self):
+    global test_path
     if cherrypy.request.method == 'GET':
       # create a directory name for this test
       t = time.localtime()
@@ -182,6 +279,7 @@ class App:
       shutil.copy(metadata_path, test_path)
       status = "testing"
       cherrypy.log("status " + status)
+      runatest()
       return "Test " + test_path + " started"
     else:
       return "Method not allowed", 405
@@ -192,6 +290,7 @@ class App:
     if cherrypy.request.method == 'GET':
       status = "idle"
       cherrypy.log("status " + status)
+      endatest()
       return "Test concluded"
     else:
       return "Method not allowed", 405
@@ -237,6 +336,42 @@ class App:
       'armed' : armed
     }
 
+def runatest():
+  global loggertask
+  global pyrotask
+
+  # turn triggered buffering on
+  #os.system("echo 1 > /sys/bus/iio/devices/iio\:device0/buffer0/enable")
+
+  # create data logging thread
+  context_builder = ContextBuilder()
+  loggertask = DataReader(context_builder.create())
+    
+  # create pyro event thread
+  pyrotask = Pyro()
+
+  # start all threads
+  loggertask.start()
+  pyrotask.start()
+
+def endatest():
+  global loggertask
+  global pyrotask
+
+  cherrypy.log("setting shutdown_flag for each thread")
+  # stop logging data by telling the thread to exit
+  loggertask.shutdown_flag.set()
+  pyrotask.shutdown_flag.set()
+
+  cherrypy.log("TestExit waiting for each thread to exit")
+  # wait for the thread to exit
+  loggertask.join()
+  pyrotask.join()
+
+  # turn triggered buffering off so low speed iio actions work while idle
+  # os.system("echo 0 > /sys/bus/iio/devices/iio\:device0/buffer0/enable")
+  status = "idle"
+
 
 if __name__ == '__main__':
   # initialize hardware