emdbg.power.yocto

  1# Copyright (c) 2023, Auterion AG
  2# SPDX-License-Identifier: BSD-3-Clause
  3
  4import time
  5from yoctopuce.yocto_api import YAPI, YRefParam, YModule
  6from yoctopuce.yocto_relay import YRelay
  7from contextlib import contextmanager
  8from .base import Base
  9import logging
 10LOGGER = logging.getLogger("power:yocto")
 11YOCTO_HUB_COUNT = 0
 12
 13
 14class YoctopuceException(Exception):
 15    """Base exception for Yoctopuce-related errors."""
 16    def __init__(self, msg, context=None):
 17        """:param context: Contains the `yoctopuce.yocto_api.YRefParam` error message"""
 18        Exception.__init__(self, msg)
 19        self.context = context
 20
 21
 22@contextmanager
 23def _yoctopuce_hub(location=None):
 24    """
 25    Registers the hub and frees the YAPI. Can be called multiple times.
 26
 27    :raises `YoctopuceException`: if hub cannot be registered.
 28    """
 29    global YOCTO_HUB_COUNT
 30    try:
 31        LOGGER.info("Starting...")
 32        errmsg = YRefParam()
 33        if YAPI.RegisterHub(location or "usb", errmsg) != YAPI.SUCCESS:
 34            raise YoctopuceException("Yoctopuce RegisterHub failed!", errmsg)
 35        YOCTO_HUB_COUNT += 1
 36        yield
 37    finally:
 38        LOGGER.debug("Stopping.")
 39        if YOCTO_HUB_COUNT > 0:
 40            YOCTO_HUB_COUNT -= 1
 41        if YOCTO_HUB_COUNT == 0:
 42            YAPI.FreeAPI()
 43
 44
 45@contextmanager
 46def relay(channel: int = 0, inverted: bool = False, relay=None, location=None):
 47    """
 48    Context manager for starting and stopping the Yocto API and instantiating a
 49    `Relay` object.
 50
 51    :param channel: Relays with multiple channels get their own object per channel.
 52    :param inverted: Inverts the output state of the relay channel.
 53    :param relay: Pass an existing `yoctopuce.relay.YRelay` object or
 54                  find the first available one.
 55    :param location: Location of Yoctopuce Hub or automatically find it.
 56
 57    :raises `YoctopuceException`: if hub cannot be registered, relay is not
 58                                  connected, not online or cannot be found.
 59
 60    :return: a `Relay` object.
 61    """
 62    with _yoctopuce_hub(location):
 63        yield Relay(channel, inverted, relay)
 64
 65
 66def _first_yoctopuce_relay(channel=0):
 67    """Finds the right channel"""
 68    relay = YRelay.FirstRelay()
 69    if relay is None:
 70        raise YoctopuceException("No relay connected!")
 71    if not relay.isOnline():
 72        raise YoctopuceException("Relay is not online!", relay)
 73
 74    serial = relay.get_module().get_serialNumber()
 75    LOGGER.debug(f"Using first found relay '{serial}'")
 76    return YRelay.FindRelay(f"{serial}.relay{channel}")
 77
 78
 79class Relay(Base):
 80    """
 81    Light Wrapper around the [Yocto-Relay API][yp].
 82    Each channel is accessed by its own object.
 83
 84    [yp]: https://www.yoctopuce.com/EN/products/yocto-relay/doc/RELAYLO1.usermanual.html
 85    """
 86    def __init__(self, channel: int = 0, inverted: bool = False, relay=None):
 87        """
 88        :param channel: Relays with multiple channels get their own object per channel.
 89        :param inverted: Inverts the output state of the relay channel.
 90        :param relay: Pass an existing `yoctopuce.relay.YRelay` object or
 91                      find the first available one.
 92
 93        :raises `YoctopuceException`: if relay is not connected, not online or cannot be found.
 94        """
 95        super().__init__()
 96        self.relay = relay or _first_yoctopuce_relay(channel)
 97        self.channel = channel
 98        self._map = (YRelay.STATE_B, YRelay.STATE_A) if inverted else (YRelay.STATE_A, YRelay.STATE_B)
 99        self._on = lambda: self.relay.set_state(self._map[0])
100        self._off = lambda: self.relay.set_state(self._map[1])
101
102
if __name__ == "__main__":
    # Command-line entry point: switch a relay channel on, off, or power-cycle it.
    import emdbg
    import argparse

    parser = argparse.ArgumentParser(
        description="Have you tried turning it off and on again?")
    parser.add_argument(
        "command",
        choices=["on", "off", "cycle"],
        help="Turn on and off.")
    parser.add_argument(
        "--channel",
        type=int,
        default=0,
        help="The channel of the relay.")
    parser.add_argument(
        "--inverted",
        action="store_true",
        default=False,
        help="Channel is inverted.")
    parser.add_argument(
        "--location",
        help="Location of Yoctopuce hub.")
    parser.add_argument(
        "--time",
        type=int,
        default=1,
        help="Delay passed to the selected command.")
    parser.add_argument(
        "-v",
        dest="verbosity",
        action="count",
        help="Verbosity level.")
    args = parser.parse_args()
    emdbg.logger.configure(args.verbosity)

    # `location` must be passed by keyword: the third positional parameter of
    # `relay()` is an existing relay object, not the hub location.
    with relay(args.channel, args.inverted, location=args.location) as pwr:
        if args.command == "on":
            pwr.on(args.time)
        elif args.command == "off":
            pwr.off(args.time)
        elif args.command == "cycle":
            pwr.cycle(args.time, args.time)
