Source code for gpiozero.pins.native

# GPIO Zero: a library for controlling the Raspberry Pi's GPIO pins
# Copyright (c) 2015-2019 Dave Jones <dave@waveform.org.uk>
# Copyright (c) 2016 Andrew Scheller <github@loowis.durge.org>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
#   this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its contributors
#   may be used to endorse or promote products derived from this software
#   without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from __future__ import (
    unicode_literals,
    absolute_import,
    print_function,
    division,
    )
nstr = str
str = type('')

import io
import os
import mmap
import errno
import struct
import select
import warnings
from time import sleep
from threading import Thread, Event, RLock
from collections import Counter
try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty

from .local import LocalPiPin, LocalPiFactory
from ..exc import (
    PinInvalidPull,
    PinInvalidEdges,
    PinInvalidFunction,
    PinFixedPull,
    PinSetInput,
    )


class GPIOMemory(object):

    GPIO_BASE_OFFSET = 0x200000
    PERI_BASE_OFFSET = {
        'BCM2835': 0x20000000,
        'BCM2836': 0x3f000000,
        'BCM2837': 0x3f000000,
    }

    # From BCM2835 data-sheet, p.91
    GPFSEL_OFFSET   = 0x00 >> 2
    GPSET_OFFSET    = 0x1c >> 2
    GPCLR_OFFSET    = 0x28 >> 2
    GPLEV_OFFSET    = 0x34 >> 2
    GPEDS_OFFSET    = 0x40 >> 2
    GPREN_OFFSET    = 0x4c >> 2
    GPFEN_OFFSET    = 0x58 >> 2
    GPHEN_OFFSET    = 0x64 >> 2
    GPLEN_OFFSET    = 0x70 >> 2
    GPAREN_OFFSET   = 0x7c >> 2
    GPAFEN_OFFSET   = 0x88 >> 2
    GPPUD_OFFSET    = 0x94 >> 2
    GPPUDCLK_OFFSET = 0x98 >> 2

    def __init__(self, soc):
        try:
            self.fd = os.open('/dev/gpiomem', os.O_RDWR | os.O_SYNC)
        except OSError:
            try:
                self.fd = os.open('/dev/mem', os.O_RDWR | os.O_SYNC)
            except OSError:
                raise IOError(
                    'unable to open /dev/gpiomem or /dev/mem; '
                    'upgrade your kernel or run as root')
            else:
                offset = self.peripheral_base(soc) + self.GPIO_BASE_OFFSET
        else:
            offset = 0
        self.mem = mmap.mmap(self.fd, 4096, offset=offset)

    def close(self):
        self.mem.close()
        os.close(self.fd)

    def peripheral_base(self, soc):
        try:
            with io.open('/proc/device-tree/soc/ranges', 'rb') as f:
                f.seek(4)
                # This is deliberately a big-endian read
                return struct.unpack(nstr('>L'), f.read(4))[0]
        except IOError:
            try:
                return self.PERI_BASE_OFFSET[soc]
            except KeyError:
                pass
        raise IOError('unable to determine peripheral base')

    def __getitem__(self, index):
        return struct.unpack_from(nstr('<L'), self.mem, index * 4)[0]

    def __setitem__(self, index, value):
        struct.pack_into(nstr('<L'), self.mem, index * 4, value)


