Source code for gpiozero.pins.mock

# vim: set fileencoding=utf-8:
# GPIO Zero: a library for controlling the Raspberry Pi's GPIO pins
# Copyright (c) 2016-2021 Dave Jones <>
# Copyright (c) 2016 Andrew Scheller <>
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import (
str = type('')

import os
from collections import namedtuple
from time import time, sleep
from threading import Thread, Event
    from math import isclose
except ImportError:
    from ..compat import isclose

import pkg_resources

from ..exc import (
from ..devices import Device
from .local import LocalPiPin, LocalPiFactory

PinState = namedtuple('PinState', ('timestamp', 'state'))

[docs]class MockPin(LocalPiPin): """ A mock pin used primarily for testing. This class does *not* support PWM. """ def __init__(self, factory, number): super(MockPin, self).__init__(factory, number) self._function = 'input' self._pull = 'up' if self.factory.pi_info.pulled_up(repr(self)) else 'floating' self._state = self._pull == 'up' self._bounce = None self._edges = 'both' self._when_changed = None self.clear_states() def close(self): self.when_changed = None self.function = 'input' def _get_function(self): return self._function def _set_function(self, value): if value not in ('input', 'output'): raise PinInvalidFunction('function must be input or output') self._function = value if value == 'input': # Drive the input to the pull self._set_pull(self._get_pull()) def _get_state(self): return self._state def _set_state(self, value): if self._function == 'input': raise PinSetInput('cannot set state of pin %r' % self) assert self._function == 'output' assert 0 <= value <= 1 self._change_state(bool(value)) def _change_state(self, value): if self._state != value: t = time() self._state = value self.states.append(PinState(t - self._last_change, value)) self._last_change = t return True return False def _get_frequency(self): return None def _set_frequency(self, value): if value is not None: raise PinPWMUnsupported() def _get_pull(self): return self._pull def _set_pull(self, value): if self.function != 'input': raise PinFixedPull('cannot set pull on non-input pin %r' % self) if value != 'up' and self.factory.pi_info.pulled_up(repr(self)): raise PinFixedPull('%r has a physical pull-up resistor' % self) if value not in ('floating', 'up', 'down'): raise PinInvalidPull('pull must be floating, up, or down') self._pull = value if value == 'up': self.drive_high() elif value == 'down': self.drive_low() def _get_bounce(self): return self._bounce def _set_bounce(self, value): # XXX Need to implement this if value is not None: try: value = float(value) except ValueError: raise PinInvalidBounce('bounce must be None or a float') self._bounce = value def _get_edges(self): return self._edges def _set_edges(self, value): assert value in ('none', 'falling', 'rising', 'both') self._edges = value def _disable_event_detect(self): pass def _enable_event_detect(self): pass def drive_high(self): assert self._function == 'input' if self._change_state(True): if self._edges in ('both', 'rising') and self._when_changed is not None: self._call_when_changed() def drive_low(self): assert self._function == 'input' if self._change_state(False): if self._edges in ('both', 'falling') and self._when_changed is not None: self._call_when_changed() def clear_states(self): self._last_change = time() self.states = [PinState(0.0, self._state)] def assert_states(self, expected_states): # Tests that the pin went through the expected states (a list of values) for actual, expected in zip(self.states, expected_states): assert actual.state == expected def assert_states_and_times(self, expected_states): # Tests that the pin went through the expected states at the expected # times (times are compared with a tolerance of tens-of-milliseconds as # that's about all we can reasonably expect in a non-realtime # environment on a Pi 1) for actual, expected in zip(self.states, expected_states): assert isclose(actual.timestamp, expected[0], rel_tol=0.05, abs_tol=0.05) assert isclose(actual.state, expected[1])
[docs]class MockConnectedPin(MockPin): """ This derivative of :class:`MockPin` emulates a pin connected to another mock pin. This is used in the "real pins" portion of the test suite to check that one pin can influence another. """ def __init__(self, factory, number, input_pin=None): super(MockConnectedPin, self).__init__(factory, number) self.input_pin = input_pin def _change_state(self, value): if self.input_pin: if value: self.input_pin.drive_high() else: self.input_pin.drive_low() return super(MockConnectedPin, self)._change_state(value)
[docs]class MockChargingPin(MockPin): """ This derivative of :class:`MockPin` emulates a pin which, when set to input, waits a predetermined length of time and then drives itself high (as if attached to, e.g. a typical circuit using an LDR and a capacitor to time the charging rate). """ def __init__(self, factory, number, charge_time=0.01): super(MockChargingPin, self).__init__(factory, number) self.charge_time = charge_time # dark charging time self._charge_stop = Event() self._charge_thread = None def _set_function(self, value): super(MockChargingPin, self)._set_function(value) if value == 'input': if self._charge_thread: self._charge_stop.set() self._charge_thread.join() self._charge_stop.clear() self._charge_thread = Thread(target=self._charge) self._charge_thread.start() elif value == 'output': if self._charge_thread: self._charge_stop.set() self._charge_thread.join() else: assert False def _charge(self): if not self._charge_stop.wait(self.charge_time): try: self.drive_high() except AssertionError: # pragma: no cover # Charging pins are typically flipped between input and output # repeatedly; if another thread has already flipped us to # output ignore the assertion-error resulting from attempting # to drive the pin high pass
[docs]class MockTriggerPin(MockPin): """ This derivative of :class:`MockPin` is intended to be used with another :class:`MockPin` to emulate a distance sensor. Set *echo_pin* to the corresponding pin instance. When this pin is driven high it will trigger the echo pin to drive high for the echo time. """ def __init__(self, factory, number, echo_pin=None, echo_time=0.04): super(MockTriggerPin, self).__init__(factory, number) self.echo_pin = echo_pin self.echo_time = echo_time # longest echo time self._echo_thread = None def _set_state(self, value): super(MockTriggerPin, self)._set_state(value) if value: if self._echo_thread: self._echo_thread.join() self._echo_thread = Thread(target=self._echo) self._echo_thread.start() def _echo(self): sleep(0.001) self.echo_pin.drive_high() sleep(self.echo_time) self.echo_pin.drive_low()
[docs]class MockPWMPin(MockPin): """ This derivative of :class:`MockPin` adds PWM support. """ def __init__(self, factory, number): super(MockPWMPin, self).__init__(factory, number) self._frequency = None def close(self): self.frequency = None super(MockPWMPin, self).close() def _set_state(self, value): if self._function == 'input': raise PinSetInput('cannot set state of pin %r' % self) assert self._function == 'output' assert 0 <= value <= 1 self._change_state(float(value)) def _get_frequency(self): return self._frequency def _set_frequency(self, value): if value is not None: assert self._function == 'output' self._frequency = value if value is None: self._change_state(0.0)
class MockSPIClockPin(MockPin): """ This derivative of :class:`MockPin` is intended to be used as the clock pin of a mock SPI device. It is not intended for direct construction in tests; rather, construct a :class:`MockSPIDevice` with various pin numbers, and this class will be used for the clock pin. """ def __init__(self, factory, number): super(MockSPIClockPin, self).__init__(factory, number) self.spi_devices = getattr(self, 'spi_devices', []) def _set_state(self, value): super(MockSPIClockPin, self)._set_state(value) for dev in self.spi_devices: dev.on_clock() class MockSPISelectPin(MockPin): """ This derivative of :class:`MockPin` is intended to be used as the select pin of a mock SPI device. It is not intended for direct construction in tests; rather, construct a :class:`MockSPIDevice` with various pin numbers, and this class will be used for the select pin. """ def __init__(self, factory, number): super(MockSPISelectPin, self).__init__(factory, number) self.spi_device = getattr(self, 'spi_device', None) def _set_state(self, value): super(MockSPISelectPin, self)._set_state(value) if self.spi_device: self.spi_device.on_select() class MockSPIDevice(object): def __init__( self, clock_pin, mosi_pin=None, miso_pin=None, select_pin=None, clock_polarity=False, clock_phase=False, lsb_first=False, bits_per_word=8, select_high=False): self.clock_pin =, pin_class=MockSPIClockPin) self.mosi_pin = None if mosi_pin is None else self.miso_pin = None if miso_pin is None else self.select_pin = None if select_pin is None else, pin_class=MockSPISelectPin) self.clock_polarity = clock_polarity self.clock_phase = clock_phase self.lsb_first = lsb_first self.bits_per_word = bits_per_word self.select_high = select_high self.rx_bit = 0 self.rx_buf = [] self.tx_buf = [] self.clock_pin.spi_devices.append(self) self.select_pin.spi_device = self def __enter__(self): return self def __exit__(self, exc_type, exc_value, exc_tb): self.close() def close(self): if self in self.clock_pin.spi_devices: self.clock_pin.spi_devices.remove(self) if self.select_pin is not None: self.select_pin.spi_device = None def on_select(self): if self.select_pin.state == self.select_high: self.on_start() def on_clock(self): # Don't do anything if this SPI device isn't currently selected if self.select_pin is None or self.select_pin.state == self.select_high: # The XOR of the clock pin's values, polarity and phase indicates # whether we're meant to be acting on this edge if self.clock_pin.state ^ self.clock_polarity ^ self.clock_phase: self.rx_bit += 1 if self.mosi_pin is not None: self.rx_buf.append(self.mosi_pin.state) if self.miso_pin is not None: try: tx_value = self.tx_buf.pop(0) except IndexError: tx_value = 0 if tx_value: self.miso_pin.drive_high() else: self.miso_pin.drive_low() self.on_bit() def on_start(self): """ Override this in descendents to detect when the mock SPI device's select line is activated. """ self.rx_bit = 0 self.rx_buf = [] self.tx_buf = [] def on_bit(self): """ Override this in descendents to react to receiving a bit. The :attr:`rx_bit` attribute gives the index of the bit received (this is reset to 0 by default by :meth:`on_select`). The :attr:`rx_buf` sequence gives the sequence of 1s and 0s that have been recevied so far. The :attr:`tx_buf` sequence gives the sequence of 1s and 0s to transmit on the next clock pulses. All these attributes can be modified within this method. The :meth:`rx_word` and :meth:`tx_word` methods can also be used to read and append to the buffers using integers instead of bool bits. """ pass def rx_word(self): result = 0 bits = reversed(self.rx_buf) if self.lsb_first else self.rx_buf for bit in bits: result <<= 1 result |= bit return result def tx_word(self, value, bits_per_word=None): if bits_per_word is None: bits_per_word = self.bits_per_word bits = [0] * bits_per_word for bit in range(bits_per_word): bits[bit] = value & 1 value >>= 1 assert not value if not self.lsb_first: bits = reversed(bits) self.tx_buf.extend(bits)
[docs]class MockFactory(LocalPiFactory): """ Factory for generating mock pins. The *revision* parameter specifies what revision of Pi the mock factory pretends to be (this affects the result of the :attr:`~gpiozero.Factory.pi_info` attribute as well as where pull-ups are assumed to be). The *pin_class* attribute specifies which mock pin class will be generated by the :meth:`pin` method by default. This can be changed after construction by modifying the :attr:`pin_class` attribute. .. attribute:: pin_class This attribute stores the :class:`MockPin` class (or descendent) that will be used when constructing pins with the :meth:`pin` method (if no *pin_class* parameter is used to override it). It defaults on construction to the value of the *pin_class* parameter in the constructor, or :class:`MockPin` if that is unspecified. """ def __init__(self, revision=None, pin_class=None): super(MockFactory, self).__init__() if revision is None: revision = os.environ.get('GPIOZERO_MOCK_REVISION', 'a02082') if pin_class is None: pin_class = os.environ.get('GPIOZERO_MOCK_PIN_CLASS', MockPin) self._revision = int(revision, base=16) if isinstance(pin_class, bytes): pin_class = pin_class.decode('ascii') if isinstance(pin_class, str): dist = pkg_resources.get_distribution('gpiozero') group = 'gpiozero_mock_pin_classes' pin_class = pkg_resources.load_entry_point(dist, group, pin_class.lower()) if not issubclass(pin_class, MockPin): raise ValueError('invalid mock pin_class: %r' % pin_class) self.pin_class = pin_class def _get_revision(self): return self._revision
[docs] def reset(self): """ Clears the pins and reservations sets. This is primarily useful in test suites to ensure the pin factory is back in a "clean" state before the next set of tests are run. """ self.pins.clear() self._reservations.clear()
[docs] def pin(self, spec, pin_class=None, **kwargs): """ The pin method for :class:`MockFactory` additionally takes a *pin_class* attribute which can be used to override the class' :attr:`pin_class` attribute. Any additional keyword arguments will be passed along to the pin constructor (useful with things like :class:`MockConnectedPin` which expect to be constructed with another pin). """ if pin_class is None: pin_class = self.pin_class n = self.pi_info.to_gpio(spec) try: pin = self.pins[n] except KeyError: pin = pin_class(self, n, **kwargs) self.pins[n] = pin else: # Ensure the pin class expected supports PWM (or not) if issubclass(pin_class, MockPWMPin) != isinstance(pin, MockPWMPin): raise ValueError('pin %d is already in use as a %s' % (n, pin.__class__.__name__)) return pin