# vim: set fileencoding=utf-8:
#
# GPIO Zero: a library for controlling the Raspberry Pi's GPIO pins
#
# Copyright (c) 2015-2023 Dave Jones <dave@waveform.org.uk>
# Copyright (c) 2015-2021 Ben Nuttall <ben@bennuttall.com>
# Copyright (c) 2020 Robert Erdin <roberte@depop.com>
# Copyright (c) 2020 Fangchen Li <fangchen.li@outlook.com>
# Copyright (c) 2020 Dan Jackson <dan@djackson.org>
# Copyright (c) 2016-2020 Andrew Scheller <github@loowis.durge.org>
# Copyright (c) 2019 Kosovan Sofiia <sofiia.kosovan@gmail.com>
# Copyright (c) 2018 Philippe Muller <philippe.muller@gmail.com>
# Copyright (c) 2016 Steveis <SteveAmor@users.noreply.github.com>
#
# SPDX-License-Identifier: BSD-3-Clause
import warnings
from time import sleep
from threading import Event, Lock
from itertools import tee
from statistics import median, mean
from .exc import (
InputDeviceError,
DeviceClosed,
DistanceSensorNoEcho,
PinInvalidState,
PWMSoftwareFallback,
)
from .devices import GPIODevice, CompositeDevice
from .mixins import GPIOQueue, EventsMixin, HoldMixin, event
try:
from .pins.pigpio import PiGPIOFactory
except ImportError:
PiGPIOFactory = None
Button.is_pressed = Button.is_active
Button.pressed_time = Button.active_time
Button.when_pressed = Button.when_activated
Button.when_released = Button.when_deactivated
Button.wait_for_press = Button.wait_for_active
Button.wait_for_release = Button.wait_for_inactive
[docs]
class LineSensor(SmoothedInputDevice):
"""
Extends :class:`SmoothedInputDevice` and represents a single pin line
sensor like the TCRT5000 infra-red proximity sensor found in the `CamJam #3
EduKit`_.
A typical line sensor has a small circuit board with three pins: VCC, GND,
and OUT. VCC should be connected to a 3V3 pin, GND to one of the ground
pins, and finally OUT to the GPIO specified as the value of the *pin*
parameter in the constructor.
The following code will print a line of text indicating when the sensor
detects a line, or stops detecting a line::
from gpiozero import LineSensor
from signal import pause
sensor = LineSensor(4)
sensor.when_line = lambda: print('Line detected')
sensor.when_no_line = lambda: print('No line detected')
pause()
:type pin: int or str
:param pin:
The GPIO pin which the sensor is connected to. See :ref:`pin-numbering`
for valid pin numbers. If this is :data:`None` a :exc:`GPIODeviceError`
will be raised.
:type pull_up: bool or None
:param pull_up:
See description under :class:`InputDevice` for more information.
:type active_state: bool or None
:param active_state:
See description under :class:`InputDevice` for more information.
:param int queue_len:
The length of the queue used to store values read from the sensor. This
defaults to 5.
:param float sample_rate:
The number of values to read from the device (and append to the
internal queue) per second. Defaults to 100.
:param float threshold:
Defaults to 0.5. When the average of all values in the internal queue
rises above this value, the sensor will be considered "active" by the
:attr:`~SmoothedInputDevice.is_active` property, and all appropriate
events will be fired.
:param bool partial:
When :data:`False` (the default), the object will not return a value
for :attr:`~SmoothedInputDevice.is_active` until the internal queue has
filled with values. Only set this to :data:`True` if you require
values immediately after object construction.
:type pin_factory: Factory or None
:param pin_factory:
See :doc:`api_pins` for more information (this is an advanced feature
which most users can ignore).
.. _CamJam #3 EduKit: http://camjam.me/?page_id=1035
"""
def __init__(self, pin=None, *, pull_up=False, active_state=None,
queue_len=5, sample_rate=100, threshold=0.5, partial=False,
pin_factory=None):
super().__init__(
pin, pull_up=pull_up, active_state=active_state,
threshold=threshold, queue_len=queue_len,
sample_wait=1 / sample_rate, partial=partial,
pin_factory=pin_factory)
self._queue.start()
@property
def value(self):
"""
Returns a value representing the average of the queued values. This
is nearer 0 for black under the sensor, and nearer 1 for white under
the sensor.
"""
return super().value
@property
def line_detected(self):
return not self.is_active
LineSensor.when_line = LineSensor.when_deactivated
LineSensor.when_no_line = LineSensor.when_activated
LineSensor.wait_for_line = LineSensor.wait_for_inactive
LineSensor.wait_for_no_line = LineSensor.wait_for_active
[docs]
class MotionSensor(SmoothedInputDevice):
"""
Extends :class:`SmoothedInputDevice` and represents a passive infra-red
(PIR) motion sensor like the sort found in the `CamJam #2 EduKit`_.
.. _CamJam #2 EduKit: http://camjam.me/?page_id=623
A typical PIR device has a small circuit board with three pins: VCC, OUT,
and GND. VCC should be connected to a 5V pin, GND to one of the ground
pins, and finally OUT to the GPIO specified as the value of the *pin*
parameter in the constructor.
The following code will print a line of text when motion is detected::
from gpiozero import MotionSensor
pir = MotionSensor(4)
pir.wait_for_motion()
print("Motion detected!")
:type pin: int or str
:param pin:
The GPIO pin which the sensor is connected to. See :ref:`pin-numbering`
for valid pin numbers. If this is :data:`None` a :exc:`GPIODeviceError`
will be raised.
:type pull_up: bool or None
:param pull_up:
See description under :class:`InputDevice` for more information.
:type active_state: bool or None
:param active_state:
See description under :class:`InputDevice` for more information.
:param int queue_len:
The length of the queue used to store values read from the sensor. This
defaults to 1 which effectively disables the queue. If your motion
sensor is particularly "twitchy" you may wish to increase this value.
:param float sample_rate:
The number of values to read from the device (and append to the
internal queue) per second. Defaults to 10.
:param float threshold:
Defaults to 0.5. When the average of all values in the internal queue
rises above this value, the sensor will be considered "active" by the
:attr:`~SmoothedInputDevice.is_active` property, and all appropriate
events will be fired.
:param bool partial:
When :data:`False` (the default), the object will not return a value
for :attr:`~SmoothedInputDevice.is_active` until the internal queue has
filled with values. Only set this to :data:`True` if you require
values immediately after object construction.
:type pin_factory: Factory or None
:param pin_factory:
See :doc:`api_pins` for more information (this is an advanced feature
which most users can ignore).
"""
def __init__(self, pin=None, *, pull_up=False, active_state=None,
queue_len=1, sample_rate=10, threshold=0.5, partial=False,
pin_factory=None):
super().__init__(
pin, pull_up=pull_up, active_state=active_state,
threshold=threshold, queue_len=queue_len, sample_wait=1 /
sample_rate, partial=partial, pin_factory=pin_factory, average=mean)
self._queue.start()
@property
def value(self):
"""
With the default *queue_len* of 1, this is effectively boolean where 0
means no motion detected and 1 means motion detected. If you specify
a *queue_len* greater than 1, this will be an averaged value where
values closer to 1 imply motion detection.
"""
return super().value
MotionSensor.motion_detected = MotionSensor.is_active
MotionSensor.when_motion = MotionSensor.when_activated
MotionSensor.when_no_motion = MotionSensor.when_deactivated
MotionSensor.wait_for_motion = MotionSensor.wait_for_active
MotionSensor.wait_for_no_motion = MotionSensor.wait_for_inactive
[docs]
class LightSensor(SmoothedInputDevice):
"""
Extends :class:`SmoothedInputDevice` and represents a light dependent
resistor (LDR).
Connect one leg of the LDR to the 3V3 pin; connect one leg of a 1µF
capacitor to a ground pin; connect the other leg of the LDR and the other
leg of the capacitor to the same GPIO pin. This class repeatedly discharges
the capacitor, then times the duration it takes to charge (which will vary
according to the light falling on the LDR).
The following code will print a line of text when light is detected::
from gpiozero import LightSensor
ldr = LightSensor(18)
ldr.wait_for_light()
print("Light detected!")
:type pin: int or str
:param pin:
The GPIO pin which the sensor is attached to. See :ref:`pin-numbering`
for valid pin numbers. If this is :data:`None` a :exc:`GPIODeviceError`
will be raised.
:param int queue_len:
The length of the queue used to store values read from the circuit.
This defaults to 5.
:param float charge_time_limit:
If the capacitor in the circuit takes longer than this length of time
to charge, it is assumed to be dark. The default (0.01 seconds) is
appropriate for a 1µF capacitor coupled with the LDR from the
`CamJam #2 EduKit`_. You may need to adjust this value for different
valued capacitors or LDRs.
:param float threshold:
Defaults to 0.1. When the average of all values in the internal queue
rises above this value, the area will be considered "light", and all
appropriate events will be fired.
:param bool partial:
When :data:`False` (the default), the object will not return a value
for :attr:`~SmoothedInputDevice.is_active` until the internal queue has
filled with values. Only set this to :data:`True` if you require
values immediately after object construction.
:type pin_factory: Factory or None
:param pin_factory:
See :doc:`api_pins` for more information (this is an advanced feature
which most users can ignore).
.. _CamJam #2 EduKit: http://camjam.me/?page_id=623
"""
def __init__(self, pin=None, *, queue_len=5, charge_time_limit=0.01,
threshold=0.1, partial=False, pin_factory=None):
super().__init__(
pin, pull_up=False, threshold=threshold, queue_len=queue_len,
sample_wait=0.0, partial=partial, pin_factory=pin_factory)
try:
self._charge_time_limit = charge_time_limit
self._charge_time = None
self._charged = Event()
self.pin.edges = 'rising'
self.pin.bounce = None
self.pin.when_changed = self._cap_charged
self._queue.start()
except:
self.close()
raise
@property
def charge_time_limit(self):
return self._charge_time_limit
def _cap_charged(self, ticks, state):
self._charge_time = ticks
self._charged.set()
def _read(self):
# Drain charge from the capacitor
self.pin.function = 'output'
self.pin.state = False
sleep(0.1)
self._charge_time = None
self._charged.clear()
# Time the charging of the capacitor
start = self.pin_factory.ticks()
self.pin.function = 'input'
self._charged.wait(self.charge_time_limit)
if self._charge_time is None:
return 0.0
else:
return 1.0 - min(1.0,
(self.pin_factory.ticks_diff(self._charge_time, start) /
self.charge_time_limit))
@property
def value(self):
"""
Returns a value between 0 (dark) and 1 (light).
"""
return super().value
LightSensor.light_detected = LightSensor.is_active
LightSensor.when_light = LightSensor.when_activated
LightSensor.when_dark = LightSensor.when_deactivated
LightSensor.wait_for_light = LightSensor.wait_for_active
LightSensor.wait_for_dark = LightSensor.wait_for_inactive
[docs]
class DistanceSensor(SmoothedInputDevice):
"""
Extends :class:`SmoothedInputDevice` and represents an HC-SR04 ultrasonic
distance sensor, as found in the `CamJam #3 EduKit`_.
The distance sensor requires two GPIO pins: one for the *trigger* (marked
TRIG on the sensor) and another for the *echo* (marked ECHO on the sensor).
However, a voltage divider is required to ensure the 5V from the ECHO pin
doesn't damage the Pi. Wire your sensor according to the following
instructions:
1. Connect the GND pin of the sensor to a ground pin on the Pi.
2. Connect the TRIG pin of the sensor a GPIO pin.
3. Connect one end of a 330Ω resistor to the ECHO pin of the sensor.
4. Connect one end of a 470Ω resistor to the GND pin of the sensor.
5. Connect the free ends of both resistors to another GPIO pin. This forms
the required `voltage divider`_.
6. Finally, connect the VCC pin of the sensor to a 5V pin on the Pi.
Alternatively, the 3V3 tolerant HC-SR04P sensor (which does not require a
voltage divider) will work with this class.
.. note::
If you do not have the precise values of resistor specified above,
don't worry! What matters is the *ratio* of the resistors to each
other.
You also don't need to be absolutely precise; the `voltage divider`_
given above will actually output ~3V (rather than 3.3V). A simple 2:3
ratio will give 3.333V which implies you can take three resistors of
equal value, use one of them instead of the 330Ω resistor, and two of
them in series instead of the 470Ω resistor.
.. _voltage divider: https://en.wikipedia.org/wiki/Voltage_divider
The following code will periodically report the distance measured by the
sensor in cm assuming the TRIG pin is connected to GPIO17, and the ECHO
pin to GPIO18::
from gpiozero import DistanceSensor
from time import sleep
sensor = DistanceSensor(echo=18, trigger=17)
while True:
print('Distance: ', sensor.distance * 100)
sleep(1)
.. note::
For improved accuracy, use the pigpio pin driver rather than the default
RPi.GPIO driver (pigpio uses DMA sampling for much more precise edge
timing). This is particularly relevant if you're using Pi 1 or Pi Zero.
See :ref:`changing-pin-factory` for further information.
:type echo: int or str
:param echo:
The GPIO pin which the ECHO pin is connected to. See
:ref:`pin-numbering` for valid pin numbers. If this is :data:`None` a
:exc:`GPIODeviceError` will be raised.
:type trigger: int or str
:param trigger:
The GPIO pin which the TRIG pin is connected to. See
:ref:`pin-numbering` for valid pin numbers. If this is :data:`None` a
:exc:`GPIODeviceError` will be raised.
:param int queue_len:
The length of the queue used to store values read from the sensor.
This defaults to 9.
:param float max_distance:
The :attr:`value` attribute reports a normalized value between 0 (too
close to measure) and 1 (maximum distance). This parameter specifies
the maximum distance expected in meters. This defaults to 1.
:param float threshold_distance:
Defaults to 0.3. This is the distance (in meters) that will trigger the
``in_range`` and ``out_of_range`` events when crossed.
:param bool partial:
When :data:`False` (the default), the object will not return a value
for :attr:`~SmoothedInputDevice.is_active` until the internal queue has
filled with values. Only set this to :data:`True` if you require
values immediately after object construction.
:type pin_factory: Factory or None
:param pin_factory:
See :doc:`api_pins` for more information (this is an advanced feature
which most users can ignore).
.. _CamJam #3 EduKit: http://camjam.me/?page_id=1035
"""
ECHO_LOCK = Lock()
def __init__(self, echo=None, trigger=None, *, queue_len=9,
max_distance=1, threshold_distance=0.3, partial=False,
pin_factory=None):
self._trigger = None
super().__init__(
echo, pull_up=False, queue_len=queue_len, sample_wait=0.06,
partial=partial, ignore=frozenset({None}), pin_factory=pin_factory
)
try:
if max_distance <= 0:
raise ValueError('invalid maximum distance (must be positive)')
self._max_distance = max_distance
self.threshold = threshold_distance / max_distance
self.speed_of_sound = 343.26 # m/s
self._trigger = GPIODevice(trigger, pin_factory=pin_factory)
self._echo = Event()
self._echo_rise = None
self._echo_fall = None
self._trigger.pin.function = 'output'
self._trigger.pin.state = False
self.pin.edges = 'both'
self.pin.bounce = None
self.pin.when_changed = self._echo_changed
self._queue.start()
except:
self.close()
raise
if PiGPIOFactory is None or not isinstance(self.pin_factory, PiGPIOFactory):
warnings.warn(PWMSoftwareFallback(
'For more accurate readings, use the pigpio pin factory.'
'See https://gpiozero.readthedocs.io/en/stable/api_input.html#distancesensor-hc-sr04 for more info'
))
def close(self):
try:
self._trigger.close()
except AttributeError:
pass
self._trigger = None
super().close()
@property
def max_distance(self):
"""
The maximum distance that the sensor will measure in meters. This value
is specified in the constructor and is used to provide the scaling for
the :attr:`~SmoothedInputDevice.value` attribute. When :attr:`distance`
is equal to :attr:`max_distance`, :attr:`~SmoothedInputDevice.value`
will be 1.
"""
return self._max_distance
@max_distance.setter
def max_distance(self, value):
if value <= 0:
raise ValueError('invalid maximum distance (must be positive)')
t = self.threshold_distance
self._max_distance = value
self.threshold_distance = t
@property
def threshold_distance(self):
"""
The distance, measured in meters, that will trigger the
:attr:`when_in_range` and :attr:`when_out_of_range` events when
crossed. This is simply a meter-scaled variant of the usual
:attr:`~SmoothedInputDevice.threshold` attribute.
"""
return self.threshold * self.max_distance
@threshold_distance.setter
def threshold_distance(self, value):
self.threshold = value / self.max_distance
@property
def distance(self):
"""
Returns the current distance measured by the sensor in meters. Note
that this property will have a value between 0 and
:attr:`max_distance`.
"""
return self.value * self._max_distance
@property
def value(self):
"""
Returns a value between 0, indicating the reflector is either touching
the sensor or is sufficiently near that the sensor can't tell the
difference, and 1, indicating the reflector is at or beyond the
specified *max_distance*.
"""
return super().value
@property
def trigger(self):
"""
Returns the :class:`Pin` that the sensor's trigger is connected to.
"""
return self._trigger.pin
@property
def echo(self):
"""
Returns the :class:`Pin` that the sensor's echo is connected to. This
is simply an alias for the usual :attr:`~GPIODevice.pin` attribute.
"""
return self.pin
def _echo_changed(self, ticks, level):
if level:
self._echo_rise = ticks
else:
self._echo_fall = ticks
self._echo.set()
def _read(self):
# Wait up to 50ms for the echo pin to fall to low (the maximum echo
# pulse is 35ms so this gives some leeway); if it doesn't something is
# horribly wrong (most likely at the hardware level)
if self.pin.state:
if not self._echo.wait(0.05):
warnings.warn(DistanceSensorNoEcho('echo pin set high'))
return None
self._echo.clear()
self._echo_fall = None
self._echo_rise = None
# Obtain the class-level ECHO_LOCK to ensure multiple distance sensors
# don't listen for each other's "pings"
with DistanceSensor.ECHO_LOCK:
# Fire the trigger
self._trigger.pin.state = True
sleep(0.00001)
self._trigger.pin.state = False
# Wait up to 100ms for the echo pin to rise and fall (35ms is the
# maximum pulse time, but the pre-rise time is unspecified in the
# "datasheet"; 100ms seems sufficiently long to conclude something
# has failed)
if self._echo.wait(0.1):
if self._echo_fall is not None and self._echo_rise is not None:
distance = (
self.pin_factory.ticks_diff(
self._echo_fall, self._echo_rise) *
self.speed_of_sound / 2.0)
return min(1.0, distance / self._max_distance)
else:
# If we only saw the falling edge it means we missed
# the echo because it was too fast
return None
else:
# The echo pin never rose or fell; something's gone horribly
# wrong
warnings.warn(DistanceSensorNoEcho('no echo received'))
return None
@property
def in_range(self):
return not self.is_active
DistanceSensor.when_out_of_range = DistanceSensor.when_activated
DistanceSensor.when_in_range = DistanceSensor.when_deactivated
DistanceSensor.wait_for_out_of_range = DistanceSensor.wait_for_active
DistanceSensor.wait_for_in_range = DistanceSensor.wait_for_inactive
[docs]
class RotaryEncoder(EventsMixin, CompositeDevice):
"""
Represents a simple two-pin incremental `rotary encoder`_ device.
These devices typically have three pins labelled "A", "B", and "C". Connect
A and B directly to two GPIO pins, and C ("common") to one of the ground
pins on your Pi. Then simply specify the A and B pins as the arguments when
constructing this classs.
For example, if your encoder's A pin is connected to GPIO 21, and the B
pin to GPIO 20 (and presumably the C pin to a suitable GND pin), while an
LED (with a suitable 300Ω resistor) is connected to GPIO 5, the following
session will result in the brightness of the LED being controlled by
dialling the rotary encoder back and forth::
>>> from gpiozero import RotaryEncoder
>>> from gpiozero.tools import scaled_half
>>> rotor = RotaryEncoder(21, 20)
>>> led = PWMLED(5)
>>> led.source = scaled_half(rotor.values)
:type a: int or str
:param a:
The GPIO pin connected to the "A" output of the rotary encoder.
:type b: int or str
:param b:
The GPIO pin connected to the "B" output of the rotary encoder.
:type bounce_time: float or None
:param bounce_time:
If :data:`None` (the default), no software bounce compensation will be
performed. Otherwise, this is the length of time (in seconds) that the
component will ignore changes in state after an initial change.
:type max_steps: int
:param max_steps:
The number of steps clockwise the encoder takes to change the
:attr:`value` from 0 to 1, or counter-clockwise from 0 to -1.
If this is 0, then the encoder's :attr:`value` never changes, but you
can still read :attr:`steps` to determine the integer number of steps
the encoder has moved clockwise or counter clockwise.
:type threshold_steps: tuple of int
:param threshold_steps:
A (min, max) tuple of steps between which the device will be considered
"active", inclusive. In other words, when :attr:`steps` is greater than
or equal to the *min* value, and less than or equal the *max* value,
the :attr:`active` property will be :data:`True` and the appropriate
events (:attr:`when_activated`, :attr:`when_deactivated`) will be
fired. Defaults to (0, 0).
:type wrap: bool
:param wrap:
If :data:`True` and *max_steps* is non-zero, when the :attr:`steps`
reaches positive or negative *max_steps* it wraps around by negation.
Defaults to :data:`False`.
:type pin_factory: Factory or None
:param pin_factory:
See :doc:`api_pins` for more information (this is an advanced feature
which most users can ignore).
.. _rotary encoder: https://en.wikipedia.org/wiki/Rotary_encoder
"""
# The rotary encoder's two pins move through the following sequence when
# the encoder is rotated one step clockwise:
#
# ────┐ ┌─────┐ ┌────────
# _ │ │ │ │ counter ┌───┐
# A │ │ │ │ clockwise ┌─── │ 0 │ ───┐ clockwise
# └─────┘ └─────┘ (CCW) │ └───┘ │ (CW)
# : : : : │ ┌───┐ ┌───┐ │
# ───────┐ : ┌─────┐ : ┌───── ▾ │ 1 │ │ 2 │ ▾
# _ : │ : │ : │ : │ └───┘ └───┘
# B : │ : │ : │ : │ │ ┌───┐ │
# : └─────┘ : └─────┘ └─── │ 3 │ ───┘
# : : : : : : : : └───┘
# 0 2 3 1 0 2 3 1 0
#
# Treating the A pin as a "high" bit, and the B pin as a "low" bit, this
# means that the pins return the sequence 0, 2, 3, 1 for each step that the
# encoder takes clockwise. Conversely, the pins return the sequence 0, 1,
# 3, 2 for each step counter-clockwise.
#
# We can treat these values as edges to take in a simple state machine,
# which is represented in the dictionary below:
TRANSITIONS = {
'idle': ['idle', 'ccw1', 'cw1', 'idle'],
'ccw1': ['idle', 'ccw1', 'ccw3', 'ccw2'],
'ccw2': ['idle', 'ccw1', 'ccw3', 'ccw2'],
'ccw3': ['-1', 'idle', 'ccw3', 'ccw2'],
'cw1': ['idle', 'cw3', 'cw1', 'cw2'],
'cw2': ['idle', 'cw3', 'cw1', 'cw2'],
'cw3': ['+1', 'cw3', 'idle', 'cw2'],
}
# The state machine here includes more than just the strictly necessary
# edges; it also permits "wiggle" between intermediary states so that the
# overall graph looks like this:
#
# ┌──────┐
# │ │
# ┌─────┤ idle ├────┐
# │1 │ │ 2│
# │ └──────┘ │
# ▾ ▴ ▴ ▾
# ┌────────┐ │ │ ┌───────┐
# │ │ 0│ │0 │ │
# ┌───┤ ccw1 ├──┤ ├──┤ cw1 ├───┐
# │2 │ │ │ │ │ │ 1│
# │ └─┬──────┘ │ │ └─────┬─┘ │
# │ 3│ ▴ │ │ ▴ │3 │
# │ ▾ │1 │ │ 2│ ▾ │
# │ ┌──────┴─┐ │ │ ┌─┴─────┐ │
# │ │ │ 0│ │0 │ │ │
# │ │ ccw2 ├──┤ ├──┤ cw2 │ │
# │ │ │ │ │ │ │ │
# │ └─┬──────┘ │ │ └─────┬─┘ │
# │ 2│ ▴ │ │ ▴ │1 │
# │ ▾ │3 │ │ 3│ ▾ │
# │ ┌──────┴─┐ │ │ ┌─┴─────┐ │
# │ │ │ │ │ │ │ │
# └──▸│ ccw3 │ │ │ │ cw3 │◂──┘
# │ │ │ │ │ │
# └───┬────┘ │ │ └───┬───┘
# 0│ │ │ │0
# ▾ │ │ ▾
# ┌────────┐ │ │ ┌───────┐
# │ │ │ │ │ │
# │ -1 ├──┘ └──┤ +1 │
# │ │ │ │
# └────────┘ └───────┘
#
# Note that, once we start down the clockwise (cw) or counter-clockwise
# (ccw) path, we don't allow the state to pick the alternate direction
# without passing through the idle state again. This seems to work well in
# practice with several encoders, even quite jiggly ones with no debounce
# hardware or software
def __init__(self, a, b, *, bounce_time=None, max_steps=16,
threshold_steps=(0, 0), wrap=False, pin_factory=None):
min_thresh, max_thresh = threshold_steps
if max_thresh < min_thresh:
raise ValueError('maximum threshold cannot be less than minimum')
self._steps = 0
self._max_steps = int(max_steps)
self._threshold = (int(min_thresh), int(max_thresh))
self._wrap = bool(wrap)
self._state = 'idle'
self._edge = 0
self._when_rotated = None
self._when_rotated_cw = None
self._when_rotated_ccw = None
self._rotate_event = Event()
self._rotate_cw_event = Event()
self._rotate_ccw_event = Event()
super().__init__(
a=InputDevice(a, pull_up=True, pin_factory=pin_factory),
b=InputDevice(b, pull_up=True, pin_factory=pin_factory),
_order=('a', 'b'), pin_factory=pin_factory)
self.a.pin.bounce_time = bounce_time
self.b.pin.bounce_time = bounce_time
self.a.pin.edges = 'both'
self.b.pin.edges = 'both'
self.a.pin.when_changed = self._a_changed
self.b.pin.when_changed = self._b_changed
# Call _fire_events once to set initial state of events
self._fire_events(self.pin_factory.ticks(), self.is_active)
def __repr__(self):
try:
self._check_open()
return (
f"<gpiozero.{self.__class__.__name__} object on pins "
f"{self.a.pin!r} and {self.b.pin!r}>")
except DeviceClosed:
return super().__repr__()
def _a_changed(self, ticks, state):
edge = (self.a._state_to_value(state) << 1) | (self._edge & 0x1)
self._change_state(ticks, edge)
def _b_changed(self, ticks, state):
edge = (self._edge & 0x2) | self.b._state_to_value(state)
self._change_state(ticks, edge)
def _change_state(self, ticks, edge):
self._edge = edge
new_state = RotaryEncoder.TRANSITIONS[self._state][edge]
if new_state == '+1':
self._steps = (
self._steps + 1
if not self._max_steps or self._steps < self._max_steps else
-self._max_steps if self._wrap else self._max_steps
)
self._rotate_cw_event.set()
self._fire_rotated_cw()
self._rotate_cw_event.clear()
elif new_state == '-1':
self._steps = (
self._steps - 1
if not self._max_steps or self._steps > -self._max_steps else
self._max_steps if self._wrap else -self._max_steps
)
self._rotate_ccw_event.set()
self._fire_rotated_ccw()
self._rotate_ccw_event.clear()
else:
self._state = new_state
return
self._rotate_event.set()
self._fire_rotated()
self._rotate_event.clear()
self._fire_events(ticks, self.is_active)
self._state = 'idle'
[docs]
def wait_for_rotate(self, timeout=None):
"""
Pause the script until the encoder is rotated at least one step in
either direction, or the timeout is reached.
:type timeout: float or None
:param timeout:
Number of seconds to wait before proceeding. If this is
:data:`None` (the default), then wait indefinitely until the
encoder is rotated.
"""
return self._rotate_event.wait(timeout)
[docs]
def wait_for_rotate_clockwise(self, timeout=None):
"""
Pause the script until the encoder is rotated at least one step
clockwise, or the timeout is reached.
:type timeout: float or None
:param timeout:
Number of seconds to wait before proceeding. If this is
:data:`None` (the default), then wait indefinitely until the
encoder is rotated clockwise.
"""
return self._rotate_cw_event.wait(timeout)
[docs]
def wait_for_rotate_counter_clockwise(self, timeout=None):
"""
Pause the script until the encoder is rotated at least one step
counter-clockwise, or the timeout is reached.
:type timeout: float or None
:param timeout:
Number of seconds to wait before proceeding. If this is
:data:`None` (the default), then wait indefinitely until the
encoder is rotated counter-clockwise.
"""
return self._rotate_ccw_event.wait(timeout)
when_rotated = event(
"""
The function to be run when the encoder is rotated in either direction.
This can be set to a function which accepts no (mandatory) parameters,
or a Python function which accepts a single mandatory parameter (with
as many optional parameters as you like). If the function accepts a
single mandatory parameter, the device that activated will be passed
as that parameter.
Set this property to :data:`None` (the default) to disable the event.
""")
when_rotated_clockwise = event(
"""
The function to be run when the encoder is rotated clockwise.
This can be set to a function which accepts no (mandatory) parameters,
or a Python function which accepts a single mandatory parameter (with
as many optional parameters as you like). If the function accepts a
single mandatory parameter, the device that activated will be passed
as that parameter.
Set this property to :data:`None` (the default) to disable the event.
""")
when_rotated_counter_clockwise = event(
"""
The function to be run when the encoder is rotated counter-clockwise.
This can be set to a function which accepts no (mandatory) parameters,
or a Python function which accepts a single mandatory parameter (with
as many optional parameters as you like). If the function accepts a
single mandatory parameter, the device that activated will be passed
as that parameter.
Set this property to :data:`None` (the default) to disable the event.
""")
@property
def steps(self):
"""
The "steps" value of the encoder starts at 0. It increments by one for
every step the encoder is rotated clockwise, and decrements by one for
every step it is rotated counter-clockwise. The steps value is
limited by :attr:`max_steps`. It will not advance beyond positive or
negative :attr:`max_steps`, unless :attr:`wrap` is :data:`True` in
which case it will roll around by negation. If :attr:`max_steps` is
zero then steps are not limited at all, and will increase infinitely
in either direction, but :attr:`value` will return a constant zero.
Note that, in contrast to most other input devices, because the rotary
encoder has no absolute position the :attr:`steps` attribute (and
:attr:`value` by corollary) is writable.
"""
return self._steps
def _fire_rotated(self):
if self.when_rotated:
self.when_rotated()
def _fire_rotated_cw(self):
if self.when_rotated_clockwise:
self.when_rotated_clockwise()
def _fire_rotated_ccw(self):
if self.when_rotated_counter_clockwise:
self.when_rotated_counter_clockwise()
@steps.setter
def steps(self, value):
value = int(value)
if self._max_steps:
value = max(-self._max_steps, min(self._max_steps, value))
self._steps = value
@property
def value(self):
"""
Represents the value of the rotary encoder as a value between -1 and 1.
The value is calculated by dividing the value of :attr:`steps` into the
range from negative :attr:`max_steps` to positive :attr:`max_steps`.
Note that, in contrast to most other input devices, because the rotary
encoder has no absolute position the :attr:`value` attribute is
writable.
"""
try:
return self._steps / self._max_steps
except ZeroDivisionError:
return 0
@value.setter
def value(self, value):
self._steps = int(max(-1, min(1, float(value))) * self._max_steps)
@property
def is_active(self):
return self._threshold[0] <= self._steps <= self._threshold[1]
@property
def max_steps(self):
"""
The number of discrete steps the rotary encoder takes to move
:attr:`value` from 0 to 1 clockwise, or 0 to -1 counter-clockwise. In
another sense, this is also the total number of discrete states this
input can represent.
"""
return self._max_steps
@property
def threshold_steps(self):
"""
The mininum and maximum number of steps between which :attr:`is_active`
will return :data:`True`. Defaults to (0, 0).
"""
return self._threshold
@property
def wrap(self):
"""
If :data:`True`, when :attr:`value` reaches its limit (-1 or 1), it
"wraps around" to the opposite limit. When :data:`False`, the value
(and the corresponding :attr:`steps` attribute) simply don't advance
beyond their limits.
"""
return self._wrap