Source code for mxcubecore.HardwareObjects.ALBA.ALBAMachineInfo

#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

"""
[Name]
ALBAMachineInfo

[Description]
Hardware Object is used to get relevant machine information
(machine current, time to next injection, status, etc)
Based on EMBL HwObj

[Channels]
- MachineCurrent
- TopUpRemaining
- State

[Commands]

[Emitted signals]
- valuesChanged

[Functions]
- None

[Included Hardware Objects]
- None


Example Hardware Object XML file :
==================================
<object class="ALBAMachineInfo">
    <username>Mach</username>
    <taurusname>mach/ct/gateway</taurusname>
    <channel type="sardana" name="MachStatus" polling="1000">State</channel>
    <channel type="sardana" name="MachCurrent" polling="1000">Current</channel>
    <channel type="sardana" name="TopUpRemaining" polling="1000">TopUpRemaining</channel>
</object>
"""

import logging

from mxcubecore.BaseHardwareObjects import HardwareObject

__author__ = "Jordi Andreu"
__credits__ = ["MXCuBE collaboration"]

__version__ = "2.2."
__maintainer__ = "Jordi Andreu"
__email__ = "jandreu[at]cells.es"
__status__ = "Draft"


[docs]class ALBAMachineInfo(HardwareObject): """ Descript. : Displays actual information about the machine status. """ def __init__(self, name): super().__init__(name) self.logger = logging.getLogger("HWR MachineInfo") self.logger.info("__init__()") """ Descript. : """ # Parameters values self.values_dict = {} self.values_dict["mach_current"] = None self.values_dict["mach_status"] = "" self.values_dict["topup_remaining"] = "" # Dictionary for booleans indicating if values are in range # self.values_in_range_dict = {} self.chan_mach_current = None self.chan_mach_status = None self.chan_topup_remaining = None
[docs] def init(self): """ Descript. : Inits channels from xml configuration. """ try: self.chan_mach_current = self.get_channel_object("MachCurrent") if self.chan_mach_current is not None: self.chan_mach_current.connect_signal( "update", self.mach_current_changed ) self.chan_mach_status = self.get_channel_object("MachStatus") if self.chan_mach_status is not None: self.chan_mach_status.connect_signal("update", self.mach_status_changed) self.chan_topup_remaining = self.get_channel_object("TopUpRemaining") if self.chan_topup_remaining is not None: self.chan_topup_remaining.connect_signal( "update", self.topup_remaining_changed ) except KeyError: self.logger.warning("%s: cannot read machine info", self.name())
[docs] def mach_current_changed(self, value): """ Descript. : Function called if the machine current is changed Arguments : new machine current (float) Return : - """ if ( self.values_dict["mach_current"] is None or abs(self.values_dict["mach_current"] - value) > 0.10 ): self.values_dict["mach_current"] = value self.re_emit_values() self.logger.debug("New machine current value=%smA" % value)
[docs] def mach_status_changed(self, status): """ Descript. : Function called if machine status is changed Arguments : new machine status (string) Return : - """ self.values_dict["mach_status"] = str(status) self.re_emit_values() self.logger.debug("New machine status=%s" % status)
[docs] def topup_remaining_changed(self, value): """ Descript. : Function called if topup ramaining is changed Arguments : new topup remaining (float) Return : - """ self.values_dict["topup_remaining"] = value self.re_emit_values() self.logger.debug("New top-up remaining time=%ss" % value)
[docs] def re_emit_values(self): """ Descript. : Updates storage disc information, detects if intensity and storage space is in limits, forms a value list and value in range list, both emitted by qt as lists Arguments : - Return : - """ values_to_send = [] values_to_send.append(self.values_dict["mach_current"]) values_to_send.append(self.values_dict["mach_status"]) values_to_send.append(self.values_dict["topup_remaining"]) self.emit("valuesChanged", values_to_send) self.logger.debug("SIGNAL valuesChanged emitted")
def get_mach_current(self): try: value = self.chan_mach_current.get_value() except Exception as e: self.logger.error("Cannot read machine current value, returning 0") value = 0 finally: return value # return self.values_dict['mach_current'] def get_current(self): return self.get_mach_current() def get_message(self): return "Machinfo status: %s" % self.get_mach_status() # def get_current_value(self): # """ # Descript. : # """ # return self.values_dict['current'] def get_mach_status(self): return self.chan_mach_status.get_value() # return self.values_dict['mach_status'] def get_topup_remaining(self): return self.chan_topup_remaining.get_value()
# return self.values_dict['remaining'] def test_hwo(hwo): print("Current is", hwo.get_current())