# vim: set fileencoding=utf-8:
#
# GPIO Zero: a library for controlling the Raspberry Pi's GPIO pins
#
# Copyright (c) 2016-2023 Dave Jones <dave@waveform.org.uk>
# Copyright (c) 2017-2021 Ben Nuttall <ben@bennuttall.com>
# Copyright (c) 2019 Jeevan M R <14.jeevan@gmail.com>
# Copyright (c) 2019 Andrew Scheller <github@loowis.durge.org>
#
# SPDX-License-Identifier: BSD-3-Clause
import os
import io
import warnings
import subprocess
from datetime import datetime, time
from .devices import Device
from .mixins import EventsMixin, event
from .threads import GPIOThread
from .exc import ThresholdOutOfRange, DeviceClosed
[docs]
class InternalDevice(EventsMixin, Device):
"""
Extends :class:`Device` to provide a basis for devices which have no
specific hardware representation. These are effectively pseudo-devices and
usually represent operating system services like the internal clock, file
systems or network facilities.
"""
def __init__(self, *, pin_factory=None):
self._closed = False
super().__init__(pin_factory=pin_factory)
def close(self):
self._closed = True
super().close()
@property
def closed(self):
return self._closed
def __repr__(self):
try:
self._check_open()
return f"<gpiozero.{self.__class__.__name__} object>"
except DeviceClosed:
return f"<gpiozero.{self.__class__.__name__} object closed>"
[docs]
class PolledInternalDevice(InternalDevice):
"""
Extends :class:`InternalDevice` to provide a background thread to poll
internal devices that lack any other mechanism to inform the instance of
changes.
"""
def __init__(self, *, event_delay=1.0, pin_factory=None):
self._event_thread = None
self._event_delay = event_delay
super().__init__(pin_factory=pin_factory)
def close(self):
try:
self._start_stop_events(False)
except AttributeError:
pass # pragma: no cover
super().close()
@property
def event_delay(self):
"""
The delay between sampling the device's value for the purposes of
firing events.
Note that this only applies to events assigned to attributes like
:attr:`~EventsMixin.when_activated` and
:attr:`~EventsMixin.when_deactivated`. When using the
:attr:`~SourceMixin.source` and :attr:`~ValuesMixin.values` properties,
the sampling rate is controlled by the
:attr:`~SourceMixin.source_delay` property.
"""
return self._event_delay
@event_delay.setter
def event_delay(self, value):
self._event_delay = float(value)
def wait_for_active(self, timeout=None):
self._start_stop_events(True)
try:
return super().wait_for_active(timeout)
finally:
self._start_stop_events(
self.when_activated or self.when_deactivated)
def wait_for_inactive(self, timeout=None):
self._start_stop_events(True)
try:
return super().wait_for_inactive(timeout)
finally:
self._start_stop_events(
self.when_activated or self.when_deactivated)
def _watch_value(self):
while not self._event_thread.stopping.wait(self._event_delay):
self._fire_events(self.pin_factory.ticks(), self.is_active)
def _start_stop_events(self, enabled):
if self._event_thread and not enabled:
self._event_thread.stop()
self._event_thread = None
elif not self._event_thread and enabled:
self._event_thread = GPIOThread(self._watch_value)
self._event_thread.start()
[docs]
class PingServer(PolledInternalDevice):
"""
Extends :class:`PolledInternalDevice` to provide a device which is active
when a *host* (domain name or IP address) can be pinged.
The following example lights an LED while ``google.com`` is reachable::
from gpiozero import PingServer, LED
from signal import pause
google = PingServer('google.com')
led = LED(4)
google.when_activated = led.on
google.when_deactivated = led.off
pause()
:param str host:
The hostname or IP address to attempt to ping.
:type event_delay: float
:param event_delay:
The number of seconds between pings (defaults to 10 seconds).
:type pin_factory: Factory or None
:param pin_factory:
See :doc:`api_pins` for more information (this is an advanced feature
which most users can ignore).
"""
def __init__(self, host, *, event_delay=10.0, pin_factory=None):
self._host = host
super().__init__(event_delay=event_delay, pin_factory=pin_factory)
self._fire_events(self.pin_factory.ticks(), self.is_active)
def __repr__(self):
try:
self._check_open()
return f'<gpiozero.PingServer object host="{self.host}">'
except DeviceClosed:
return super().__repr__()
@property
def host(self):
"""
The hostname or IP address to test whenever :attr:`value` is queried.
"""
return self._host
@property
def value(self):
"""
Returns :data:`1` if the host returned a single ping, and :data:`0`
otherwise.
"""
# XXX This is doing a DNS lookup every time it's queried; should we
# call gethostbyname in the constructor and ping that instead (good
# for consistency, but what if the user *expects* the host to change
# address?)
with io.open(os.devnull, 'wb') as devnull:
try:
subprocess.check_call(
['ping', '-c1', self.host],
stdout=devnull, stderr=devnull)
except subprocess.CalledProcessError:
return 0
else:
return 1
when_activated = event(
"""
The function to run when the device changes state from inactive
(host unresponsive) to active (host responsive).
This can be set to a function which accepts no (mandatory)
parameters, or a Python function which accepts a single mandatory
parameter (with as many optional parameters as you like). If the
function accepts a single mandatory parameter, the device that
activated it will be passed as that parameter.
Set this property to ``None`` (the default) to disable the event.
""")
when_deactivated = event(
"""
The function to run when the device changes state from inactive
(host responsive) to active (host unresponsive).
This can be set to a function which accepts no (mandatory)
parameters, or a Python function which accepts a single mandatory
parameter (with as many optional parameters as you like). If the
function accepts a single mandatory parameter, the device that
activated it will be passed as that parameter.
Set this property to ``None`` (the default) to disable the event.
""")
[docs]
class CPUTemperature(PolledInternalDevice):
"""
Extends :class:`PolledInternalDevice` to provide a device which is active
when the CPU temperature exceeds the *threshold* value.
The following example plots the CPU's temperature on an LED bar graph::
from gpiozero import LEDBarGraph, CPUTemperature
from signal import pause
# Use minimums and maximums that are closer to "normal" usage so the
# bar graph is a bit more "lively"
cpu = CPUTemperature(min_temp=50, max_temp=90)
print(f'Initial temperature: {cpu.temperature}C')
graph = LEDBarGraph(5, 6, 13, 19, 25, pwm=True)
graph.source = cpu
pause()
:param str sensor_file:
The file from which to read the temperature. This defaults to the
sysfs file :file:`/sys/class/thermal/thermal_zone0/temp`. Whatever
file is specified is expected to contain a single line containing the
temperature in milli-degrees celsius.
:param float min_temp:
The temperature at which :attr:`value` will read 0.0. This defaults to
0.0.
:param float max_temp:
The temperature at which :attr:`value` will read 1.0. This defaults to
100.0.
:param float threshold:
The temperature above which the device will be considered "active".
(see :attr:`is_active`). This defaults to 80.0.
:type event_delay: float
:param event_delay:
The number of seconds between file reads (defaults to 5 seconds).
:type pin_factory: Factory or None
:param pin_factory:
See :doc:`api_pins` for more information (this is an advanced feature
which most users can ignore).
"""
def __init__(self, sensor_file='/sys/class/thermal/thermal_zone0/temp', *,
min_temp=0.0, max_temp=100.0, threshold=80.0, event_delay=5.0,
pin_factory=None):
self.sensor_file = sensor_file
super().__init__(event_delay=event_delay, pin_factory=pin_factory)
try:
if min_temp >= max_temp:
raise ValueError('max_temp must be greater than min_temp')
self.min_temp = min_temp
self.max_temp = max_temp
if not min_temp <= threshold <= max_temp:
warnings.warn(ThresholdOutOfRange(
'threshold is outside of the range (min_temp, max_temp)'))
self.threshold = threshold
self._fire_events(self.pin_factory.ticks(), self.is_active)
except:
self.close()
raise
def __repr__(self):
try:
self._check_open()
return (
f'<gpiozero.{self.__class__.__name__} object '
f'temperature={self.temperature:.2f}>')
except DeviceClosed:
return super().__repr__()
@property
def temperature(self):
"""
Returns the current CPU temperature in degrees celsius.
"""
with io.open(self.sensor_file, 'r') as f:
return float(f.read().strip()) / 1000
@property
def value(self):
"""
Returns the current CPU temperature as a value between 0.0
(representing the *min_temp* value) and 1.0 (representing the
*max_temp* value). These default to 0.0 and 100.0 respectively, hence
:attr:`value` is :attr:`temperature` divided by 100 by default.
"""
temp_range = self.max_temp - self.min_temp
return (self.temperature - self.min_temp) / temp_range
@property
def is_active(self):
"""
Returns :data:`True` when the CPU :attr:`temperature` exceeds the
*threshold*.
"""
return self.temperature > self.threshold
when_activated = event(
"""
The function to run when the device changes state from inactive to
active (temperature reaches *threshold*).
This can be set to a function which accepts no (mandatory)
parameters, or a Python function which accepts a single mandatory
parameter (with as many optional parameters as you like). If the
function accepts a single mandatory parameter, the device that
activated it will be passed as that parameter.
Set this property to ``None`` (the default) to disable the event.
""")
when_deactivated = event(
"""
The function to run when the device changes state from active to
inactive (temperature drops below *threshold*).
This can be set to a function which accepts no (mandatory)
parameters, or a Python function which accepts a single mandatory
parameter (with as many optional parameters as you like). If the
function accepts a single mandatory parameter, the device that
activated it will be passed as that parameter.
Set this property to ``None`` (the default) to disable the event.
""")
[docs]
class LoadAverage(PolledInternalDevice):
"""
Extends :class:`PolledInternalDevice` to provide a device which is active
when the CPU load average exceeds the *threshold* value.
The following example plots the load average on an LED bar graph::
from gpiozero import LEDBarGraph, LoadAverage
from signal import pause
la = LoadAverage(min_load_average=0, max_load_average=2)
graph = LEDBarGraph(5, 6, 13, 19, 25, pwm=True)
graph.source = la
pause()
:param str load_average_file:
The file from which to read the load average. This defaults to the
proc file :file:`/proc/loadavg`. Whatever file is specified is expected
to contain three space-separated load averages at the beginning of the
file, representing 1 minute, 5 minute and 15 minute averages
respectively.
:param float min_load_average:
The load average at which :attr:`value` will read 0.0. This defaults to
0.0.
:param float max_load_average:
The load average at which :attr:`value` will read 1.0. This defaults to
1.0.
:param float threshold:
The load average above which the device will be considered "active".
(see :attr:`is_active`). This defaults to 0.8.
:param int minutes:
The number of minutes over which to average the load. Must be 1, 5 or
15. This defaults to 5.
:type event_delay: float
:param event_delay:
The number of seconds between file reads (defaults to 10 seconds).
:type pin_factory: Factory or None
:param pin_factory:
See :doc:`api_pins` for more information (this is an advanced feature
which most users can ignore).
"""
def __init__(self, load_average_file='/proc/loadavg', *,
min_load_average=0.0, max_load_average=1.0, threshold=0.8,
minutes=5, event_delay=10.0, pin_factory=None):
if min_load_average >= max_load_average:
raise ValueError(
'max_load_average must be greater than min_load_average')
self.load_average_file = load_average_file
self.min_load_average = min_load_average
self.max_load_average = max_load_average
if not min_load_average <= threshold <= max_load_average:
warnings.warn(ThresholdOutOfRange(
'threshold is outside of the range (min_load_average, '
'max_load_average)'))
self.threshold = threshold
if minutes not in (1, 5, 15):
raise ValueError('minutes must be 1, 5 or 15')
self._load_average_file_column = {
1: 0,
5: 1,
15: 2,
}[minutes]
super().__init__(event_delay=event_delay, pin_factory=pin_factory)
self._fire_events(self.pin_factory.ticks(), None)
def __repr__(self):
try:
self._check_open()
return (
f'<gpiozero.{self.__class__.__name__} object '
f'load average={self.load_average:.2f}>')
except DeviceClosed:
return super().__repr__()
@property
def load_average(self):
"""
Returns the current load average.
"""
with io.open(self.load_average_file, 'r') as f:
file_columns = f.read().strip().split()
return float(file_columns[self._load_average_file_column])
@property
def value(self):
"""
Returns the current load average as a value between 0.0 (representing
the *min_load_average* value) and 1.0 (representing the
*max_load_average* value). These default to 0.0 and 1.0 respectively.
"""
load_average_range = self.max_load_average - self.min_load_average
return (self.load_average - self.min_load_average) / load_average_range
@property
def is_active(self):
"""
Returns :data:`True` when the :attr:`load_average` exceeds the
*threshold*.
"""
return self.load_average > self.threshold
when_activated = event(
"""
The function to run when the device changes state from inactive to
active (load average reaches *threshold*).
This can be set to a function which accepts no (mandatory)
parameters, or a Python function which accepts a single mandatory
parameter (with as many optional parameters as you like). If the
function accepts a single mandatory parameter, the device that
activated it will be passed as that parameter.
Set this property to ``None`` (the default) to disable the event.
""")
when_deactivated = event(
"""
The function to run when the device changes state from active to
inactive (load average drops below *threshold*).
This can be set to a function which accepts no (mandatory)
parameters, or a Python function which accepts a single mandatory
parameter (with as many optional parameters as you like). If the
function accepts a single mandatory parameter, the device that
activated it will be passed as that parameter.
Set this property to ``None`` (the default) to disable the event.
""")
[docs]
class TimeOfDay(PolledInternalDevice):
"""
Extends :class:`PolledInternalDevice` to provide a device which is active
when the computer's clock indicates that the current time is between
*start_time* and *end_time* (inclusive) which are :class:`~datetime.time`
instances.
The following example turns on a lamp attached to an :class:`Energenie`
plug between 07:00AM and 08:00AM::
from gpiozero import TimeOfDay, Energenie
from datetime import time
from signal import pause
lamp = Energenie(1)
morning = TimeOfDay(time(7), time(8))
morning.when_activated = lamp.on
morning.when_deactivated = lamp.off
pause()
Note that *start_time* may be greater than *end_time*, indicating a time
period which crosses midnight.
:param ~datetime.time start_time:
The time from which the device will be considered active.
:param ~datetime.time end_time:
The time after which the device will be considered inactive.
:param bool utc:
If :data:`True` (the default), a naive UTC time will be used for the
comparison rather than a local time-zone reading.
:type event_delay: float
:param event_delay:
The number of seconds between file reads (defaults to 10 seconds).
:type pin_factory: Factory or None
:param pin_factory:
See :doc:`api_pins` for more information (this is an advanced feature
which most users can ignore).
"""
def __init__(self, start_time, end_time, *, utc=True, event_delay=5.0,
pin_factory=None):
self._start_time = None
self._end_time = None
self._utc = True
super().__init__(event_delay=event_delay, pin_factory=pin_factory)
try:
self._start_time = self._validate_time(start_time)
self._end_time = self._validate_time(end_time)
if self.start_time == self.end_time:
raise ValueError('end_time cannot equal start_time')
self._utc = utc
self._fire_events(self.pin_factory.ticks(), self.is_active)
except:
self.close()
raise
def __repr__(self):
try:
self._check_open()
return (
f'<gpiozero.{self.__class__.__name__} object active between '
f'{self.start_time} and {self.end_time} '
f'{("local", "UTC")[self.utc]}>')
except DeviceClosed:
return super().__repr__()
def _validate_time(self, value):
if isinstance(value, datetime):
value = value.time()
if not isinstance(value, time):
raise ValueError(
'start_time and end_time must be a datetime, or time instance')
return value
@property
def start_time(self):
"""
The time of day after which the device will be considered active.
"""
return self._start_time
@property
def end_time(self):
"""
The time of day after which the device will be considered inactive.
"""
return self._end_time
@property
def utc(self):
"""
If :data:`True`, use a naive UTC time reading for comparison instead of
a local timezone reading.
"""
return self._utc
@property
def value(self):
"""
Returns :data:`1` when the system clock reads between :attr:`start_time`
and :attr:`end_time`, and :data:`0` otherwise. If :attr:`start_time` is
greater than :attr:`end_time` (indicating a period that crosses
midnight), then this returns :data:`1` when the current time is
greater than :attr:`start_time` or less than :attr:`end_time`.
"""
now = datetime.utcnow().time() if self.utc else datetime.now().time()
if self.start_time < self.end_time:
return int(self.start_time <= now <= self.end_time)
else:
return int(not self.end_time < now < self.start_time)
when_activated = event(
"""
The function to run when the device changes state from inactive to
active (time reaches *start_time*).
This can be set to a function which accepts no (mandatory)
parameters, or a Python function which accepts a single mandatory
parameter (with as many optional parameters as you like). If the
function accepts a single mandatory parameter, the device that
activated it will be passed as that parameter.
Set this property to ``None`` (the default) to disable the event.
""")
when_deactivated = event(
"""
The function to run when the device changes state from active to
inactive (time reaches *end_time*).
This can be set to a function which accepts no (mandatory)
parameters, or a Python function which accepts a single mandatory
parameter (with as many optional parameters as you like). If the
function accepts a single mandatory parameter, the device that
activated it will be passed as that parameter.
Set this property to ``None`` (the default) to disable the event.
""")
[docs]
class DiskUsage(PolledInternalDevice):
"""
Extends :class:`PolledInternalDevice` to provide a device which is active
when the disk space used exceeds the *threshold* value.
The following example plots the disk usage on an LED bar graph::
from gpiozero import LEDBarGraph, DiskUsage
from signal import pause
disk = DiskUsage()
print(f'Current disk usage: {disk.usage}%')
graph = LEDBarGraph(5, 6, 13, 19, 25, pwm=True)
graph.source = disk
pause()
:param str filesystem:
A path within the filesystem for which the disk usage needs to be
computed. This defaults to :file:`/`, which is the root filesystem.
:param float threshold:
The disk usage percentage above which the device will be considered
"active" (see :attr:`is_active`). This defaults to 90.0.
:type event_delay: float
:param event_delay:
The number of seconds between file reads (defaults to 30 seconds).
:type pin_factory: Factory or None
:param pin_factory:
See :doc:`api_pins` for more information (this is an advanced feature
which most users can ignore).
"""
def __init__(self, filesystem='/', *, threshold=90.0, event_delay=30.0,
pin_factory=None):
super().__init__(
event_delay=event_delay, pin_factory=pin_factory)
os.statvfs(filesystem)
if not 0 <= threshold <= 100:
warnings.warn(ThresholdOutOfRange(
'threshold is outside of the range (0, 100)'))
self.filesystem = filesystem
self.threshold = threshold
self._fire_events(self.pin_factory.ticks(), None)
def __repr__(self):
try:
self._check_open()
return (
f'<gpiozero.{self.__class__.__name__} object '
f'usage={self.usage:.2f}>')
except DeviceClosed:
return super().__repr__()
@property
def usage(self):
"""
Returns the current disk usage in percentage.
"""
return self.value * 100
@property
def value(self):
"""
Returns the current disk usage as a value between 0.0 and 1.0 by
dividing :attr:`usage` by 100.
"""
# This slightly convoluted calculation is equivalent to df's "Use%";
# it calculates the percentage of FS usage as a proportion of the
# space available to *non-root users*. Technically this means it can
# exceed 100% (when FS is filled to the point that only root can write
# to it), hence the clamp.
vfs = os.statvfs(self.filesystem)
used = vfs.f_blocks - vfs.f_bfree
total = used + vfs.f_bavail
return min(1.0, used / total)
@property
def is_active(self):
"""
Returns :data:`True` when the disk :attr:`usage` exceeds the
*threshold*.
"""
return self.usage > self.threshold
when_activated = event(
"""
The function to run when the device changes state from inactive to
active (disk usage reaches *threshold*).
This can be set to a function which accepts no (mandatory)
parameters, or a Python function which accepts a single mandatory
parameter (with as many optional parameters as you like). If the
function accepts a single mandatory parameter, the device that
activated it will be passed as that parameter.
Set this property to ``None`` (the default) to disable the event.
""")
when_deactivated = event(
"""
The function to run when the device changes state from active to
inactive (disk usage drops below *threshold*).
This can be set to a function which accepts no (mandatory)
parameters, or a Python function which accepts a single mandatory
parameter (with as many optional parameters as you like). If the
function accepts a single mandatory parameter, the device that
activated it will be passed as that parameter.
Set this property to ``None`` (the default) to disable the event.
""")