from importlib import import_module
import attr
from ..factory import target_factory
from ..protocol import DigitalOutputProtocol
from ..util.proxy import proxymanager
from .common import Driver
from .exception import ExecutionError
from ..step import step
[docs]
@target_factory.reg_driver
@attr.s(eq=False)
class LXAIOBusPIODriver(Driver, DigitalOutputProtocol):
bindings = {
"pio": {"LXAIOBusPIO", "NetworkLXAIOBusPIO"},
}
[docs]
def __attrs_post_init__(self):
super().__attrs_post_init__()
self._requests = import_module("requests")
[docs]
def on_activate(self):
# we can only forward if the backend knows which port to use
host, port = proxymanager.get_host_and_port(self.pio)
self._url = f"http://{host}:{port}/nodes/{self.pio.node}/pins/{self.pio.pin}/"
[docs]
@Driver.check_active
@step(args=["status"])
def set(self, status):
if self.pio.invert:
status = not status
r = self._requests.post(self._url, data={"value": "1" if status else "0"})
r.raise_for_status()
j = r.json()
if j["code"] != 0:
raise ExecutionError(f"failed to set value: {j['error_message']}")
[docs]
@Driver.check_active
@step(result=["True"])
def get(self):
r = self._requests.get(self._url)
r.raise_for_status()
j = r.json()
if j["code"] != 0:
raise ExecutionError(f"failed to get value: {j['error_message']}")
result = j["result"]
if result not in (0, 1):
raise ExecutionError(f"invalid input value: {repr(result)}")
status = bool(result)
if self.pio.invert:
status = not status
return status