+++ /dev/null
-#!/usr/bin/env python3
-# Copyright (C) 2025 Bdale Garbee <bdale@gag.com>. GPLv3+
-
-import os
-import cherrypy
-import plotly.express as px
-import gpiod
-import iio
-import signal
-import sys
-import time
-
-# configuration for each channel on ADS8688
-OFFSET = "0"
-SCALE = "0.078127104"
-VOLTAGES = ['voltage0', 'voltage1', 'voltage2', 'voltage3', 'voltage4', 'voltage5', 'voltage6', 'voltage7']
-
-file_path = os.getcwd()
-from gpiod.line import Direction, Value
-
-def set_line_values(chip_path, line_values):
- value_str = {Value.ACTIVE: "Active", Value.INACTIVE: "Inactive"}
-
- request = gpiod.request_lines(
- chip_path,
- consumer=sys.argv[0],
- config={
- tuple(line_values.keys()): gpiod.LineSettings(direction=Direction.OUTPUT)
- },
- )
- request.set_values(line_values)
-
-def handler(signum, frame):
- set_line_values(
- "/dev/gpiochip0",
- { 4: Value.INACTIVE, # indicate 'not ready' to LPC
- 16: Value.INACTIVE, # pyro off
- 17: Value.INACTIVE, # alarm b off
- 20: Value.INACTIVE, # turn continuity LED off
- 21: Value.INACTIVE, # turn armed LED off
- 27: Value.INACTIVE # alarm a off
- }
- )
- sys.exit(0)
-
-# having systemd use SIGINT to avoid CherryPy consuming the kill signal
-signal.signal(signal.SIGINT, handler)
-
-class Root(object):
- @cherrypy.expose
-
- # define what happens on default (index) page
- def index(self):
- cherrypy.log("index")
-
- # some sort of demo data
- df = px.data.gapminder().query("country=='Canada'")
-
- # constrain height to 600 to avoid having to scroll for other info?
- fig = px.line(df, x="year", y="lifeExp", \
- title='Life expectancy in Canada', height=600)
-
- # fig.to_html gives us the entire interactive graph as a python string!
- plotly_data = fig.to_html(full_html=False)
-
- # for now, just concatenate a header and a footer
- html_data = '<html> <body> <h1>QuantiMotor</h1>' + \
- plotly_data + \
- "<h2>Thats all folks!</h2></body> </html>"
-
- # give the complete html to the client
- return html_data
-
-if __name__ == '__main__':
- ctx = iio.LocalContext()
- ctrl = ctx.find_device('ads8688')
-
- # initialize hardware
- set_line_values(
- "/dev/gpiochip0",
- {25: Value.ACTIVE, # take ADS8688 out of reset
- 4: Value.ACTIVE, # indicate 'ready' to LPC
- 16: Value.INACTIVE, # pyro off
- 17: Value.INACTIVE, # alarm b off
- 20: Value.INACTIVE, # turn continuity LED off
- 21: Value.INACTIVE, # turn armed LED off
- 27: Value.INACTIVE # alarm a off
- }
- )
-
- # configure ADC channels
- for id in VOLTAGES:
- chan = ctrl.find_channel(id)
- # must set scale before offset, so offset 0 is valid!
- chan.attrs['scale'].value = SCALE
- chan.attrs['offset'].value = OFFSET
-
- config = {
- 'global': {
- 'server.socket_host': '0.0.0.0',
- 'server.socket_port': 8080,
- },
- }
-
- cherrypy.quickstart(Root(), '/', config)
-
+++ /dev/null
-#!/usr/bin/env python3
-"""
-Copyright (c) Bdale Garbee <bdale@gag.com>, released under GPLv3
-
-Portions taken from iio_readdev.py
- Copyright (C) 2020 Analog Devices, Inc.
- Author: Cristian Iacob <cristian.iacob@analog.com>
-
-"""
-
-import sys
-import argparse
-import gpiod
-import iio
-import signal
-import threading
-import time
-
-BUFFER_SIZE = 256
-
-from gpiod.line import Direction, Value
-
-def set_line_values(chip_path, line_values):
- value_str = {Value.ACTIVE: "Active", Value.INACTIVE: "Inactive"}
-
- request = gpiod.request_lines(
- chip_path,
- consumer=sys.argv[0],
- config={
- tuple(line_values.keys()): gpiod.LineSettings(direction=Direction.OUTPUT)
- },
- )
- request.set_values(line_values)
-
-class ContextBuilder:
- """Class for creating the requested context."""
-
- def __init__(self):
- self.ctx = None
-
- def create(self):
- try:
- self.ctx = iio.LocalContext()
-
- except FileNotFoundError:
- raise Exception("Unable to create IIO context!\n")
-
- return self.ctx
-
-class BufferBuilder:
- def __init__(self, ctx):
- """
- Class constructor.
-
- Args:
- ctx: type=iio.Context
- This buffer's context.
- """
- self.ctx = ctx
- self.dev = None
-
- def _device(self):
- self.dev = self.ctx.find_device('ads8688')
-
- if self.dev is None:
- raise Exception("Device ads8688 not found!")
-
- return self
-
- def _channels(self):
- for channel in self.dev.channels:
- channel.enabled = True
-
- return self
-
- def create(self):
- """Create the IIO buffer."""
- self._device()
- self._channels()
- buffer = iio.Buffer(self.dev, BUFFER_SIZE)
-
- if buffer is None:
- raise Exception("Unable to create buffer!\n")
-
- return buffer
-
-
-class DataReader(threading.Thread):
- """Class for reading and logging sensor data.."""
-
- def __init__(self, ctx):
- threading.Thread.__init__(self)
- self.shutdown_flag = threading.Event()
-
- buffer_builder = BufferBuilder(ctx)
- self.buffer = buffer_builder.create()
- self.device = buffer_builder.dev
-
- def run(self):
- print('DataReader thread #%s started' % self.ident)
-
- # open file for data logging
- with open('testdata', 'wb') as file:
-
- # read data, writing to file
- while not self.shutdown_flag.is_set():
- self.buffer.refill()
- samples = self.buffer.read()
- file.write(bytes(samples))
-
- print('DataReader thread #%s stopped' % self.ident)
-
-class Pyro(threading.Thread):
- """Class for managing pyro output."""
-
- def __init__(self):
- threading.Thread.__init__(self)
- self.shutdown_flag = threading.Event()
-
- def run(self):
- print('Pyro thread #%s started' % self.ident)
-
- # make sure logging has time to start first!
- time.sleep(1)
-
- # turn pyro output on
- set_line_values(
- "/dev/gpiochip0",
- {16: Value.ACTIVE, # pyro on
- }
- )
-
- # leave pyro on for 3 seconds
- time.sleep(3)
-
- # turn pyro output off
- set_line_values(
- "/dev/gpiochip0",
- {16: Value.INACTIVE, # pyro off
- }
- )
-
- # keep thread alive until test is done
- while not self.shutdown_flag.is_set():
- time.sleep(0.5)
-
- print('Pyro thread #%s stopping' % self.ident)
-
-# create a custom exception to trigger clean exit with thread shutdown
-class ServiceExit(Exception):
- pass
-
-def service_shutdown(signum, frame):
- print('Caught signal %d' % signum)
- raise ServiceExit
-
-def main():
- # register signal handlers
- signal.signal(signal.SIGTERM, service_shutdown)
- signal.signal(signal.SIGINT, service_shutdown)
-
- print('main() started, arming system, 5 seconds to burn!')
-
- # set outputs to indicate armed
- set_line_values(
- "/dev/gpiochip0",
- {17: Value.ACTIVE, # alarm b on
- 27: Value.ACTIVE # alarm a on
- }
- )
-
- # pause for 5 seconds
- time.sleep(5)
-
- # kick off data logging and pyro threads
- try:
- # create data logging thread
- context_builder = ContextBuilder()
- logger = DataReader(context_builder.create())
-
- # create pyro event thread
- pyro = Pyro()
-
- # start all threads
- logger.start()
- pyro.start()
-
- # keep main thread running so we can capture signal
- while True:
- time.sleep(0.5)
-
- except ServiceExit:
- print("ServiceExit entered, setting shutdown_flag for each thread")
- # stop logging data by telling the thread to exit
- logger.shutdown_flag.set()
- pyro.shutdown_flag.set()
-
- print("ServiceExit waiting for each thread to exit")
- # wait for the thread to exit
- logger.join()
- pyro.join()
-
- print("ServiceExit turning off GPIO outputs")
- # turn off pyro output and alarms
- set_line_values(
- "/dev/gpiochip0",
- {16: Value.INACTIVE, # pyro off
- 17: Value.INACTIVE, # alarm b off
- 27: Value.INACTIVE # alarm a off
- }
- )
-
- sys.exit(0)
-
-if __name__ == "__main__":
- main()