from __future__ import (
unicode_literals,
absolute_import,
print_function,
division,
)
# Python 2/3 compatibility shim: with ``unicode_literals`` in effect (see the
# __future__ import above), '' is a text literal, so this rebinds ``str`` to
# the text type of whichever interpreter is running.
str = type('')
import warnings
import pigpio
from . import Pin
from .data import pi_info
from ..exc import (
PinInvalidFunction,
PinSetInput,
PinFixedPull,
PinInvalidPull,
PinInvalidBounce,
PinInvalidState,
PinNonPhysical,
PinNoPins,
)
class PiGPIOPin(Pin):
    """
    Uses the `pigpio`_ library to interface to the Pi's GPIO pins. The pigpio
    library relies on a daemon (``pigpiod``) to be running as root to provide
    access to the GPIO pins, and communicates with this daemon over a network
    socket.

    While this does mean only the daemon itself should control the pins, the
    architecture does have several advantages:

    * Pins can be remote controlled from another machine (the other
      machine doesn't even have to be a Raspberry Pi; it simply needs the
      `pigpio`_ client library installed on it)

    * The daemon supports hardware PWM via the DMA controller

    * Your script itself doesn't require root privileges; it just needs to
      be able to communicate with the daemon

    You can construct pigpiod pins manually like so::

        from gpiozero.pins.pigpiod import PiGPIOPin
        from gpiozero import LED

        led = LED(PiGPIOPin(12))

    This is particularly useful for controlling pins on a remote machine. To
    accomplish this simply specify the host (and optionally port) when
    constructing the pin::

        from gpiozero.pins.pigpiod import PiGPIOPin
        from gpiozero import LED
        from signal import pause

        led = LED(PiGPIOPin(12, host='192.168.0.2'))

    .. note::

        In some circumstances, especially when playing with PWM, it does appear
        to be possible to get the daemon into "unusual" states. We would be
        most interested to hear any bug reports relating to this (it may be a
        bug in our pin implementation). A workaround for now is simply to
        restart the ``pigpiod`` daemon.

    .. _pigpio: http://abyz.co.uk/rpi/pigpio/
    """

    # Shared caches: one daemon connection per (host, port) and one pin
    # instance per (host, port, number).
    _CONNECTIONS = {}
    _PINS = {}

    GPIO_FUNCTIONS = {
        'input': pigpio.INPUT,
        'output': pigpio.OUTPUT,
        'alt0': pigpio.ALT0,
        'alt1': pigpio.ALT1,
        'alt2': pigpio.ALT2,
        'alt3': pigpio.ALT3,
        'alt4': pigpio.ALT4,
        'alt5': pigpio.ALT5,
    }

    GPIO_PULL_UPS = {
        'up': pigpio.PUD_UP,
        'down': pigpio.PUD_DOWN,
        'floating': pigpio.PUD_OFF,
    }

    GPIO_EDGES = {
        'both': pigpio.EITHER_EDGE,
        'rising': pigpio.RISING_EDGE,
        'falling': pigpio.FALLING_EDGE,
    }

    # Reverse lookups (pigpio constant -> gpiozero name).
    GPIO_FUNCTION_NAMES = {v: k for (k, v) in GPIO_FUNCTIONS.items()}
    GPIO_PULL_UP_NAMES = {v: k for (k, v) in GPIO_PULL_UPS.items()}
    GPIO_EDGES_NAMES = {v: k for (k, v) in GPIO_EDGES.items()}

    # Lazily populated by the first call to __new__.
    PI_INFO = None

    def __new__(cls, number, host='localhost', port=8888):
        """
        Return the pin for GPIO *number* on the daemon at *host*:*port*.

        Instances are cached, so requesting the same (host, port, number)
        twice returns the same object. A new pin is reset to an input with
        its default pull, no glitch filter and a PWM range of 255.

        Raises :exc:`ValueError` if the daemon rejects the pin number, and
        warns with :exc:`PinNonPhysical` if the GPIO has no physical pin.
        """
        # XXX What about remote pins? This should probably be instance
        # specific rather than class specific for pigpio. Need to check how
        # to query remote info though...
        if cls.PI_INFO is None:
            cls.PI_INFO = pi_info()
        pin_key = (host, port, number)
        try:
            return cls._PINS[pin_key]
        except KeyError:
            pass

        self = super(PiGPIOPin, cls).__new__(cls)
        conn_key = (host, port)
        try:
            self._connection = cls._CONNECTIONS[conn_key]
        except KeyError:
            self._connection = pigpio.pi(host, port)
            cls._CONNECTIONS[conn_key] = self._connection
        gpio_name = 'GPIO%d' % number
        try:
            cls.PI_INFO.physical_pin(gpio_name)
        except PinNoPins:
            warnings.warn(
                PinNonPhysical(
                    'no physical pins exist for GPIO%d' % number))
        self._host = host
        self._port = port
        self._number = number
        self._pull = 'up' if cls.PI_INFO.pulled_up(gpio_name) else 'floating'
        self._pwm = False
        self._bounce = None
        self._when_changed = None
        self._callback = None
        self._edges = pigpio.EITHER_EDGE
        try:
            self._connection.set_mode(self._number, pigpio.INPUT)
        except pigpio.error as e:
            raise ValueError(e)
        self._connection.set_pull_up_down(
            self._number, self.GPIO_PULL_UPS[self._pull])
        self._connection.set_glitch_filter(self._number, 0)
        self._connection.set_PWM_range(self._number, 255)
        # Only cache the pin once it has been fully configured.
        cls._PINS[pin_key] = self
        return self

    def __repr__(self):
        if self._host == 'localhost':
            return "GPIO%d" % self._number
        else:
            return "GPIO%d on %s:%d" % (self._number, self._host, self._port)

    @property
    def host(self):
        """The host name of the daemon this pin is attached to."""
        return self._host

    @property
    def port(self):
        """The port of the daemon this pin is attached to."""
        return self._port

    @property
    def number(self):
        """The GPIO number of this pin."""
        return self._number

    def close(self):
        """
        Return the pin to its default state: PWM off, no change handler,
        input function and default pull. Does nothing if the daemon
        connection has already gone away.
        """
        # If we're shutting down, the connection may have disconnected itself
        # already. Unfortunately, the connection's "connected" property is
        # rather buggy - disconnecting doesn't set it to False! So we're
        # naughty and check an internal variable instead...
        if self._connection.sl.s is not None:
            self.frequency = None
            self.when_changed = None
            self.function = 'input'
            self.pull = (
                'up' if self.PI_INFO.pulled_up('GPIO%d' % self.number)
                else 'floating')

    def _get_function(self):
        """Return the name of the pin's current function as reported by the daemon."""
        return self.GPIO_FUNCTION_NAMES[
            self._connection.get_mode(self._number)]

    def _set_function(self, value):
        """
        Switch the pin to the function named *value*.

        Raises :exc:`PinInvalidFunction` for an unknown name. The name is
        validated before any state is touched so that a bad value cannot
        clobber the cached pull setting.
        """
        try:
            mode = self.GPIO_FUNCTIONS[value]
        except KeyError:
            raise PinInvalidFunction(
                'invalid function "%s" for pin %r' % (value, self))
        if value != 'input':
            self._pull = 'floating'
        self._connection.set_mode(self._number, mode)

    def _get_state(self):
        """
        Return the pin state: a duty cycle scaled to 0..1 when PWM is
        active, otherwise a bool read from the pin.
        """
        if self._pwm:
            return self._connection.get_PWM_dutycycle(self._number) / 255
        else:
            return bool(self._connection.read(self._number))

    def _set_state(self, value):
        """
        Set the pin state: a duty cycle (scaled by 255) when PWM is active,
        otherwise a digital level.

        Raises :exc:`PinInvalidState` if the daemon rejects the duty cycle
        and :exc:`PinSetInput` if the pin is currently an input.
        """
        if self._pwm:
            try:
                self._connection.set_PWM_dutycycle(
                    self._number, int(value * 255))
            except pigpio.error:
                raise PinInvalidState(
                    'invalid state "%s" for pin %r' % (value, self))
        elif self.function == 'input':
            raise PinSetInput('cannot set state of pin %r' % self)
        else:
            # write forces pin to OUTPUT, hence the check above
            self._connection.write(self._number, bool(value))

    def _get_pull(self):
        """Return the cached pull setting ('up', 'down' or 'floating')."""
        return self._pull

    def _set_pull(self, value):
        """
        Set the pin's pull to *value*.

        Raises :exc:`PinFixedPull` if the pin is not an input or has a
        physical pull-up that *value* contradicts, and
        :exc:`PinInvalidPull` for an unknown name.
        """
        if self.function != 'input':
            raise PinFixedPull('cannot set pull on non-input pin %r' % self)
        if value != 'up' and self.PI_INFO.pulled_up('GPIO%d' % self._number):
            raise PinFixedPull('%r has a physical pull-up resistor' % self)
        try:
            pud = self.GPIO_PULL_UPS[value]
        except KeyError:
            raise PinInvalidPull(
                'invalid pull "%s" for pin %r' % (value, self))
        self._connection.set_pull_up_down(self._number, pud)
        self._pull = value

    def _get_frequency(self):
        """Return the PWM frequency, or None when PWM is not active."""
        if self._pwm:
            return self._connection.get_PWM_frequency(self._number)
        return None

    def _set_frequency(self, value):
        """
        Enable PWM at frequency *value*, change the frequency of active
        PWM, or (when *value* is None) disable PWM. The duty cycle is
        reset to 0 whenever PWM is switched on or off.
        """
        if not self._pwm and value is not None:
            self._connection.set_PWM_frequency(self._number, value)
            self._connection.set_PWM_dutycycle(self._number, 0)
            self._pwm = True
        elif self._pwm and value is not None:
            self._connection.set_PWM_frequency(self._number, value)
        elif self._pwm and value is None:
            self._connection.set_PWM_dutycycle(self._number, 0)
            self._pwm = False

    def _get_bounce(self):
        """Return the bounce time in seconds, or None when no filter is set."""
        return None if not self._bounce else self._bounce / 1000000

    def _set_bounce(self, value):
        """
        Set the bounce time to *value* seconds (None disables filtering).

        Raises :exc:`PinInvalidBounce` for negative values.
        """
        if value is None:
            value = 0
        elif value < 0:
            raise PinInvalidBounce('bounce must be 0 or greater')
        micros = int(value * 1000000)
        self._connection.set_glitch_filter(self._number, micros)
        # Remember the value (in microseconds) so _get_bounce can report it;
        # previously it was never stored and the getter always gave None.
        self._bounce = micros

    def _get_edges(self):
        """Return the name of the edges that trigger the change handler."""
        return self.GPIO_EDGES_NAMES[self._edges]

    def _set_edges(self, value):
        """
        Select which edges trigger the change handler. Any installed
        handler is removed and re-registered so the new edges take effect,
        even if *value* turns out to be invalid.
        """
        f = self.when_changed
        self.when_changed = None
        try:
            self._edges = self.GPIO_EDGES[value]
        finally:
            self.when_changed = f

    def _get_when_changed(self):
        """Return the handler passed to the setter, or None if unset."""
        # Return the caller's own function rather than the internal
        # three-argument wrapper registered with pigpio; handing back the
        # wrapper made re-assignment (as done by _set_edges) wrap it again
        # and call it with no arguments.
        return self._when_changed

    def _set_when_changed(self, value):
        """
        Install *value* (a zero-argument callable) as the change handler,
        replacing any existing one. None removes the handler.
        """
        if self._callback is not None:
            self._callback.cancel()
            self._callback = None
        self._when_changed = None
        if value is not None:
            self._callback = self._connection.callback(
                self._number, self._edges,
                lambda gpio, level, tick: value())
            # Record the handler only once registration has succeeded.
            self._when_changed = value