Source code for labgrid.util.agents.deditec_relais8

"""
This module implements the communication protocol to switch the digital outputs
on the Deditec Relais8 board.

Supported modules:

- Relais8

Supported Functionality:

- Turn digital output on and off
"""

import logging

from binascii import unhexlify, hexlify

import usb.core
import usb.util


[docs] class Relais8: _padding = "FEFEFEFEFEFEFEFEFF" _post_padding = "000000" _configuration_sequence = [ "3430FF", "3434FF", "34C0FF", "3400FF", "3402FF", "340CFF", "340EFF", "3404FF", "3406FF", "3408FF", "340AFF", "3410FF", "3412FF", "3414FF", "3416FF", "34F4FF", "34F5FF", "34F6FF", "34F7FF", "34F0FF", "34ECFF", ]
[docs] def __init__(self, **args): self._dev = usb.core.find(**args) if self._dev is None: raise ValueError("Device not found") if self._dev.is_kernel_driver_active(0): self._dev.detach_kernel_driver(0) cfg = self._dev.get_active_configuration() intf = cfg[(0, 0)] self._dev.set_configuration() usb.util.claim_interface(self._dev, 0) self._dev.ctrl_transfer(64, 0, 0, 0, 0, 5000) self._dev.ctrl_transfer(64, 3, 20, 0, 0, 5000) self._dev.ctrl_transfer(64, 9, 2, 0, 0, 5000) self._ep_out = usb.util.find_descriptor( intf, custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT ) self._ep_in = usb.util.find_descriptor( intf, custom_match=lambda e: usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_IN ) self._sequence_number = 0 self._logger = logging.getLogger("Device: ") self._run_configuration_sequence()
def _run_configuration_sequence(self): for d in Relais8._configuration_sequence: self.write_config_request(d) self.read(255)
[docs] def read(self, size): data = self._ep_in.read(size) self._logger.debug("Read data: %s", hexlify(data)) return data
[docs] def write_config_request(self, address): complete_request = unhexlify(Relais8._padding) complete_request += bytes([self._sequence_number]) complete_request += unhexlify(address + Relais8._post_padding) self._logger.debug("Sending Request: %s", hexlify(complete_request)) self.write(complete_request)
[docs] def write(self, data): self._sequence_number = (self._sequence_number + 1) & 0xFF self._logger.debug("Writing data: %s", hexlify(data)) self._ep_out.write(data)
[docs] def set_all_outputs(self, status, reset=False): assert isinstance(status, int) if status == 0: reset = True if status > 255 or status < 0: return data = f"0123{0 if reset else 8}000000001{status:02X}00000000000000" self.write(unhexlify(Relais8._padding + data)) self.read(255)
[docs] def set_output(self, number, status): if status: data = f"238000000001{1 << number - 1:02X}00000000000000" else: data = f"23A000000001{1 << number - 1:02X}00000000000000" complete_request = unhexlify(Relais8._padding) complete_request += bytes([self._sequence_number]) complete_request += unhexlify(data) self.write(complete_request) self.read(255)
[docs] def get_output(self, number): data = "34000000000F" complete_request = unhexlify(Relais8._padding) complete_request += bytes([self._sequence_number]) complete_request += unhexlify(data) self.write(complete_request) read_data = self.read(255).tobytes() index = read_data.find(b"\x1a") if read_data[index + 1] == self._sequence_number - 1: outputs = read_data[index + 2] else: raise IOError("Could not read outputs from device") return outputs & (1 << (number - 1)) > 0
[docs] def __del__(self): usb.util.release_interface(self._dev, 0)
[docs] def handle_set(busnum, devnum, number, status): relais8 = Relais8(bus=busnum, address=devnum) relais8.set_output(number, status)
[docs] def handle_get(busnum, devnum, number): relais8 = Relais8(bus=busnum, address=devnum) return relais8.get_output(number)
methods = { "set": handle_set, "get": handle_get, }