class GPIOFS(object):

    GPIO_PATH = '/sys/class/gpio'

    def __init__(self, factory, queue):
        self._lock = RLock()
        self._exports = {}
        self._thread = NativeWatchThread(factory, queue)

    def close(self):
        # We *could* track the stuff we've exported and unexport it here, but
        # exports are a system global resource. We can't guarantee that some
        # other process isn't relying on something we've exported. In other
        # words, once exported it's *never* safe to unexport something. The
        # unexport method below is largely provided for debugging and testing.
        if self._thread is not None:
            self._thread.close()
            self._thread = None

    def path(self, name):
        return os.path.join(self.GPIO_PATH, name)

    def path_value(self, pin):
        return self.path('gpio%d/value' % pin)

    def path_dir(self, pin):
        return self.path('gpio%d/direction' % pin)

    def path_edge(self, pin):
        return self.path('gpio%d/edge' % pin)

    def exported(self, pin):
        return pin in self._exports

    def export(self, pin):
        with self._lock:
            try:
                result = self._exports[pin]
            except KeyError:
                result = None
                # Dirty hack to wait for udev to set permissions on
                # gpioN/value; there's no other way around this as there's no
                # synchronous mechanism for setting permissions on sysfs
                for i in range(10):
                    try:
                        # Must be O_NONBLOCK for use with epoll in edge
                        # triggered mode
                        result = os.open(self.path_value(pin),
                                         os.O_RDONLY | os.O_NONBLOCK)
                    except IOError as e:
                        if e.errno == errno.ENOENT:
                            with io.open(self.path('export'), 'w+b') as f:
                                f.write(str(pin).encode('ascii'))
                        elif e.errno == errno.EACCES:
                            sleep(i / 100)
                        else:
                            raise
                    else:
                        self._exports[pin] = result
                        break
                # Same for gpioN/edge. It must exist by this point but the
                # chmod -R may not have reached it yet...
                for i in range(10):
                    try:
                        with io.open(self.path_edge(pin), 'w+b'):
                            pass
                    except IOError as e:
                        if e.errno == errno.EACCES:
                            sleep(i / 100)
                        else:
                            raise
                if result is None:
                    raise RuntimeError('failed to export pin %d' % pin)
            return result

    def unexport(self, pin):
        with self._lock:
            try:
                os.close(self._exports.pop(pin))
            except KeyError:
                # unexport should be idempotent
                pass
            else:
                try:
                    with io.open(self.path('unexport'), 'w+b') as f:
                        f.write(str(pin).encode('ascii'))
                except IOError as e:
                    if e.errno == errno.EINVAL:
                        # Someone already unexported it; ignore the error
                        pass

    def watch(self, pin):
        with self._lock:
            self._thread.watch(self.export(pin), pin)

    def unwatch(self, pin):
        with self._lock:
            try:
                self._thread.unwatch(self._exports[pin])
            except KeyError:
                pass


class NativeWatchThread(Thread):
    def __init__(self, factory, queue):
        super(NativeWatchThread, self).__init__(
            target=self._run, args=(factory, queue))
        self.daemon = True
        self._stop_evt = Event()
        # XXX Make this compatible with BSDs with poll() option?
        self._epoll = select.epoll()
        self._watches = {}
        self.start()

    def close(self):
        self._stop_evt.set()
        self.join()
        self._epoll.close()

    def watch(self, fd, pin):
        self._watches[fd] = pin
        flags = select.EPOLLIN | select.EPOLLPRI | select.EPOLLET
        self._epoll.register(fd, flags)

    def unwatch(self, fd):
        self._epoll.unregister(fd)
        fd = self._watches.pop(fd, None)

    def _run(self, factory, queue):
        ticks = factory.ticks
        while not self._stop_evt.wait(0):
            for fd, event in self._epoll.poll(0.01):
                when = ticks()
                state = os.read(fd, 1) == b'1'
                os.lseek(fd, 0, 0)
                try:
                    queue.put((self._watches[fd], when, state))
                except KeyError:
                    pass


class NativeDispatchThread(Thread):
    def __init__(self, factory, queue):
        super(NativeDispatchThread, self).__init__(
            target=self._run, args=(factory, queue))
        self.daemon = True
        self._stop_evt = Event()
        self.start()

    def close(self):
        self._stop_evt.set()
        self.join()

    def _run(self, factory, queue):
        pins = factory.pins
        while not self._stop_evt.wait(0):
            try:
                num, ticks, state = queue.get(timeout=0.1)
            except Empty:
                continue
            try:
                pin = pins[num]
            except KeyError:
                pass
            else:
                if (
                        pin._bounce is None or pin._last_call is None or
                        factory.ticks_diff(ticks, pin._last_call) > pin._bounce
                ):
                    pin._call_when_changed(ticks, state)
                    pin._last_call = ticks


