import threading
import time
+BUFFER_SIZE = 256
+
from gpiod.line import Direction, Value
def set_line_values(chip_path, line_values):
)
request.set_values(line_values)
-class Arguments:
- """Class for parsing the input arguments."""
-
- def __init__(self):
- """Arguments class constructor."""
- self.parser = argparse.ArgumentParser(description="iio_readdev")
- self._add_parser_arguments()
- args = self.parser.parse_args()
-
- self.network = str(args.network) if args.network else None
- self.arg_uri = str(args.uri) if args.uri else None
- self.scan_for_context = args.auto
- self.buffer_size = int(args.buffer_size) if args.buffer_size else 256
- self.num_samples = int(args.samples) if args.samples else 0
- self.timeout = int(args.timeout) if args.timeout else 0
- self.device_name = args.device[0]
- self.channels = args.channel
-
- def _add_parser_arguments(self):
- self.parser.add_argument(
- "-n",
- "--network",
- type=str,
- metavar="",
- help="Use the network backend with the provided hostname.",
- )
- self.parser.add_argument(
- "-u",
- "--uri",
- type=str,
- metavar="",
- help="Use the context with the provided URI.",
- )
- self.parser.add_argument(
- "-b",
- "--buffer-size",
- type=int,
- metavar="",
- help="Size of the capture buffer. Default is 256.",
- )
- self.parser.add_argument(
- "-s",
- "--samples",
- type=int,
- metavar="",
- help="Number of samples to capture, 0 = infinite. Default is 0.",
- )
- self.parser.add_argument(
- "-T",
- "--timeout",
- type=int,
- metavar="",
- help="Buffer timeout in milliseconds. 0 = no timeout",
- )
- self.parser.add_argument(
- "-a",
- "--auto",
- action="store_true",
- help="Scan for available contexts and if only one is available use it.",
- )
- self.parser.add_argument("device", type=str, nargs=1)
- self.parser.add_argument("channel", type=str, nargs="*")
-
-
class ContextBuilder:
"""Class for creating the requested context."""
- def __init__(self, arguments):
- """
- Class constructor.
-
- Args:
- arguments: type=Arguments
- Contains the input arguments.
- """
+ def __init__(self):
self.ctx = None
- self.arguments = arguments
-
- def _timeout(self):
- if self.arguments.timeout >= 0:
- self.ctx.timeout = self.arguments.timeout
- return self
-
- def _auto(self):
- contexts = iio.scan_contexts()
- if len(contexts) == 0:
- raise Exception("No IIO context found.\n")
- if len(contexts) == 1:
- uri, _ = contexts.popitem()
- self.ctx = iio.Context(_context=uri)
- else:
- print("Multiple contexts found. Please select one using --uri!")
- for uri, _ in contexts.items():
- print(uri)
- sys.exit(0)
-
- return self
-
- def _uri(self):
- self.ctx = iio.Context(_context=self.arguments.arg_uri)
- return self
-
- def _network(self):
- self.ctx = iio.NetworkContext(self.arguments.network)
- return self
-
- def _default(self):
- self.ctx = iio.Context()
- return self
def create(self):
- """Create the requested context."""
try:
- if self.arguments.scan_for_context:
- self._auto()
- elif self.arguments.arg_uri:
- self._uri()
- else:
- self._default()
+ self.ctx = iio.LocalContext()
+
except FileNotFoundError:
raise Exception("Unable to create IIO context!\n")
- self._timeout()
-
return self.ctx
-
class BufferBuilder:
- """Class for creating the buffer."""
-
- def __init__(self, ctx, arguments):
+ def __init__(self, ctx):
"""
Class constructor.
Args:
ctx: type=iio.Context
This buffer's context.
- arguments: type=Arguments
- Contains the input arguments.
"""
self.ctx = ctx
- self.arguments = arguments
self.dev = None
def _device(self):
- self.dev = self.ctx.find_device(self.arguments.device_name)
+ self.dev = self.ctx.find_device('ads8688')
if self.dev is None:
- raise Exception("Device %s not found!" % self.arguments.device_name)
+ raise Exception("Device ads8688 not found!")
return self
def _channels(self):
- if len(self.arguments.channels) == 0:
- for channel in self.dev.channels:
- channel.enabled = True
- else:
- for channel_idx in self.arguments.channels:
- self.dev.channels[int(channel_idx)].enabled = True
+ for channel in self.dev.channels:
+ channel.enabled = True
return self
"""Create the IIO buffer."""
self._device()
self._channels()
- buffer = iio.Buffer(self.dev, self.arguments.buffer_size)
+ buffer = iio.Buffer(self.dev, BUFFER_SIZE)
if buffer is None:
raise Exception("Unable to create buffer!\n")
class DataReader(threading.Thread):
"""Class for reading and logging sensor data.."""
- def __init__(self, ctx, arguments):
+ def __init__(self, ctx):
threading.Thread.__init__(self)
self.shutdown_flag = threading.Event()
- buffer_builder = BufferBuilder(ctx, arguments)
+ buffer_builder = BufferBuilder(ctx)
self.buffer = buffer_builder.create()
self.device = buffer_builder.dev
- self.arguments = arguments
def run(self):
print('DataReader thread #%s started' % self.ident)
while not self.shutdown_flag.is_set():
self.buffer.refill()
samples = self.buffer.read()
-
- if self.arguments.num_samples > 0:
- file.write(
- samples[: min(self.arguments.num_samples * self.device.sample_size, len(samples))]
- )
- self.arguments.num_samples -= min(
- self.arguments.num_samples, len(samples)
- )
-
- if self.arguments.num_samples == 0:
- break
- else:
- file.write(bytes(samples))
+ file.write(bytes(samples))
print('DataReader thread #%s stopped' % self.ident)
# kick off data logging and pyro threads
try:
# create data logging thread
- arguments = Arguments()
- context_builder = ContextBuilder(arguments)
- logger = DataReader(context_builder.create(), arguments)
+ context_builder = ContextBuilder()
+ logger = DataReader(context_builder.create())
# create pyro event thread
pyro = Pyro()