Source code for mxcubecore.HardwareObjects.SOLEIL.TangoDCMotor

import logging

import numpy
from PyTango import DeviceProxy

from mxcubecore import HardwareRepository as HWR
from mxcubecore.BaseHardwareObjects import HardwareObject
from mxcubecore.Command.Tango import TangoCommand
from mxcubecore.HardwareObjects.abstract.AbstractMotor import MotorStates


[docs]class TangoDCMotor(HardwareObject): MOVESTARTED = 0 NOTINITIALIZED = 0 UNUSABLE = 0 READY = 2 MOVING = 4 ONLIMIT = 1 stateDict = { "UNKNOWN": 0, "OFF": 0, "ALARM": 1, "FAULT": 1, "STANDBY": 2, "RUNNING": 4, "MOVING": 4, "ON": 2, } def __init__(self, name): # State values as expected by Motor bricks super().__init__(name) self.GUIstep = 0.1 self.motor_states = MotorStates() def _init(self): self.positionValue = 0.0 self.stateValue = "UNKNOWN" threshold = self.get_property("threshold") self.threshold = ( 0.0018 # default value. change it with property threshold in xml ) self.old_value = 0.0 self.tangoname = self.get_property("tangoname") self.motor_name = self.get_property("motor_name") self.ho = DeviceProxy(self.tangoname) try: self.dataType = self.get_property("datatype") if self.dataType is None: self.dataType = "float" except Exception: self.dataType = "float" if threshold is not None: try: self.threshold = float(threshold) except Exception: pass self.set_is_ready(True) try: self.limitsCommand = self.get_command_object("limits") except KeyError: self.limitsCommand = None self.positionChan = self.get_channel_object( "position" ) # utile seulement si positionchan n'est pas defini dans le code self.stateChan = self.get_channel_object( "state" ) # utile seulement si statechan n'est pas defini dans le code self.positionChan.connect_signal("update", self.positionChanged) self.stateChan.connect_signal("update", self.motorStateChanged) def positionChanged(self, value): self.positionValue = value if abs(float(value) - self.old_value) > self.threshold: try: # logging.getLogger("HWR").error("%s: TangoDCMotor new position , %s", self.name(), value) self.emit("valueChanged", (value,)) self.old_value = value except Exception: logging.getLogger("HWR").error( "%s: TangoDCMotor not responding, %s", self.id, "" ) self.old_value = value
[docs] def is_ready(self): return self.stateValue == "STANDBY"
def connect_notify(self, signal): if signal == "hardware_object_name,stateChanged": self.motorStateChanged(TangoDCMotor.stateDict[self.stateValue]) elif signal == "limitsChanged": self.motorLimitsChanged() elif signal == "valueChanged": self.motor_positions_changed(self.positionValue) self.set_is_ready(True) def motorState(self): return TangoDCMotor.stateDict[self.stateValue] def motorStateChanged(self, state): self.stateValue = str(state) self.set_is_ready(True) logging.info("motor state changed. it is %s " % self.stateValue) self.emit("stateChanged", (TangoDCMotor.stateDict[self.stateValue],))
[docs] def get_state(self): return TangoDCMotor.stateDict[self.stateValue]
def get_limits(self): try: logging.getLogger("HWR").info( "TangoDCMotor.get_limits: trying to get limits for motor_name %s " % (self.motor_name) ) limits = self.ho.getMotorLimits( self.motor_name ) # limitsCommand() # self.ho.getMotorLimits(self.motor_name) logging.getLogger("HWR").info( "TangoDCMotor.get_limits: Getting limits for %s -- %s " % (self.motor_name, str(limits)) ) if numpy.inf in limits: limits = numpy.array([-10000, 10000]) except Exception: # import traceback # logging.getLogger("HWR").info("TangoDCMotor.get_limits: Cannot get limits for %s.\nException %s " % (self.motor_name, traceback.print_exc())) if self.motor_name in [ "detector_distance", "detector_horizontal", "detector_vertical", ]: info = self.positionChan.getInfo() limits = [float(info.min_value), float(info.max_value)] # if self.motor_name == 'detector_ts': # limits = [96, 1100] # elif self.motor_name == 'detector_tx': # limits = elif self.motor_name == "exposure": limits = [float(self.min_value), float(self.max_value)] if limits is None: try: limits = self.get_property("min"), self.get_property("max") logging.getLogger("HWR").info( "TangoDCMotor.get_limits: %.4f ***** %.4f" % limits ) limits = numpy.array(limits) except Exception: # logging.getLogger("HWR").info("TangoDCMotor.get_limits: Cannot get limits for %s" % self.name()) limits = None return limits def motorLimitsChanged(self): self.emit("limitsChanged", (self.get_limits(),)) def motorIsMoving(self): return self.stateValue == "RUNNING" or self.stateValue == "MOVING" def motorMoveDone(self, channelValue): if self.stateValue == "STANDBY": self.emit("moveDone", (self.tangoname, "tango")) def motor_positions_changed(self, absolutePosition): self.emit("valueChanged", (absolutePosition,)) def syncQuestionAnswer(self, specSteps, controllerSteps): return ( "0" # This is only for spec motors. 0 means do not change anything on sync ) def get_value(self): return self.positionChan.get_value() def convertValue(self, value): logging.info("TangoDCMotor: converting value to %s " % str(self.dataType)) retvalue = value if self.dataType in ["short", "int", "long"]: retvalue = int(value) return retvalue def get_motor_mnemonic(self): return self.name def _set_value(self, value): """Move the motor to the required position Arguments: absolutePosition -- position to move to """ logging.getLogger("TangoClient").info( "TangoDCMotor move (%s). Trying to go to %s: type '%s'", self.motor_name, value, type(value), ) value = float(value) if not isinstance(value, float) and not isinstance(value, int): logging.getLogger("TangoClient").error( "Cannot move %s: position '%s' is not a number. It is a %s", self.tangoname, value, type(value), ) logging.info("TangoDCMotor: move. motor will go to %s " % str(value)) logging.getLogger("HWR").info( "TangoDCMotor.move to absolute position: %.3f" % value ) logging.getLogger("TangoClient").info( "TangoDCMotor move. Trying to go to %s: that is a '%s'", value, type(value), ) # if abs(self.get_value() - value) > epsilon: # logging.info( # "TangoDCMotor: difference larger then epsilon (%s), executing the move " # % str(epsilon) # ) self.positionChan.set_value(self.convertValue(value)) # else: # logging.info( # "TangoDCMotor: not moving really as epsilon is large %s " % str(epsilon) # ) # logging.info("TangoDCMotor: self.get_value() %s " % str(self.get_value())) # logging.info("TangoDCMotor: value %s " % str(value))
[docs] def stop(self): logging.getLogger("HWR").info("TangoDCMotor.stop") stopcmd = self.get_command_object("Stop")() if not stopcmd: stopcmd = TangoCommand("stopcmd", "Stop", self.tangoname) stopcmd()
def isSpecConnected(self): logging.getLogger().debug("%s: TangoDCMotor.isSpecConnected()" % self.name()) return (Truehardware_object_name,)
def test(): hwr = HWR.get_hardware_repository() hwr.connect() motor = hwr.get_hardware_object("/phi") print(motor.get_value()) if __name__ == "__main__": test()