Source code for mxcubecore.HardwareObjects.LNLS.EPICSActuator

"""
Superclass for EPICS actuators.

Should be put as the first superclass,
e.g. class EPICSMotor(EPICSActuator, AbstractMotor):

Example of xml file:

<object class="LNLS.EPICSActuator">
    <channel type="epics" name="epicsActuator_val">MNC:B:LUCIOLE01:LIGHT_CH1</channel>
    <channel type="epics" name="epicsActuator_rbv" polling="500">MNC:B:LUCIOLE01:LIGHT_CH1</channel>
    <username>BackLight</username>
    <motor_name>BackLight</motor_name>
    <default_limits>(0, 8000)</default_limits>
</object>
"""

import random
import time

import gevent

from mxcubecore.HardwareObjects.abstract import AbstractActuator


[docs]class EPICSActuator(AbstractActuator.AbstractActuator): """EPCIS actuator class""" ACTUATOR_VAL = "epicsActuator_val" # target ACTUATOR_RBV = "epicsActuator_rbv" # readback def __init__(self, name): super(EPICSActuator, self).__init__(name) self.__wait_actuator_task = None self._nominal_limits = (-1e4, 1e4)
[docs] def init(self): """Initialization method""" super(EPICSActuator, self).init() self.update_state(self.STATES.READY)
def _wait_actuator(self): """Wait actuator to be ready.""" time.sleep(0.3) self.update_state(self.STATES.READY)
[docs] def get_value(self): """Override AbstractActuator method.""" return self.get_channel_value(self.ACTUATOR_RBV)
[docs] def set_value(self, value, timeout=0): """Override AbstractActuator method.""" if self.read_only: raise ValueError("Attempt to set value for read-only Actuator") if self.validate_value(value): self.update_state(self.STATES.BUSY) if timeout or timeout is None: with gevent.Timeout( timeout, RuntimeError("Motor %s timed out" % self.username) ): self._set_value(value) new_value = self._wait_actuator(value) else: self._set_value(value) self.__wait_actuator_task = gevent.spawn(self._wait_actuator) else: raise ValueError( "Invalid value %s; limits are %s" % (value, self.get_limits()) )
[docs] def abort(self): """Immediately halt movement. By default self.stop = self.abort""" if self.__wait_actuator_task is not None: self.__wait_actuator_task.kill() self.update_state(self.STATES.READY)
def _set_value(self, value): """Override AbstractActuator method.""" self.set_channel_value(self.ACTUATOR_VAL, value)