LOGGER = <Logger power:yocto (WARNING)>
YOCTO_HUB_COUNT = 0
class YoctopuceException(builtins.Exception):
15class YoctopuceException(Exception):
16    """Base exception for Yoctopuce-related errors."""
17    def __init__(self, msg, context=None):
18        """:param context: Contains the `yoctopuce.yocto_api.YRefParam` error message"""
19        Exception.__init__(self, msg)
20        self.context = context

Base exception for Yoctopuce-related errors.

YoctopuceException(msg, context=None)
17    def __init__(self, msg, context=None):
18        """:param context: Contains the `yoctopuce.yocto_api.YRefParam` error message"""
19        Exception.__init__(self, msg)
20        self.context = context
Parameters
  • context: Contains the yoctopuce.yocto_api.YRefParam error message
context
@contextmanager
def relay(channel: int = 0, inverted: bool = False, relay=None, location=None):
46@contextmanager
47def relay(channel: int = 0, inverted: bool = False, relay=None, location=None):
48    """
49    Context manager for starting and stopping the Yocto API and instantiating a
50    `Relay` object.
51
52    :param channel: Relays with multiple channels get their own object per channel.
53    :param inverted: Inverts the output state of the relay channel.
54    :param relay: Pass an existing `yoctopuce.relay.YRelay` object or
55                  find the first available one.
56    :param location: Location of Yoctopuce Hub or automatically find it.
57
58    :raises `YoctopuceException`: if hub cannot be registered, relay is not
59                                  connected, not online or cannot be found.
60
61    :return: a `Relay` object.
62    """
63    with _yoctopuce_hub(location):
64        yield Relay(channel, inverted, relay)

Context manager for starting and stopping the Yocto API and instantiating a Relay object.

Parameters
  • channel: Relays with multiple channels get their own object per channel.
  • inverted: Inverts the output state of the relay channel.
  • relay: Pass an existing yoctopuce.yocto_relay.YRelay object or find the first available one.
  • location: Location of Yoctopuce Hub or automatically find it.
Raises
  • YoctopuceException: if hub cannot be registered, relay is not connected, not online or cannot be found.
Returns

a Relay object.

class Relay(emdbg.power.base.Base):
 80class Relay(Base):
 81    """
 82    Light Wrapper around the [Yocto-Relay API][yp].
 83    Each channel is accessed by its own object.
 84
 85    [yp]: https://www.yoctopuce.com/EN/products/yocto-relay/doc/RELAYLO1.usermanual.html
 86    """
 87    def __init__(self, channel: int = 0, inverted: bool = False, relay=None):
 88        """
 89        :param channel: Relays with multiple channels get their own object per channel.
 90        :param inverted: Inverts the output state of the relay channel.
 91        :param relay: Pass an existing `yoctopuce.relay.YRelay` object or
 92                      find the first available one.
 93
 94        :raises `YoctopuceException`: if relay is not connected, not online or cannot be found.
 95        """
 96        super().__init__()
 97        self.relay = relay or _first_yoctopuce_relay(channel)
 98        self.channel = channel
 99        self._map = (YRelay.STATE_B, YRelay.STATE_A) if inverted else (YRelay.STATE_A, YRelay.STATE_B)
100        self._on = lambda: self.relay.set_state(self._map[0])
101        self._off = lambda: self.relay.set_state(self._map[1])

Light Wrapper around the Yocto-Relay API. Each channel is accessed by its own object.

Relay(channel: int = 0, inverted: bool = False, relay=None)
 87    def __init__(self, channel: int = 0, inverted: bool = False, relay=None):
 88        """
 89        :param channel: Relays with multiple channels get their own object per channel.
 90        :param inverted: Inverts the output state of the relay channel.
 91        :param relay: Pass an existing `yoctopuce.relay.YRelay` object or
 92                      find the first available one.
 93
 94        :raises `YoctopuceException`: if relay is not connected, not online or cannot be found.
 95        """
 96        super().__init__()
 97        self.relay = relay or _first_yoctopuce_relay(channel)
 98        self.channel = channel
 99        self._map = (YRelay.STATE_B, YRelay.STATE_A) if inverted else (YRelay.STATE_A, YRelay.STATE_B)
100        self._on = lambda: self.relay.set_state(self._map[0])
101        self._off = lambda: self.relay.set_state(self._map[1])
Parameters
  • channel: Relays with multiple channels get their own object per channel.
  • inverted: Inverts the output state of the relay channel.
  • relay: Pass an existing yoctopuce.yocto_relay.YRelay object or find the first available one.
Raises
  • YoctopuceException: if relay is not connected, not online or cannot be found.
relay
channel
Inherited Members
emdbg.power.base.Base
on
off
cycle