# Copyright (C) 2025 Bdale Garbee <bdale@gag.com>. GPLv3+
#
-import os
import cherrypy
from cherrypy.lib.static import serve_file
import gpiod
from gpiod.line import Direction, Value
import iio
import json
+import os
import shutil
import signal
import sys
+import threading
import time
# where we'll store all our test data
DATAPATH = "/var/lib/quantimotor/"
+test_path = ""
+
+loggertask = 0
+pyrotask = 0
n = 0
pressure = 0
OFFSET = "0"
SCALE = "0.078127104"
VOLTAGES = ['voltage0', 'voltage1', 'voltage2', 'voltage3', 'voltage4', 'voltage5', 'voltage6', 'voltage7']
+BUFFER_SIZE = 256
+
+class ContextBuilder:
+ """Class for creating the requested context."""
+
+ def __init__(self):
+ self.ctx = None
+
+ def create(self):
+ try:
+ self.ctx = iio.LocalContext()
+
+ except FileNotFoundError:
+ raise Exception("Unable to create IIO context!\n")
+
+ return self.ctx
+
+class BufferBuilder:
+ def __init__(self, ctx):
+ self.ctx = ctx
+ self.dev = None
+
+ def _device(self):
+ self.dev = self.ctx.find_device('ads8688')
+ if self.dev is None:
+ raise Exception("Device ads8688 not found!")
+ return self
+
+ def _channels(self):
+ for channel in self.dev.channels:
+ channel.enabled = True
+ return self
+
+ def create(self):
+ self._device()
+ self._channels()
+ buffer = iio.Buffer(self.dev, BUFFER_SIZE)
+ if buffer is None:
+ raise Exception("Unable to create buffer!\n")
+ return buffer
+
+class DataReader(threading.Thread):
+ def __init__(self, ctx):
+ threading.Thread.__init__(self)
+ self.shutdown_flag = threading.Event()
+
+ buffer_builder = BufferBuilder(ctx)
+ self.buffer = buffer_builder.create()
+ self.device = buffer_builder.dev
+
+ def run(self):
+ global test_path
+ cherrypy.log('DataReader thread #%s started' % self.ident)
+
+ # open file for data logging
+ file_path = test_path + "/rawdata"
+ with open(file_path, 'wb') as file:
+
+ # read data, writing to file
+ while not self.shutdown_flag.is_set():
+ self.buffer.refill()
+ samples = self.buffer.read()
+ file.write(bytes(samples))
+
+ cherrypy.log('DataReader thread #%s stopped' % self.ident)
+
+class Pyro(threading.Thread):
+ def __init__(self):
+ threading.Thread.__init__(self)
+ self.shutdown_flag = threading.Event()
+
+ def run(self):
+ cherrypy.log('Pyro thread #%s started' % self.ident)
+
+ # make sure logging has time to start first!
+ time.sleep(1)
+
+ # turn pyro output on
+ set_line_values("/dev/gpiochip0", {16: Value.ACTIVE})
+
+ # leave pyro on for 3 seconds
+ time.sleep(3)
+
+ # turn pyro output off
+ set_line_values("/dev/gpiochip0", {16: Value.INACTIVE})
+
+ # keep thread alive until test is done
+ while not self.shutdown_flag.is_set():
+ time.sleep(0.5)
+
+ cherrypy.log('Pyro thread #%s stopping' % self.ident)
# set gpio output lines
def set_line_values(chip_path, line_values):
# end point for starting a test
@cherrypy.expose
def starttest(self):
+ global test_path
if cherrypy.request.method == 'GET':
# create a directory name for this test
t = time.localtime()
shutil.copy(metadata_path, test_path)
status = "testing"
cherrypy.log("status " + status)
+ runatest()
return "Test " + test_path + " started"
else:
return "Method not allowed", 405
if cherrypy.request.method == 'GET':
status = "idle"
cherrypy.log("status " + status)
+ endatest()
return "Test concluded"
else:
return "Method not allowed", 405
'armed' : armed
}
+def runatest():
+ global loggertask
+ global pyrotask
+
+ # turn triggered buffering on
+ #os.system("echo 1 > /sys/bus/iio/devices/iio\:device0/buffer0/enable")
+
+ # create data logging thread
+ context_builder = ContextBuilder()
+ loggertask = DataReader(context_builder.create())
+
+ # create pyro event thread
+ pyrotask = Pyro()
+
+ # start all threads
+ loggertask.start()
+ pyrotask.start()
+
+def endatest():
+ global loggertask
+ global pyrotask
+
+ cherrypy.log("setting shutdown_flag for each thread")
+ # stop logging data by telling the thread to exit
+ loggertask.shutdown_flag.set()
+ pyrotask.shutdown_flag.set()
+
+ cherrypy.log("TestExit waiting for each thread to exit")
+ # wait for the thread to exit
+ loggertask.join()
+ pyrotask.join()
+
+ # turn triggered buffering off so low speed iio actions work while idle
+ # os.system("echo 0 > /sys/bus/iio/devices/iio\:device0/buffer0/enable")
+ status = "idle"
+
if __name__ == '__main__':
# initialize hardware