# vim: set fileencoding=utf-8:
#
# GPIO Zero: a library for controlling the Raspberry Pi's GPIO pins
#
# Copyright (c) 2016-2024 Dave Jones <dave@waveform.org.uk>
# Copyright (c) 2020 Fangchen Li <fangchen.li@outlook.com>
# Copyright (c) 2016 Andrew Scheller <github@loowis.durge.org>
#
# SPDX-License-Identifier: BSD-3-Clause
import os
from collections import namedtuple
from time import time, sleep, monotonic
from threading import Thread, Event
from math import isclose
# NOTE: Remove try when compatibility moves beyond Python 3.10
try:
from importlib_metadata import entry_points
except ImportError:
from importlib.metadata import entry_points
from ..exc import (
PinPWMUnsupported,
PinSetInput,
PinFixedPull,
PinInvalidPin,
PinInvalidFunction,
PinInvalidPull,
PinInvalidBounce,
)
from ..devices import Device
from ..mixins import SharedMixin
from . import SPI
from .pi import PiPin, PiFactory
from .spi import SPISoftware
PinState = namedtuple('PinState', ('timestamp', 'state'))
[docs]
class MockPin(PiPin):
"""
A mock pin used primarily for testing. This class does *not* support PWM.
"""
def __init__(self, factory, info):
super().__init__(factory, info)
self._function = 'input'
self._pull = info.pull or 'floating'
self._state = self._pull == 'up'
self._bounce = None
self._edges = 'both'
self._when_changed = None
self.clear_states()
def close(self):
self.when_changed = None
self.function = 'input'
def _get_function(self):
return self._function
def _set_function(self, value):
if value not in ('input', 'output'):
raise PinInvalidFunction('function must be input or output')
self._function = value
if value == 'input':
# Drive the input to the pull
self._set_pull(self._get_pull())
def _get_state(self):
return self._state
def _set_state(self, value):
if self._function == 'input':
raise PinSetInput(f'cannot set state of pin {self!r}')
assert self._function == 'output'
assert 0 <= value <= 1
self._change_state(bool(value))
def _change_state(self, value):
if self._state != value:
t = monotonic()
self._state = value
self.states.append(PinState(t - self._last_change, value))
self._last_change = t
return True
return False
def _get_frequency(self):
return None
def _set_frequency(self, value):
if value is not None:
raise PinPWMUnsupported()
def _get_pull(self):
return self._pull
def _set_pull(self, value):
if self.function != 'input':
raise PinFixedPull(f'cannot set pull on non-input pin {self!r}')
if self.info.pull and value != self.info.pull:
raise PinFixedPull(f'{self!r} has a fixed pull resistor')
if value not in ('floating', 'up', 'down'):
raise PinInvalidPull('pull must be floating, up, or down')
self._pull = value
if value == 'up':
self.drive_high()
elif value == 'down':
self.drive_low()
def _get_bounce(self):
return self._bounce
def _set_bounce(self, value):
# XXX Need to implement this
if value is not None:
try:
value = float(value)
except ValueError:
raise PinInvalidBounce('bounce must be None or a float')
self._bounce = value
def _get_edges(self):
return self._edges
def _set_edges(self, value):
assert value in ('none', 'falling', 'rising', 'both')
self._edges = value
def _disable_event_detect(self):
pass
def _enable_event_detect(self):
pass
def _call_when_changed(self):
super()._call_when_changed(self._last_change, self._state)
def drive_high(self):
assert self._function == 'input'
if self._change_state(True):
if self._edges in ('both', 'rising') and self._when_changed is not None:
self._call_when_changed()
def drive_low(self):
assert self._function == 'input'
if self._change_state(False):
if self._edges in ('both', 'falling') and self._when_changed is not None:
self._call_when_changed()
def clear_states(self):
self._last_change = monotonic()
self.states = [PinState(0.0, self._state)]
def assert_states(self, expected_states):
# Tests that the pin went through the expected states (a list of values)
for actual, expected in zip(self.states, expected_states):
assert actual.state == expected
def assert_states_and_times(self, expected_states):
# Tests that the pin went through the expected states at the expected
# times (times are compared with a tolerance of tens-of-milliseconds as
# that's about all we can reasonably expect in a non-realtime
# environment on a Pi 1)
for actual, expected in zip(self.states, expected_states):
assert isclose(actual.timestamp, expected[0], rel_tol=0.05, abs_tol=0.05)
assert isclose(actual.state, expected[1])
[docs]
class MockConnectedPin(MockPin):
"""
This derivative of :class:`MockPin` emulates a pin connected to another
mock pin. This is used in the "real pins" portion of the test suite to
check that one pin can influence another.
"""
def __init__(self, factory, info, input_pin=None):
super().__init__(factory, info)
self.input_pin = input_pin
def _change_state(self, value):
if self.input_pin:
if value:
self.input_pin.drive_high()
else:
self.input_pin.drive_low()
return super()._change_state(value)
[docs]
class MockChargingPin(MockPin):
"""
This derivative of :class:`MockPin` emulates a pin which, when set to
input, waits a predetermined length of time and then drives itself high
(as if attached to, e.g. a typical circuit using an LDR and a capacitor
to time the charging rate).
"""
def __init__(self, factory, info, charge_time=0.01):
super().__init__(factory, info)
self.charge_time = charge_time # dark charging time
self._charge_stop = Event()
self._charge_thread = None
def _set_function(self, value):
super()._set_function(value)
if value == 'input':
if self._charge_thread:
self._charge_stop.set()
self._charge_thread.join()
self._charge_stop.clear()
self._charge_thread = Thread(target=self._charge)
self._charge_thread.start()
elif value == 'output':
if self._charge_thread:
self._charge_stop.set()
self._charge_thread.join()
else:
assert False
def _charge(self):
if not self._charge_stop.wait(self.charge_time):
try:
self.drive_high()
except AssertionError: # pragma: no cover
# Charging pins are typically flipped between input and output
# repeatedly; if another thread has already flipped us to
# output ignore the assertion-error resulting from attempting
# to drive the pin high
pass
[docs]
class MockTriggerPin(MockPin):
"""
This derivative of :class:`MockPin` is intended to be used with another
:class:`MockPin` to emulate a distance sensor. Set *echo_pin* to the
corresponding pin instance. When this pin is driven high it will trigger
the echo pin to drive high for the echo time.
"""
def __init__(self, factory, info, echo_pin=None, echo_time=0.04):
super().__init__(factory, info)
self.echo_pin = echo_pin
self.echo_time = echo_time # longest echo time
self._echo_thread = None
def _set_state(self, value):
super()._set_state(value)
if value:
if self._echo_thread:
self._echo_thread.join()
self._echo_thread = Thread(target=self._echo)
self._echo_thread.start()
def _echo(self):
sleep(0.001)
self.echo_pin.drive_high()
sleep(self.echo_time)
self.echo_pin.drive_low()
[docs]
class MockPWMPin(MockPin):
"""
This derivative of :class:`MockPin` adds PWM support.
"""
def __init__(self, factory, info):
super().__init__(factory, info)
self._frequency = None
def close(self):
self.frequency = None
super().close()
def _set_state(self, value):
if self._function == 'input':
raise PinSetInput(f'cannot set state of pin {self!r}')
assert self._function == 'output'
assert 0 <= value <= 1
self._change_state(float(value))
def _get_frequency(self):
return self._frequency
def _set_frequency(self, value):
if value is not None:
assert self._function == 'output'
self._frequency = value
if value is None:
self._change_state(0.0)
class MockSPIClockPin(MockPin):
"""
This derivative of :class:`MockPin` is intended to be used as the clock pin
of a mock SPI device. It is not intended for direct construction in tests;
rather, construct a :class:`MockSPIDevice` with various pin numbers, and
this class will be used for the clock pin.
"""
def __init__(self, factory, info):
super().__init__(factory, info)
self.spi_devices = getattr(self, 'spi_devices', [])
def _set_state(self, value):
super()._set_state(value)
for dev in self.spi_devices:
dev.on_clock()
class MockSPISelectPin(MockPin):
"""
This derivative of :class:`MockPin` is intended to be used as the select
pin of a mock SPI device. It is not intended for direct construction in
tests; rather, construct a :class:`MockSPIDevice` with various pin numbers,
and this class will be used for the select pin.
"""
def __init__(self, factory, info):
super().__init__(factory, info)
self.spi_device = getattr(self, 'spi_device', None)
def _set_state(self, value):
super()._set_state(value)
if self.spi_device:
self.spi_device.on_select()
[docs]
class MockSPIDevice:
"""
This class is used to test :class:`SPIDevice` implementations. It can be
used to mock up the slave side of simple SPI devices, e.g. the MCP3xxx
series of ADCs.
Descendants should override the :meth:`on_start` and/or :meth:`on_bit`
methods to respond to SPI interface events. The :meth:`rx_word` and
:meth:`tx_word` methods can be used facilitate communications within these
methods. Such descendents can then be passed as the *spi_class* parameter
of the :class:`MockFactory` constructor to have instances attached to any
SPI interface requested by an :class:`SPIDevice`.
"""
def __init__(self, clock_pin, mosi_pin=None, miso_pin=None,
select_pin=None, *, clock_polarity=False, clock_phase=False,
lsb_first=False, bits_per_word=8, select_high=False,
pin_factory=None):
if pin_factory is None:
pin_factory = Device.pin_factory
assert isinstance(pin_factory, MockFactory)
self.clock_pin = pin_factory.pin(clock_pin, pin_class=MockSPIClockPin)
self.mosi_pin = None if mosi_pin is None else pin_factory.pin(mosi_pin)
self.miso_pin = None if miso_pin is None else pin_factory.pin(miso_pin)
self.select_pin = None if select_pin is None else pin_factory.pin(select_pin, pin_class=MockSPISelectPin)
self.clock_polarity = clock_polarity
self.clock_phase = clock_phase
self.lsb_first = lsb_first
self.bits_per_word = bits_per_word
self.select_high = select_high
self.rx_bit = 0
self.rx_buf = []
self.tx_buf = []
self.clock_pin.spi_devices.append(self)
self.select_pin.spi_device = self
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_tb):
self.close()
def close(self):
if self in self.clock_pin.spi_devices:
self.clock_pin.spi_devices.remove(self)
if self.select_pin is not None:
self.select_pin.spi_device = None
def on_select(self):
if self.select_pin.state == self.select_high:
self.on_start()
def on_clock(self):
# Don't do anything if this SPI device isn't currently selected
if self.select_pin is None or self.select_pin.state == self.select_high:
# The XOR of the clock pin's values, polarity and phase indicates
# whether we're meant to be acting on this edge
if self.clock_pin.state ^ self.clock_polarity ^ self.clock_phase:
self.rx_bit += 1
if self.mosi_pin is not None:
self.rx_buf.append(self.mosi_pin.state)
if self.miso_pin is not None:
try:
tx_value = self.tx_buf.pop(0)
except IndexError:
tx_value = 0
if tx_value:
self.miso_pin.drive_high()
else:
self.miso_pin.drive_low()
self.on_bit()
def on_start(self):
"""
Override this in descendents to detect when the mock SPI device's
select line is activated.
"""
self.rx_bit = 0
self.rx_buf = []
self.tx_buf = []
def on_bit(self):
"""
Override this in descendents to react to receiving a bit.
The :attr:`rx_bit` attribute gives the index of the bit received (this
is reset to 0 by default by :meth:`on_select`). The :attr:`rx_buf`
sequence gives the sequence of 1s and 0s that have been recevied so
far. The :attr:`tx_buf` sequence gives the sequence of 1s and 0s to
transmit on the next clock pulses. All these attributes can be modified
within this method.
The :meth:`rx_word` and :meth:`tx_word` methods can also be used to
read and append to the buffers using integers instead of bool bits.
"""
pass
def rx_word(self):
result = 0
bits = reversed(self.rx_buf) if self.lsb_first else self.rx_buf
for bit in bits:
result <<= 1
result |= bit
return result
def tx_word(self, value, bits_per_word=None):
if bits_per_word is None:
bits_per_word = self.bits_per_word
bits = [0] * bits_per_word
for bit in range(bits_per_word):
bits[bit] = value & 1
value >>= 1
assert not value
if not self.lsb_first:
bits = reversed(bits)
self.tx_buf.extend(bits)
[docs]
class MockFactory(PiFactory):
"""
Factory for generating mock pins.
The *revision* parameter specifies what revision of Pi the mock factory
pretends to be (this affects the result of the :attr:`Factory.board_info`
attribute as well as where pull-ups are assumed to be).
The *pin_class* attribute specifies which mock pin class will be generated
by the :meth:`pin` method by default. This can be changed after
construction by modifying the :attr:`pin_class` attribute.
.. attribute:: pin_class
This attribute stores the :class:`MockPin` class (or descendant) that
will be used when constructing pins with the :meth:`pin` method (if
no *pin_class* parameter is used to override it). It defaults on
construction to the value of the *pin_class* parameter in the
constructor, or :class:`MockPin` if that is unspecified.
"""
def __init__(self, revision=None, pin_class=None):
super().__init__()
if revision is None:
revision = os.environ.get('GPIOZERO_MOCK_REVISION', 'a02082')
if pin_class is None:
pin_class = os.environ.get('GPIOZERO_MOCK_PIN_CLASS', MockPin)
self._revision = int(revision, base=16)
if isinstance(pin_class, bytes):
pin_class = pin_class.decode('ascii')
if isinstance(pin_class, str):
group = entry_points(group='gpiozero_mock_pin_classes')
pin_class = group[pin_class.lower()].load()
if not issubclass(pin_class, MockPin):
raise ValueError(f'invalid mock pin_class: {pin_class!r}')
self.pin_class = pin_class
def _get_revision(self):
return self._revision
[docs]
def reset(self):
"""
Clears the pins and reservations sets. This is primarily useful in
test suites to ensure the pin factory is back in a "clean" state before
the next set of tests are run.
"""
self.pins.clear()
self._reservations.clear()
[docs]
def pin(self, name, pin_class=None, **kwargs):
"""
The pin method for :class:`MockFactory` additionally takes a
*pin_class* attribute which can be used to override the class'
:attr:`pin_class` attribute. Any additional keyword arguments will be
passed along to the pin constructor (useful with things like
:class:`MockConnectedPin` which expect to be constructed with another
pin).
"""
if pin_class is None:
pin_class = self.pin_class
for header, info in self.board_info.find_pin(name):
try:
pin = self.pins[info]
except KeyError:
pin = pin_class(self, info, **kwargs)
self.pins[info] = pin
else:
# Ensure the pin class expected supports PWM (or not)
if issubclass(pin_class, MockPWMPin) != isinstance(pin, MockPWMPin):
raise ValueError(
f'pin {info.name} is already in use as a '
f'{pin.__class__.__name__}')
return pin
raise PinInvalidPin(f'{name} is not a valid pin name')
def _get_spi_class(self, shared, hardware):
return MockSPIInterfaceShared if shared else MockSPIInterface
[docs]
@staticmethod
def ticks():
return monotonic()
[docs]
@staticmethod
def ticks_diff(later, earlier):
return later - earlier
class MockSPIInterface(SPISoftware):
pass
class MockSPIInterfaceShared(SharedMixin, MockSPIInterface):
@classmethod
def _shared_key(cls, clock_pin, mosi_pin, miso_pin, select_pin,
pin_factory):
return (clock_pin, select_pin)