]> git.gag.com Git - fw/quantimotor/commitdiff
remove a lot of the iio_readdev.py remnants we don't actually need
authorBdale Garbee <bdale@rpi3-20241118.gag.com>
Sat, 19 Apr 2025 03:10:53 +0000 (03:10 +0000)
committerBdale Garbee <bdale@rpi3-20241118.gag.com>
Sat, 19 Apr 2025 03:10:53 +0000 (03:10 +0000)
application/runatest.py

index d87d94563dbb66e0dc135868512d15cae1847aac..c60f0b1c613626912c98e04c3122cb85ceae4e1b 100755 (executable)
@@ -16,6 +16,8 @@ import signal
 import threading
 import time
 
+BUFFER_SIZE = 256
+
 from gpiod.line import Direction, Value
 
 def set_line_values(chip_path, line_values):
@@ -30,165 +32,44 @@ def set_line_values(chip_path, line_values):
     )
     request.set_values(line_values)
 
-class Arguments:
-    """Class for parsing the input arguments."""
-
-    def __init__(self):
-        """Arguments class constructor."""
-        self.parser = argparse.ArgumentParser(description="iio_readdev")
-        self._add_parser_arguments()
-        args = self.parser.parse_args()
-
-        self.network = str(args.network) if args.network else None
-        self.arg_uri = str(args.uri) if args.uri else None
-        self.scan_for_context = args.auto
-        self.buffer_size = int(args.buffer_size) if args.buffer_size else 256
-        self.num_samples = int(args.samples) if args.samples else 0
-        self.timeout = int(args.timeout) if args.timeout else 0
-        self.device_name = args.device[0]
-        self.channels = args.channel
-
-    def _add_parser_arguments(self):
-        self.parser.add_argument(
-            "-n",
-            "--network",
-            type=str,
-            metavar="",
-            help="Use the network backend with the provided hostname.",
-        )
-        self.parser.add_argument(
-            "-u",
-            "--uri",
-            type=str,
-            metavar="",
-            help="Use the context with the provided URI.",
-        )
-        self.parser.add_argument(
-            "-b",
-            "--buffer-size",
-            type=int,
-            metavar="",
-            help="Size of the capture buffer. Default is 256.",
-        )
-        self.parser.add_argument(
-            "-s",
-            "--samples",
-            type=int,
-            metavar="",
-            help="Number of samples to capture, 0 = infinite. Default is 0.",
-        )
-        self.parser.add_argument(
-            "-T",
-            "--timeout",
-            type=int,
-            metavar="",
-            help="Buffer timeout in milliseconds. 0 = no timeout",
-        )
-        self.parser.add_argument(
-            "-a",
-            "--auto",
-            action="store_true",
-            help="Scan for available contexts and if only one is available use it.",
-        )
-        self.parser.add_argument("device", type=str, nargs=1)
-        self.parser.add_argument("channel", type=str, nargs="*")
-
-
 class ContextBuilder:
     """Class for creating the requested context."""
 
-    def __init__(self, arguments):
-        """
-        Class constructor.
-
-        Args:
-            arguments: type=Arguments
-                Contains the input arguments.
-        """
+    def __init__(self):
         self.ctx = None
-        self.arguments = arguments
-
-    def _timeout(self):
-        if self.arguments.timeout >= 0:
-            self.ctx.timeout = self.arguments.timeout
-        return self
-
-    def _auto(self):
-        contexts = iio.scan_contexts()
-        if len(contexts) == 0:
-            raise Exception("No IIO context found.\n")
-        if len(contexts) == 1:
-            uri, _ = contexts.popitem()
-            self.ctx = iio.Context(_context=uri)
-        else:
-            print("Multiple contexts found. Please select one using --uri!")
-            for uri, _ in contexts.items():
-                print(uri)
-            sys.exit(0)
-
-        return self
-
-    def _uri(self):
-        self.ctx = iio.Context(_context=self.arguments.arg_uri)
-        return self
-
-    def _network(self):
-        self.ctx = iio.NetworkContext(self.arguments.network)
-        return self
-
-    def _default(self):
-        self.ctx = iio.Context()
-        return self
 
     def create(self):
-        """Create the requested context."""
         try:
-            if self.arguments.scan_for_context:
-                self._auto()
-            elif self.arguments.arg_uri:
-                self._uri()
-            else:
-                self._default()
+            self.ctx = iio.LocalContext()
+
         except FileNotFoundError:
             raise Exception("Unable to create IIO context!\n")
 
-        self._timeout()
-
         return self.ctx
 
-
 class BufferBuilder:
-    """Class for creating the buffer."""
-
-    def __init__(self, ctx, arguments):
+    def __init__(self, ctx):
         """
         Class constructor.
 
         Args:
             ctx: type=iio.Context
                 This buffer's context.
-            arguments: type=Arguments
-                Contains the input arguments.
         """
         self.ctx = ctx
-        self.arguments = arguments
         self.dev = None
 
     def _device(self):
-        self.dev = self.ctx.find_device(self.arguments.device_name)
+        self.dev = self.ctx.find_device('ads8688')
 
         if self.dev is None:
-            raise Exception("Device %s not found!" % self.arguments.device_name)
+            raise Exception("Device ads8688 not found!")
 
         return self
 
     def _channels(self):
-        if len(self.arguments.channels) == 0:
-            for channel in self.dev.channels:
-                channel.enabled = True
-        else:
-            for channel_idx in self.arguments.channels:
-                self.dev.channels[int(channel_idx)].enabled = True
+        for channel in self.dev.channels:
+            channel.enabled = True
 
         return self
 
@@ -196,7 +77,7 @@ class BufferBuilder:
         """Create the IIO buffer."""
         self._device()
         self._channels()
-        buffer = iio.Buffer(self.dev, self.arguments.buffer_size)
+        buffer = iio.Buffer(self.dev, BUFFER_SIZE)
 
         if buffer is None:
             raise Exception("Unable to create buffer!\n")
@@ -207,14 +88,13 @@ class BufferBuilder:
 class DataReader(threading.Thread):
     """Class for reading and logging sensor data.."""
 
-    def __init__(self, ctx, arguments):
+    def __init__(self, ctx):
         threading.Thread.__init__(self)
         self.shutdown_flag = threading.Event()
 
-        buffer_builder = BufferBuilder(ctx, arguments)
+        buffer_builder = BufferBuilder(ctx)
         self.buffer = buffer_builder.create()
         self.device = buffer_builder.dev
-        self.arguments = arguments
 
     def run(self):
         print('DataReader thread #%s started' % self.ident)
@@ -226,19 +106,7 @@ class DataReader(threading.Thread):
             while not self.shutdown_flag.is_set():
                 self.buffer.refill()
                 samples = self.buffer.read()
-    
-                if self.arguments.num_samples > 0:
-                    file.write(
-                        samples[: min(self.arguments.num_samples * self.device.sample_size, len(samples))]
-                    )
-                    self.arguments.num_samples -= min(
-                        self.arguments.num_samples, len(samples)
-                    )
-    
-                    if self.arguments.num_samples == 0:
-                        break
-                else:
-                    file.write(bytes(samples))
+                file.write(bytes(samples))
 
         print('DataReader thread #%s stopped' % self.ident)
 
@@ -307,9 +175,8 @@ def main():
     # kick off data logging and pyro threads
     try:
         # create data logging thread
-        arguments = Arguments()
-        context_builder = ContextBuilder(arguments)
-        logger = DataReader(context_builder.create(), arguments)
+        context_builder = ContextBuilder()
+        logger = DataReader(context_builder.create())
     
         # create pyro event thread
         pyro = Pyro()