From 732148eca55aaa66723c65f4610232e5ca76db35 Mon Sep 17 00:00:00 2001 From: Bdale Garbee Date: Sat, 19 Apr 2025 03:10:53 +0000 Subject: [PATCH] remove a lot of the iio_readdev.py remnants we don't actually need --- application/runatest.py | 165 ++++------------------------------------ 1 file changed, 16 insertions(+), 149 deletions(-) diff --git a/application/runatest.py b/application/runatest.py index d87d945..c60f0b1 100755 --- a/application/runatest.py +++ b/application/runatest.py @@ -16,6 +16,8 @@ import signal import threading import time +BUFFER_SIZE = 256 + from gpiod.line import Direction, Value def set_line_values(chip_path, line_values): @@ -30,165 +32,44 @@ def set_line_values(chip_path, line_values): ) request.set_values(line_values) -class Arguments: - """Class for parsing the input arguments.""" - - def __init__(self): - """Arguments class constructor.""" - self.parser = argparse.ArgumentParser(description="iio_readdev") - self._add_parser_arguments() - args = self.parser.parse_args() - - self.network = str(args.network) if args.network else None - self.arg_uri = str(args.uri) if args.uri else None - self.scan_for_context = args.auto - self.buffer_size = int(args.buffer_size) if args.buffer_size else 256 - self.num_samples = int(args.samples) if args.samples else 0 - self.timeout = int(args.timeout) if args.timeout else 0 - self.device_name = args.device[0] - self.channels = args.channel - - def _add_parser_arguments(self): - self.parser.add_argument( - "-n", - "--network", - type=str, - metavar="", - help="Use the network backend with the provided hostname.", - ) - self.parser.add_argument( - "-u", - "--uri", - type=str, - metavar="", - help="Use the context with the provided URI.", - ) - self.parser.add_argument( - "-b", - "--buffer-size", - type=int, - metavar="", - help="Size of the capture buffer. Default is 256.", - ) - self.parser.add_argument( - "-s", - "--samples", - type=int, - metavar="", - help="Number of samples to capture, 0 = infinite. Default is 0.", - ) - self.parser.add_argument( - "-T", - "--timeout", - type=int, - metavar="", - help="Buffer timeout in milliseconds. 0 = no timeout", - ) - self.parser.add_argument( - "-a", - "--auto", - action="store_true", - help="Scan for available contexts and if only one is available use it.", - ) - self.parser.add_argument("device", type=str, nargs=1) - self.parser.add_argument("channel", type=str, nargs="*") - - class ContextBuilder: """Class for creating the requested context.""" - def __init__(self, arguments): - """ - Class constructor. - - Args: - arguments: type=Arguments - Contains the input arguments. - """ + def __init__(self): self.ctx = None - self.arguments = arguments - - def _timeout(self): - if self.arguments.timeout >= 0: - self.ctx.timeout = self.arguments.timeout - return self - - def _auto(self): - contexts = iio.scan_contexts() - if len(contexts) == 0: - raise Exception("No IIO context found.\n") - if len(contexts) == 1: - uri, _ = contexts.popitem() - self.ctx = iio.Context(_context=uri) - else: - print("Multiple contexts found. Please select one using --uri!") - for uri, _ in contexts.items(): - print(uri) - sys.exit(0) - - return self - - def _uri(self): - self.ctx = iio.Context(_context=self.arguments.arg_uri) - return self - - def _network(self): - self.ctx = iio.NetworkContext(self.arguments.network) - return self - - def _default(self): - self.ctx = iio.Context() - return self def create(self): - """Create the requested context.""" try: - if self.arguments.scan_for_context: - self._auto() - elif self.arguments.arg_uri: - self._uri() - else: - self._default() + self.ctx = iio.LocalContext() + except FileNotFoundError: raise Exception("Unable to create IIO context!\n") - self._timeout() - return self.ctx - class BufferBuilder: - """Class for creating the buffer.""" - - def __init__(self, ctx, arguments): + def __init__(self, ctx): """ Class constructor. Args: ctx: type=iio.Context This buffer's context. - arguments: type=Arguments - Contains the input arguments. """ self.ctx = ctx - self.arguments = arguments self.dev = None def _device(self): - self.dev = self.ctx.find_device(self.arguments.device_name) + self.dev = self.ctx.find_device('ads8688') if self.dev is None: - raise Exception("Device %s not found!" % self.arguments.device_name) + raise Exception("Device ads8688 not found!") return self def _channels(self): - if len(self.arguments.channels) == 0: - for channel in self.dev.channels: - channel.enabled = True - else: - for channel_idx in self.arguments.channels: - self.dev.channels[int(channel_idx)].enabled = True + for channel in self.dev.channels: + channel.enabled = True return self @@ -196,7 +77,7 @@ class BufferBuilder: """Create the IIO buffer.""" self._device() self._channels() - buffer = iio.Buffer(self.dev, self.arguments.buffer_size) + buffer = iio.Buffer(self.dev, BUFFER_SIZE) if buffer is None: raise Exception("Unable to create buffer!\n") @@ -207,14 +88,13 @@ class BufferBuilder: class DataReader(threading.Thread): """Class for reading and logging sensor data..""" - def __init__(self, ctx, arguments): + def __init__(self, ctx): threading.Thread.__init__(self) self.shutdown_flag = threading.Event() - buffer_builder = BufferBuilder(ctx, arguments) + buffer_builder = BufferBuilder(ctx) self.buffer = buffer_builder.create() self.device = buffer_builder.dev - self.arguments = arguments def run(self): print('DataReader thread #%s started' % self.ident) @@ -226,19 +106,7 @@ class DataReader(threading.Thread): while not self.shutdown_flag.is_set(): self.buffer.refill() samples = self.buffer.read() - - if self.arguments.num_samples > 0: - file.write( - samples[: min(self.arguments.num_samples * self.device.sample_size, len(samples))] - ) - self.arguments.num_samples -= min( - self.arguments.num_samples, len(samples) - ) - - if self.arguments.num_samples == 0: - break - else: - file.write(bytes(samples)) + file.write(bytes(samples)) print('DataReader thread #%s stopped' % self.ident) @@ -307,9 +175,8 @@ def main(): # kick off data logging and pyro threads try: # create data logging thread - arguments = Arguments() - context_builder = ContextBuilder(arguments) - logger = DataReader(context_builder.create(), arguments) + context_builder = ContextBuilder() + logger = DataReader(context_builder.create()) # create pyro event thread pyro = Pyro() -- 2.47.2