--- /dev/null
+#!/usr/bin/env python3
+"""
+Copyright (c) Bdale Garbee <bdale@gag.com>, released under GPLv3
+
+Portions taken from iio_readdev.py
+ Copyright (C) 2020 Analog Devices, Inc.
+ Author: Cristian Iacob <cristian.iacob@analog.com>
+
+"""
+
+import sys
+import argparse
+import gpiod
+import iio
+import signal
+import threading
+import time
+
+from gpiod.line import Direction, Value
+
+def set_line_values(chip_path, line_values):
+ value_str = {Value.ACTIVE: "Active", Value.INACTIVE: "Inactive"}
+
+ request = gpiod.request_lines(
+ chip_path,
+ consumer=sys.argv[0],
+ config={
+ tuple(line_values.keys()): gpiod.LineSettings(direction=Direction.OUTPUT)
+ },
+ )
+ request.set_values(line_values)
+
+class Arguments:
+ """Class for parsing the input arguments."""
+
+ def __init__(self):
+ """Arguments class constructor."""
+ self.parser = argparse.ArgumentParser(description="iio_readdev")
+ self._add_parser_arguments()
+ args = self.parser.parse_args()
+
+ self.network = str(args.network) if args.network else None
+ self.arg_uri = str(args.uri) if args.uri else None
+ self.scan_for_context = args.auto
+ self.buffer_size = int(args.buffer_size) if args.buffer_size else 256
+ self.num_samples = int(args.samples) if args.samples else 0
+ self.timeout = int(args.timeout) if args.timeout else 0
+ self.device_name = args.device[0]
+ self.channels = args.channel
+
+ def _add_parser_arguments(self):
+ self.parser.add_argument(
+ "-n",
+ "--network",
+ type=str,
+ metavar="",
+ help="Use the network backend with the provided hostname.",
+ )
+ self.parser.add_argument(
+ "-u",
+ "--uri",
+ type=str,
+ metavar="",
+ help="Use the context with the provided URI.",
+ )
+ self.parser.add_argument(
+ "-b",
+ "--buffer-size",
+ type=int,
+ metavar="",
+ help="Size of the capture buffer. Default is 256.",
+ )
+ self.parser.add_argument(
+ "-s",
+ "--samples",
+ type=int,
+ metavar="",
+ help="Number of samples to capture, 0 = infinite. Default is 0.",
+ )
+ self.parser.add_argument(
+ "-T",
+ "--timeout",
+ type=int,
+ metavar="",
+ help="Buffer timeout in milliseconds. 0 = no timeout",
+ )
+ self.parser.add_argument(
+ "-a",
+ "--auto",
+ action="store_true",
+ help="Scan for available contexts and if only one is available use it.",
+ )
+ self.parser.add_argument("device", type=str, nargs=1)
+ self.parser.add_argument("channel", type=str, nargs="*")
+
+
+class ContextBuilder:
+ """Class for creating the requested context."""
+
+ def __init__(self, arguments):
+ """
+ Class constructor.
+
+ Args:
+ arguments: type=Arguments
+ Contains the input arguments.
+ """
+ self.ctx = None
+ self.arguments = arguments
+
+ def _timeout(self):
+ if self.arguments.timeout >= 0:
+ self.ctx.timeout = self.arguments.timeout
+ return self
+
+ def _auto(self):
+ contexts = iio.scan_contexts()
+ if len(contexts) == 0:
+ raise Exception("No IIO context found.\n")
+ if len(contexts) == 1:
+ uri, _ = contexts.popitem()
+ self.ctx = iio.Context(_context=uri)
+ else:
+ print("Multiple contexts found. Please select one using --uri!")
+ for uri, _ in contexts.items():
+ print(uri)
+ sys.exit(0)
+
+ return self
+
+ def _uri(self):
+ self.ctx = iio.Context(_context=self.arguments.arg_uri)
+ return self
+
+ def _network(self):
+ self.ctx = iio.NetworkContext(self.arguments.network)
+ return self
+
+ def _default(self):
+ self.ctx = iio.Context()
+ return self
+
+ def create(self):
+ """Create the requested context."""
+ try:
+ if self.arguments.scan_for_context:
+ self._auto()
+ elif self.arguments.arg_uri:
+ self._uri()
+ else:
+ self._default()
+ except FileNotFoundError:
+ raise Exception("Unable to create IIO context!\n")
+
+ self._timeout()
+
+ return self.ctx
+
+
+class BufferBuilder:
+ """Class for creating the buffer."""
+
+ def __init__(self, ctx, arguments):
+ """
+ Class constructor.
+
+ Args:
+ ctx: type=iio.Context
+ This buffer's context.
+ arguments: type=Arguments
+ Contains the input arguments.
+ """
+ self.ctx = ctx
+ self.arguments = arguments
+ self.dev = None
+
+ def _device(self):
+ self.dev = self.ctx.find_device(self.arguments.device_name)
+
+ if self.dev is None:
+ raise Exception("Device %s not found!" % self.arguments.device_name)
+
+ return self
+
+ def _channels(self):
+ if len(self.arguments.channels) == 0:
+ for channel in self.dev.channels:
+ channel.enabled = True
+ else:
+ for channel_idx in self.arguments.channels:
+ self.dev.channels[int(channel_idx)].enabled = True
+
+ return self
+
+ def create(self):
+ """Create the IIO buffer."""
+ self._device()
+ self._channels()
+ buffer = iio.Buffer(self.dev, self.arguments.buffer_size)
+
+ if buffer is None:
+ raise Exception("Unable to create buffer!\n")
+
+ return buffer
+
+
+class DataReader(threading.Thread):
+ """Class for reading and logging sensor data.."""
+
+ def __init__(self, ctx, arguments):
+ threading.Thread.__init__(self)
+ self.shutdown_flag = threading.Event()
+
+ buffer_builder = BufferBuilder(ctx, arguments)
+ self.buffer = buffer_builder.create()
+ self.device = buffer_builder.dev
+ self.arguments = arguments
+
+ def run(self):
+ print('DataReader thread #%s started' % self.ident)
+
+ # open file for data logging
+ with open('testdata', 'wb') as file:
+
+ # read data, writing to file
+ while not self.shutdown_flag.is_set():
+ self.buffer.refill()
+ samples = self.buffer.read()
+
+ if self.arguments.num_samples > 0:
+ file.write(
+ samples[: min(self.arguments.num_samples * self.device.sample_size, len(samples))]
+ )
+ self.arguments.num_samples -= min(
+ self.arguments.num_samples, len(samples)
+ )
+
+ if self.arguments.num_samples == 0:
+ break
+ else:
+ file.write(bytes(samples))
+
+ print('DataReader thread #%s stopped' % self.ident)
+
+class Pyro(threading.Thread):
+ """Class for managing pyro output."""
+
+ def __init__(self):
+ threading.Thread.__init__(self)
+ self.shutdown_flag = threading.Event()
+
+ def run(self):
+ print('Pyro thread #%s started' % self.ident)
+
+ # make sure logging has time to start first!
+ time.sleep(1)
+
+ # turn pyro output on
+ set_line_values(
+ "/dev/gpiochip0",
+ {16: Value.ACTIVE, # pyro on
+ }
+ )
+
+ # leave pyro on for 3 seconds
+ time.sleep(3)
+
+ # turn pyro output off
+ set_line_values(
+ "/dev/gpiochip0",
+ {16: Value.INACTIVE, # pyro off
+ }
+ )
+
+ # keep thread alive until test is done
+ while not self.shutdown_flag.is_set():
+ time.sleep(0.5)
+
+ print('Pyro thread #%s stopping' % self.ident)
+
+# create a custom exception to trigger clean exit with thread shutdown
+class ServiceExit(Exception):
+ pass
+
+def service_shutdown(signum, frame):
+ print('Caught signal %d' % signum)
+ raise ServiceExit
+
+def main():
+ # register signal handlers
+ signal.signal(signal.SIGTERM, service_shutdown)
+ signal.signal(signal.SIGINT, service_shutdown)
+
+ print('main() started, arming system, 5 seconds to burn!')
+
+ # set outputs to indicate armed
+ set_line_values(
+ "/dev/gpiochip0",
+ {17: Value.ACTIVE, # alarm b on
+ 27: Value.ACTIVE # alarm a on
+ }
+ )
+
+ # pause for 5 seconds
+ time.sleep(5)
+
+ # kick off data logging and pyro threads
+ try:
+ # create data logging thread
+ arguments = Arguments()
+ context_builder = ContextBuilder(arguments)
+ logger = DataReader(context_builder.create(), arguments)
+
+ # create pyro event thread
+ pyro = Pyro()
+
+ # start all threads
+ logger.start()
+ pyro.start()
+
+ # keep main thread running so we can capture signal
+ while True:
+ time.sleep(0.5)
+
+ except ServiceExit:
+ print("ServiceExit entered, setting shutdown_flag for each thread")
+ # stop logging data by telling the thread to exit
+ logger.shutdown_flag.set()
+ pyro.shutdown_flag.set()
+
+ print("ServiceExit waiting for each thread to exit")
+ # wait for the thread to exit
+ logger.join()
+ pyro.join()
+
+ print("ServiceExit turning off GPIO outputs")
+ # turn off pyro output and alarms
+ set_line_values(
+ "/dev/gpiochip0",
+ {16: Value.INACTIVE, # pyro off
+ 17: Value.INACTIVE, # alarm b off
+ 27: Value.INACTIVE # alarm a off
+ }
+ )
+
+ sys.exit(0)
+
+if __name__ == "__main__":
+ main()