]> git.gag.com Git - fw/quantimotor/commitdiff
first apparently-working hard-coded command line program to run a test, logging
authorBdale Garbee <bdale@gag.com>
Fri, 18 Apr 2025 06:34:11 +0000 (00:34 -0600)
committerBdale Garbee <bdale@gag.com>
Fri, 18 Apr 2025 06:34:11 +0000 (00:34 -0600)
acquistion data to binary file 'testdata'

application/runatest.py [new file with mode: 0755]

diff --git a/application/runatest.py b/application/runatest.py
new file mode 100755 (executable)
index 0000000..d87d945
--- /dev/null
@@ -0,0 +1,349 @@
+#!/usr/bin/env python3
+"""
+Copyright (c) Bdale Garbee <bdale@gag.com>, released under GPLv3
+
+Portions taken from iio_readdev.py
+       Copyright (C) 2020 Analog Devices, Inc.
+       Author: Cristian Iacob <cristian.iacob@analog.com>
+
+"""
+
+import sys
+import argparse
+import gpiod
+import iio
+import signal
+import threading
+import time
+
+from gpiod.line import Direction, Value
+
+def set_line_values(chip_path, line_values):
+    value_str = {Value.ACTIVE: "Active", Value.INACTIVE: "Inactive"}
+
+    request = gpiod.request_lines(
+        chip_path,
+        consumer=sys.argv[0],
+        config={
+            tuple(line_values.keys()): gpiod.LineSettings(direction=Direction.OUTPUT)
+        },
+    )
+    request.set_values(line_values)
+
+class Arguments:
+    """Class for parsing the input arguments."""
+
+    def __init__(self):
+        """Arguments class constructor."""
+        self.parser = argparse.ArgumentParser(description="iio_readdev")
+        self._add_parser_arguments()
+        args = self.parser.parse_args()
+
+        self.network = str(args.network) if args.network else None
+        self.arg_uri = str(args.uri) if args.uri else None
+        self.scan_for_context = args.auto
+        self.buffer_size = int(args.buffer_size) if args.buffer_size else 256
+        self.num_samples = int(args.samples) if args.samples else 0
+        self.timeout = int(args.timeout) if args.timeout else 0
+        self.device_name = args.device[0]
+        self.channels = args.channel
+
+    def _add_parser_arguments(self):
+        self.parser.add_argument(
+            "-n",
+            "--network",
+            type=str,
+            metavar="",
+            help="Use the network backend with the provided hostname.",
+        )
+        self.parser.add_argument(
+            "-u",
+            "--uri",
+            type=str,
+            metavar="",
+            help="Use the context with the provided URI.",
+        )
+        self.parser.add_argument(
+            "-b",
+            "--buffer-size",
+            type=int,
+            metavar="",
+            help="Size of the capture buffer. Default is 256.",
+        )
+        self.parser.add_argument(
+            "-s",
+            "--samples",
+            type=int,
+            metavar="",
+            help="Number of samples to capture, 0 = infinite. Default is 0.",
+        )
+        self.parser.add_argument(
+            "-T",
+            "--timeout",
+            type=int,
+            metavar="",
+            help="Buffer timeout in milliseconds. 0 = no timeout",
+        )
+        self.parser.add_argument(
+            "-a",
+            "--auto",
+            action="store_true",
+            help="Scan for available contexts and if only one is available use it.",
+        )
+        self.parser.add_argument("device", type=str, nargs=1)
+        self.parser.add_argument("channel", type=str, nargs="*")
+
+
+class ContextBuilder:
+    """Class for creating the requested context."""
+
+    def __init__(self, arguments):
+        """
+        Class constructor.
+
+        Args:
+            arguments: type=Arguments
+                Contains the input arguments.
+        """
+        self.ctx = None
+        self.arguments = arguments
+
+    def _timeout(self):
+        if self.arguments.timeout >= 0:
+            self.ctx.timeout = self.arguments.timeout
+        return self
+
+    def _auto(self):
+        contexts = iio.scan_contexts()
+        if len(contexts) == 0:
+            raise Exception("No IIO context found.\n")
+        if len(contexts) == 1:
+            uri, _ = contexts.popitem()
+            self.ctx = iio.Context(_context=uri)
+        else:
+            print("Multiple contexts found. Please select one using --uri!")
+            for uri, _ in contexts.items():
+                print(uri)
+            sys.exit(0)
+
+        return self
+
+    def _uri(self):
+        self.ctx = iio.Context(_context=self.arguments.arg_uri)
+        return self
+
+    def _network(self):
+        self.ctx = iio.NetworkContext(self.arguments.network)
+        return self
+
+    def _default(self):
+        self.ctx = iio.Context()
+        return self
+
+    def create(self):
+        """Create the requested context."""
+        try:
+            if self.arguments.scan_for_context:
+                self._auto()
+            elif self.arguments.arg_uri:
+                self._uri()
+            else:
+                self._default()
+        except FileNotFoundError:
+            raise Exception("Unable to create IIO context!\n")
+
+        self._timeout()
+
+        return self.ctx
+
+
+class BufferBuilder:
+    """Class for creating the buffer."""
+
+    def __init__(self, ctx, arguments):
+        """
+        Class constructor.
+
+        Args:
+            ctx: type=iio.Context
+                This buffer's context.
+            arguments: type=Arguments
+                Contains the input arguments.
+        """
+        self.ctx = ctx
+        self.arguments = arguments
+        self.dev = None
+
+    def _device(self):
+        self.dev = self.ctx.find_device(self.arguments.device_name)
+
+        if self.dev is None:
+            raise Exception("Device %s not found!" % self.arguments.device_name)
+
+        return self
+
+    def _channels(self):
+        if len(self.arguments.channels) == 0:
+            for channel in self.dev.channels:
+                channel.enabled = True
+        else:
+            for channel_idx in self.arguments.channels:
+                self.dev.channels[int(channel_idx)].enabled = True
+
+        return self
+
+    def create(self):
+        """Create the IIO buffer."""
+        self._device()
+        self._channels()
+        buffer = iio.Buffer(self.dev, self.arguments.buffer_size)
+
+        if buffer is None:
+            raise Exception("Unable to create buffer!\n")
+
+        return buffer
+
+
+class DataReader(threading.Thread):
+    """Class for reading and logging sensor data.."""
+
+    def __init__(self, ctx, arguments):
+        threading.Thread.__init__(self)
+        self.shutdown_flag = threading.Event()
+
+        buffer_builder = BufferBuilder(ctx, arguments)
+        self.buffer = buffer_builder.create()
+        self.device = buffer_builder.dev
+        self.arguments = arguments
+
+    def run(self):
+        print('DataReader thread #%s started' % self.ident)
+
+        # open file for data logging
+        with open('testdata', 'wb') as file:
+
+            # read data, writing to file
+            while not self.shutdown_flag.is_set():
+                self.buffer.refill()
+                samples = self.buffer.read()
+    
+                if self.arguments.num_samples > 0:
+                    file.write(
+                        samples[: min(self.arguments.num_samples * self.device.sample_size, len(samples))]
+                    )
+                    self.arguments.num_samples -= min(
+                        self.arguments.num_samples, len(samples)
+                    )
+    
+                    if self.arguments.num_samples == 0:
+                        break
+                else:
+                    file.write(bytes(samples))
+
+        print('DataReader thread #%s stopped' % self.ident)
+
+class Pyro(threading.Thread):
+    """Class for managing pyro output."""
+
+    def __init__(self):
+        threading.Thread.__init__(self)
+        self.shutdown_flag = threading.Event()
+
+    def run(self):
+        print('Pyro thread #%s started' % self.ident)
+
+        # make sure logging has time to start first!
+        time.sleep(1)
+
+        # turn pyro output on
+        set_line_values(
+           "/dev/gpiochip0", 
+           {16: Value.ACTIVE,             # pyro on
+           }
+        )
+    
+        # leave pyro on for 3 seconds
+        time.sleep(3)
+    
+        # turn pyro output off
+        set_line_values(
+           "/dev/gpiochip0", 
+           {16: Value.INACTIVE,             # pyro off
+           }
+        )
+       
+        # keep thread alive until test is done 
+        while not self.shutdown_flag.is_set():
+            time.sleep(0.5)
+
+        print('Pyro thread #%s stopping' % self.ident)
+
+# create a custom exception to trigger clean exit with thread shutdown
+class ServiceExit(Exception):
+    pass
+
+def service_shutdown(signum, frame):
+    print('Caught signal %d' % signum)
+    raise ServiceExit
+
+def main():
+    # register signal handlers
+    signal.signal(signal.SIGTERM, service_shutdown)
+    signal.signal(signal.SIGINT, service_shutdown)
+
+    print('main() started, arming system, 5 seconds to burn!')
+
+    # set outputs to indicate armed
+    set_line_values(
+       "/dev/gpiochip0", 
+       {17: Value.ACTIVE,             # alarm b on
+        27: Value.ACTIVE              # alarm a on
+       }
+    )
+
+    # pause for 5 seconds
+    time.sleep(5) 
+
+    # kick off data logging and pyro threads
+    try:
+        # create data logging thread
+        arguments = Arguments()
+        context_builder = ContextBuilder(arguments)
+        logger = DataReader(context_builder.create(), arguments)
+    
+        # create pyro event thread
+        pyro = Pyro()
+
+        # start all threads
+        logger.start()
+        pyro.start()
+
+       # keep main thread running so we can capture signal
+        while True:
+            time.sleep(0.5)
+
+    except ServiceExit:
+        print("ServiceExit entered, setting shutdown_flag for each thread")
+        # stop logging data by telling the thread to exit
+        logger.shutdown_flag.set()
+        pyro.shutdown_flag.set()
+
+        print("ServiceExit waiting for each thread to exit")
+        # wait for the thread to exit
+        logger.join()
+        pyro.join()
+
+        print("ServiceExit turning off GPIO outputs")
+        # turn off pyro output and alarms
+        set_line_values(
+            "/dev/gpiochip0", 
+            {16: Value.INACTIVE,    # pyro off
+             17: Value.INACTIVE,    # alarm b off
+             27: Value.INACTIVE     # alarm a off
+            }
+        )
+
+    sys.exit(0)
+
+if __name__ == "__main__":
+    main()