From e982c05c72c5c427e04c13f68d0864eacc5d257f Mon Sep 17 00:00:00 2001 From: Bdale Garbee Date: Fri, 18 Apr 2025 00:34:11 -0600 Subject: [PATCH] first apparently-working hard-coded command line program to run a test, logging acquistion data to binary file 'testdata' --- application/runatest.py | 349 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 349 insertions(+) create mode 100755 application/runatest.py diff --git a/application/runatest.py b/application/runatest.py new file mode 100755 index 0000000..d87d945 --- /dev/null +++ b/application/runatest.py @@ -0,0 +1,349 @@ +#!/usr/bin/env python3 +""" +Copyright (c) Bdale Garbee , released under GPLv3 + +Portions taken from iio_readdev.py + Copyright (C) 2020 Analog Devices, Inc. + Author: Cristian Iacob + +""" + +import sys +import argparse +import gpiod +import iio +import signal +import threading +import time + +from gpiod.line import Direction, Value + +def set_line_values(chip_path, line_values): + value_str = {Value.ACTIVE: "Active", Value.INACTIVE: "Inactive"} + + request = gpiod.request_lines( + chip_path, + consumer=sys.argv[0], + config={ + tuple(line_values.keys()): gpiod.LineSettings(direction=Direction.OUTPUT) + }, + ) + request.set_values(line_values) + +class Arguments: + """Class for parsing the input arguments.""" + + def __init__(self): + """Arguments class constructor.""" + self.parser = argparse.ArgumentParser(description="iio_readdev") + self._add_parser_arguments() + args = self.parser.parse_args() + + self.network = str(args.network) if args.network else None + self.arg_uri = str(args.uri) if args.uri else None + self.scan_for_context = args.auto + self.buffer_size = int(args.buffer_size) if args.buffer_size else 256 + self.num_samples = int(args.samples) if args.samples else 0 + self.timeout = int(args.timeout) if args.timeout else 0 + self.device_name = args.device[0] + self.channels = args.channel + + def _add_parser_arguments(self): + self.parser.add_argument( + "-n", + "--network", + type=str, + metavar="", + help="Use the network backend with the provided hostname.", + ) + self.parser.add_argument( + "-u", + "--uri", + type=str, + metavar="", + help="Use the context with the provided URI.", + ) + self.parser.add_argument( + "-b", + "--buffer-size", + type=int, + metavar="", + help="Size of the capture buffer. Default is 256.", + ) + self.parser.add_argument( + "-s", + "--samples", + type=int, + metavar="", + help="Number of samples to capture, 0 = infinite. Default is 0.", + ) + self.parser.add_argument( + "-T", + "--timeout", + type=int, + metavar="", + help="Buffer timeout in milliseconds. 0 = no timeout", + ) + self.parser.add_argument( + "-a", + "--auto", + action="store_true", + help="Scan for available contexts and if only one is available use it.", + ) + self.parser.add_argument("device", type=str, nargs=1) + self.parser.add_argument("channel", type=str, nargs="*") + + +class ContextBuilder: + """Class for creating the requested context.""" + + def __init__(self, arguments): + """ + Class constructor. + + Args: + arguments: type=Arguments + Contains the input arguments. + """ + self.ctx = None + self.arguments = arguments + + def _timeout(self): + if self.arguments.timeout >= 0: + self.ctx.timeout = self.arguments.timeout + return self + + def _auto(self): + contexts = iio.scan_contexts() + if len(contexts) == 0: + raise Exception("No IIO context found.\n") + if len(contexts) == 1: + uri, _ = contexts.popitem() + self.ctx = iio.Context(_context=uri) + else: + print("Multiple contexts found. Please select one using --uri!") + for uri, _ in contexts.items(): + print(uri) + sys.exit(0) + + return self + + def _uri(self): + self.ctx = iio.Context(_context=self.arguments.arg_uri) + return self + + def _network(self): + self.ctx = iio.NetworkContext(self.arguments.network) + return self + + def _default(self): + self.ctx = iio.Context() + return self + + def create(self): + """Create the requested context.""" + try: + if self.arguments.scan_for_context: + self._auto() + elif self.arguments.arg_uri: + self._uri() + else: + self._default() + except FileNotFoundError: + raise Exception("Unable to create IIO context!\n") + + self._timeout() + + return self.ctx + + +class BufferBuilder: + """Class for creating the buffer.""" + + def __init__(self, ctx, arguments): + """ + Class constructor. + + Args: + ctx: type=iio.Context + This buffer's context. + arguments: type=Arguments + Contains the input arguments. + """ + self.ctx = ctx + self.arguments = arguments + self.dev = None + + def _device(self): + self.dev = self.ctx.find_device(self.arguments.device_name) + + if self.dev is None: + raise Exception("Device %s not found!" % self.arguments.device_name) + + return self + + def _channels(self): + if len(self.arguments.channels) == 0: + for channel in self.dev.channels: + channel.enabled = True + else: + for channel_idx in self.arguments.channels: + self.dev.channels[int(channel_idx)].enabled = True + + return self + + def create(self): + """Create the IIO buffer.""" + self._device() + self._channels() + buffer = iio.Buffer(self.dev, self.arguments.buffer_size) + + if buffer is None: + raise Exception("Unable to create buffer!\n") + + return buffer + + +class DataReader(threading.Thread): + """Class for reading and logging sensor data..""" + + def __init__(self, ctx, arguments): + threading.Thread.__init__(self) + self.shutdown_flag = threading.Event() + + buffer_builder = BufferBuilder(ctx, arguments) + self.buffer = buffer_builder.create() + self.device = buffer_builder.dev + self.arguments = arguments + + def run(self): + print('DataReader thread #%s started' % self.ident) + + # open file for data logging + with open('testdata', 'wb') as file: + + # read data, writing to file + while not self.shutdown_flag.is_set(): + self.buffer.refill() + samples = self.buffer.read() + + if self.arguments.num_samples > 0: + file.write( + samples[: min(self.arguments.num_samples * self.device.sample_size, len(samples))] + ) + self.arguments.num_samples -= min( + self.arguments.num_samples, len(samples) + ) + + if self.arguments.num_samples == 0: + break + else: + file.write(bytes(samples)) + + print('DataReader thread #%s stopped' % self.ident) + +class Pyro(threading.Thread): + """Class for managing pyro output.""" + + def __init__(self): + threading.Thread.__init__(self) + self.shutdown_flag = threading.Event() + + def run(self): + print('Pyro thread #%s started' % self.ident) + + # make sure logging has time to start first! + time.sleep(1) + + # turn pyro output on + set_line_values( + "/dev/gpiochip0", + {16: Value.ACTIVE, # pyro on + } + ) + + # leave pyro on for 3 seconds + time.sleep(3) + + # turn pyro output off + set_line_values( + "/dev/gpiochip0", + {16: Value.INACTIVE, # pyro off + } + ) + + # keep thread alive until test is done + while not self.shutdown_flag.is_set(): + time.sleep(0.5) + + print('Pyro thread #%s stopping' % self.ident) + +# create a custom exception to trigger clean exit with thread shutdown +class ServiceExit(Exception): + pass + +def service_shutdown(signum, frame): + print('Caught signal %d' % signum) + raise ServiceExit + +def main(): + # register signal handlers + signal.signal(signal.SIGTERM, service_shutdown) + signal.signal(signal.SIGINT, service_shutdown) + + print('main() started, arming system, 5 seconds to burn!') + + # set outputs to indicate armed + set_line_values( + "/dev/gpiochip0", + {17: Value.ACTIVE, # alarm b on + 27: Value.ACTIVE # alarm a on + } + ) + + # pause for 5 seconds + time.sleep(5) + + # kick off data logging and pyro threads + try: + # create data logging thread + arguments = Arguments() + context_builder = ContextBuilder(arguments) + logger = DataReader(context_builder.create(), arguments) + + # create pyro event thread + pyro = Pyro() + + # start all threads + logger.start() + pyro.start() + + # keep main thread running so we can capture signal + while True: + time.sleep(0.5) + + except ServiceExit: + print("ServiceExit entered, setting shutdown_flag for each thread") + # stop logging data by telling the thread to exit + logger.shutdown_flag.set() + pyro.shutdown_flag.set() + + print("ServiceExit waiting for each thread to exit") + # wait for the thread to exit + logger.join() + pyro.join() + + print("ServiceExit turning off GPIO outputs") + # turn off pyro output and alarms + set_line_values( + "/dev/gpiochip0", + {16: Value.INACTIVE, # pyro off + 17: Value.INACTIVE, # alarm b off + 27: Value.INACTIVE # alarm a off + } + ) + + sys.exit(0) + +if __name__ == "__main__": + main() -- 2.47.2