"""
This module implements the communication protocol to switch the digital outputs
on the Deditec Relais8 board.
Supported modules:
- Relais8
Supported Functionality:
- Turn digital output on and off
"""
import logging
from binascii import unhexlify, hexlify
import usb.core
import usb.util
class Relais8:
    """Driver for a Deditec Relais8 board attached via USB.

    Every request written to the board is framed as the fixed padding
    bytes, a one-byte sequence number and a request-specific payload.
    The sequence number is advanced (modulo 256) by every call to
    :meth:`write`.
    """

    # Hex-encoded bytes placed in front of every request.
    _padding = "FEFEFEFEFEFEFEFEFF"
    # Hex-encoded bytes appended after the address of a configuration request.
    _post_padding = "000000"
    # Hex-encoded requests sent, in this order, right after the device is opened.
    _configuration_sequence = [
        "3430FF",
        "3434FF",
        "34C0FF",
        "3400FF",
        "3402FF",
        "340CFF",
        "340EFF",
        "3404FF",
        "3406FF",
        "3408FF",
        "340AFF",
        "3410FF",
        "3412FF",
        "3414FF",
        "3416FF",
        "34F4FF",
        "34F5FF",
        "34F6FF",
        "34F7FF",
        "34F0FF",
        "34ECFF",
    ]
    # The output mask is transmitted as a single byte, so outputs 1..8 exist.
    _output_count = 8

    def __init__(self, **args):
        """Open the board and run the configuration sequence.

        Args:
            **args: Keyword arguments forwarded unchanged to
                ``usb.core.find`` to select the device (for example
                ``bus=`` and ``address=``).

        Raises:
            ValueError: If ``usb.core.find`` returns no device.
        """
        self._dev = usb.core.find(**args)
        if self._dev is None:
            raise ValueError("Device not found")
        # The interface can only be claimed once the kernel driver is detached.
        if self._dev.is_kernel_driver_active(0):
            self._dev.detach_kernel_driver(0)
        cfg = self._dev.get_active_configuration()
        intf = cfg[(0, 0)]
        self._dev.set_configuration()
        usb.util.claim_interface(self._dev, 0)
        # Control transfers with bmRequestType 64 (0x40) issued before any
        # bulk data is exchanged.
        # NOTE(review): the meaning of requests 0, 3 and 9 is not visible
        # from this file - confirm against the device documentation.
        self._dev.ctrl_transfer(64, 0, 0, 0, 0, 5000)
        self._dev.ctrl_transfer(64, 3, 20, 0, 0, 5000)
        self._dev.ctrl_transfer(64, 9, 2, 0, 0, 5000)
        self._ep_out = usb.util.find_descriptor(
            intf,
            custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
            == usb.util.ENDPOINT_OUT,
        )
        self._ep_in = usb.util.find_descriptor(
            intf,
            custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress)
            == usb.util.ENDPOINT_IN,
        )
        self._sequence_number = 0
        self._logger = logging.getLogger("Device: ")
        self._run_configuration_sequence()

    def _run_configuration_sequence(self):
        """Send every configuration request and discard each reply."""
        for request in Relais8._configuration_sequence:
            self.write_config_request(request)
            self.read(255)

    def _frame(self, payload_hex):
        """Return padding + current sequence byte + decoded ``payload_hex``."""
        return (
            unhexlify(Relais8._padding)
            + bytes([self._sequence_number])
            + unhexlify(payload_hex)
        )

    @staticmethod
    def _check_output_number(number):
        """Raise ``ValueError`` unless ``number`` is a valid output (1..8)."""
        if not 1 <= number <= Relais8._output_count:
            raise ValueError(
                f"Output number must be between 1 and {Relais8._output_count}, "
                f"got {number!r}"
            )

    def read(self, size):
        """Read up to ``size`` bytes from the IN endpoint and return them."""
        data = self._ep_in.read(size)
        self._logger.debug("Read data: %s", hexlify(data))
        return data

    def write_config_request(self, address):
        """Send one configuration request.

        Args:
            address: Hex string of the request; the post padding is
                appended to it before it is framed and written.
        """
        complete_request = self._frame(address + Relais8._post_padding)
        self._logger.debug("Sending Request: %s", hexlify(complete_request))
        self.write(complete_request)

    def write(self, data):
        """Write ``data`` to the OUT endpoint and advance the sequence number.

        The counter is advanced before the transfer, so after this call
        ``_sequence_number`` is one past (modulo 256) the value that a
        request framed just before the call carries.
        """
        self._sequence_number = (self._sequence_number + 1) & 0xFF
        self._logger.debug("Writing data: %s", hexlify(data))
        self._ep_out.write(data)

    def set_all_outputs(self, status, reset=False):
        """Send one request carrying ``status`` as the output byte.

        Args:
            status: Integer in 0..255, written into the request as one
                hex byte. Values outside that range are ignored and
                nothing is sent.
            reset: Selects the ``0`` instead of the ``8`` variant of the
                request. Forced to ``True`` when ``status`` is 0.
        """
        # NOTE(review): assert is stripped under ``python -O``; kept so the
        # exception type seen by existing callers does not change.
        assert isinstance(status, int)
        if status == 0:
            reset = True
        if status > 255 or status < 0:
            return
        # NOTE(review): the byte in the slot where the other requests carry
        # the running sequence number is fixed to 0x01 here. Left untouched
        # because the on-wire effect of changing it cannot be verified from
        # this file.
        data = f"0123{0 if reset else 8}000000001{status:02X}00000000000000"
        self.write(unhexlify(Relais8._padding + data))
        self.read(255)

    def set_output(self, number, status):
        """Switch a single output on or off.

        Args:
            number: Output number, 1..8.
            status: Truthy to switch the output on, falsy to switch it off.

        Raises:
            ValueError: If ``number`` is not in 1..8. The check happens
                before anything is sent, so no malformed request can reach
                the board.
        """
        self._check_output_number(number)
        mask = 1 << (number - 1)
        opcode = "2380" if status else "23A0"
        self.write(self._frame(f"{opcode}00000001{mask:02X}00000000000000"))
        self.read(255)

    def get_output(self, number):
        """Return whether output ``number`` is currently switched on.

        Args:
            number: Output number, 1..8.

        Returns:
            ``True`` if the bit for ``number`` is set in the reply.

        Raises:
            ValueError: If ``number`` is not in 1..8.
            IOError: If the reply has no ``0x1A`` marker, is too short, or
                does not echo the sequence number of the request.
        """
        self._check_output_number(number)
        # Remember the sequence byte that goes on the wire: write() advances
        # the counter, and "counter - 1" is wrong once it wraps from 255 to 0.
        sent_sequence = self._sequence_number
        self.write(self._frame("34000000000F"))
        read_data = self.read(255).tobytes()
        index = read_data.find(b"\x1a")
        if (
            index < 0
            or index + 2 >= len(read_data)
            or read_data[index + 1] != sent_sequence
        ):
            raise IOError("Could not read outputs from device")
        outputs = read_data[index + 2]
        return outputs & (1 << (number - 1)) > 0

    def __del__(self):
        """Release the claimed USB interface, if a device was ever opened."""
        # __init__ may have failed before a device was bound; in that case
        # there is nothing to release.
        dev = getattr(self, "_dev", None)
        if dev is None:
            return
        usb.util.release_interface(dev, 0)
def handle_set(busnum, devnum, number, status):
    """Open the board at USB bus ``busnum`` / address ``devnum`` and switch
    output ``number`` according to ``status``."""
    board = Relais8(bus=busnum, address=devnum)
    board.set_output(number, status)
def handle_get(busnum, devnum, number):
    """Open the board at USB bus ``busnum`` / address ``devnum`` and return
    the result of ``get_output(number)``."""
    board = Relais8(bus=busnum, address=devnum)
    state = board.get_output(number)
    return state
# Dispatch table: maps a method name to the function that handles it.
methods = {"set": handle_set, "get": handle_get}