[docs]class NativeFactory(LocalPiFactory): """ Extends :class:`~gpiozero.pins.local.LocalPiFactory`. Uses a built-in pure Python implementation to interface to the Pi's GPIO pins. This is the default pin implementation if no third-party libraries are discovered. .. warning:: This implementation does *not* currently support PWM. Attempting to use any class which requests PWM will raise an exception. You can construct native pin instances manually like so:: from gpiozero.pins.native import NativeFactory from gpiozero import LED factory = NativeFactory() led = LED(12, pin_factory=factory) """ def __init__(self): super(NativeFactory, self).__init__() queue = Queue() self.mem = GPIOMemory(self.pi_info.soc) self.fs = GPIOFS(self, queue) self.dispatch = NativeDispatchThread(self, queue) self.pin_class = NativePin def close(self): if self.dispatch is not None: self.dispatch.close() self.dispatch = None super(NativeFactory, self).close() if self.fs is not None: self.fs.close() self.fs = None if self.mem is not None: self.mem.close() self.mem = None
[docs]class NativePin(LocalPiPin): """ Extends :class:`~gpiozero.pins.local.LocalPiPin`. Native pin implementation. See :class:`NativeFactory` for more information. """ GPIO_FUNCTIONS = { 'input': 0b000, 'output': 0b001, 'alt0': 0b100, 'alt1': 0b101, 'alt2': 0b110, 'alt3': 0b111, 'alt4': 0b011, 'alt5': 0b010, } GPIO_PULL_UPS = { 'up': 0b10, 'down': 0b01, 'floating': 0b00, 'reserved': 0b11, } GPIO_FUNCTION_NAMES = {v: k for (k, v) in GPIO_FUNCTIONS.items()} GPIO_PULL_UP_NAMES = {v: k for (k, v) in GPIO_PULL_UPS.items()} def __init__(self, factory, number): super(NativePin, self).__init__(factory, number) self._func_offset = self.factory.mem.GPFSEL_OFFSET + (number // 10) self._func_shift = (number % 10) * 3 self._set_offset = self.factory.mem.GPSET_OFFSET + (number // 32) self._set_shift = number % 32 self._clear_offset = self.factory.mem.GPCLR_OFFSET + (number // 32) self._clear_shift = number % 32 self._level_offset = self.factory.mem.GPLEV_OFFSET + (number // 32) self._level_shift = number % 32 self._pull_offset = self.factory.mem.GPPUDCLK_OFFSET + (number // 32) self._pull_shift = number % 32 self._edge_offset = self.factory.mem.GPEDS_OFFSET + (number // 32) self._edge_shift = number % 32 self._rising_offset = self.factory.mem.GPREN_OFFSET + (number // 32) self._rising_shift = number % 32 self._falling_offset = self.factory.mem.GPFEN_OFFSET + (number // 32) self._falling_shift = number % 32 self._last_call = None self._when_changed = None self._change_thread = None self._change_event = Event() self.function = 'input' self._pull = 'floating' self.pull = 'up' if self.factory.pi_info.pulled_up(repr(self)) else 'floating' self.bounce = None self.edges = 'none' def close(self): self.edges = 'none' self.frequency = None self.when_changed = None self.function = 'input' self.pull = 'up' if self.factory.pi_info.pulled_up(repr(self)) else 'floating' def _get_function(self): return self.GPIO_FUNCTION_NAMES[(self.factory.mem[self._func_offset] >> self._func_shift) & 7] def _set_function(self, value): try: value = self.GPIO_FUNCTIONS[value] except KeyError: raise PinInvalidFunction('invalid function "%s" for pin %r' % (value, self)) self.factory.mem[self._func_offset] = ( self.factory.mem[self._func_offset] & ~(7 << self._func_shift) | (value << self._func_shift) ) def _get_state(self): return bool(self.factory.mem[self._level_offset] & (1 << self._level_shift)) def _set_state(self, value): if self.function == 'input': raise PinSetInput('cannot set state of pin %r' % self) if value: self.factory.mem[self._set_offset] = 1 << self._set_shift else: self.factory.mem[self._clear_offset] = 1 << self._clear_shift def _get_pull(self): return self.GPIO_PULL_UP_NAMES[self._pull] def _set_pull(self, value): if self.function != 'input': raise PinFixedPull('cannot set pull on non-input pin %r' % self) if value != 'up' and self.factory.pi_info.pulled_up(repr(self)): raise PinFixedPull('%r has a physical pull-up resistor' % self) try: value = self.GPIO_PULL_UPS[value] except KeyError: raise PinInvalidPull('invalid pull direction "%s" for pin %r' % (value, self)) self.factory.mem[self.factory.mem.GPPUD_OFFSET] = value sleep(0.000000214) self.factory.mem[self._pull_offset] = 1 << self._pull_shift sleep(0.000000214) self.factory.mem[self.factory.mem.GPPUD_OFFSET] = 0 self.factory.mem[self._pull_offset] = 0 self._pull = value def _get_bounce(self): return self._bounce def _set_bounce(self, value): self._bounce = None if value is None else float(value) def _get_edges(self): try: with io.open(self.factory.fs.path_edge(self.number), 'r') as f: return f.read().strip() except IOError as e: if e.errno == errno.ENOENT: return 'none' else: raise def _set_edges(self, value): if value != 'none': self.factory.fs.export(self.number) try: with io.open(self.factory.fs.path_edge(self.number), 'w') as f: f.write(value) except IOError as e: if e.errno == errno.ENOENT and value == 'none': pass elif e.errno == errno.EINVAL: raise PinInvalidEdges('invalid edge specification "%s" for pin %r' % self) else: raise def _enable_event_detect(self): self.factory.fs.watch(self.number) self._last_call = None def _disable_event_detect(self): self.factory.fs.unwatch(self.number)