#
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
"""
GenericDiffractometer
"""
import ast
import copy
import enum
import json
import logging
import math
import os
import time
from typing import (
Dict,
List,
Tuple,
Union,
)
import gevent
import gevent.event
import numpy
from gevent.lock import Semaphore
from pydantic import (
BaseModel,
Field,
ValidationError,
)
from mxcubecore import HardwareRepository as HWR
from mxcubecore.BaseHardwareObjects import HardwareObject
from mxcubecore.HardwareObjects import sample_centring
from mxcubecore.model import queue_model_objects
__credits__ = ["MXCuBE collaboration"]
__version__ = "2.2."
__status__ = "Draft"
[docs]class DiffractometerState:
"""
Enumeration of diffractometer states
"""
Created = 0
Initializing = 1
On = 2
Off = 3
Closed = 4
Open = 5
Ready = 6
Busy = 7
Moving = 8
Standby = 9
Running = 10
Started = 11
Stopped = 12
Paused = 13
Remote = 14
Reset = 15
Closing = 16
Disable = 17
Waiting = 18
Positioned = 19
Starting = 20
Loading = 21
Unknown = 22
Alarm = 23
Fault = 24
Invalid = 25
Offline = 26
STATE_DESC = {
Created: "Created",
Initializing: "Initializing",
On: "On",
Off: "Off",
Closed: "Closed",
Open: "Open",
Ready: "Ready",
Busy: "Busy",
Moving: "Moving",
Standby: "Standby",
Running: "Running",
Started: "Started",
Stopped: "Stopped",
Paused: "Paused",
Remote: "Remote",
Reset: "Reset",
Closing: "Closing",
Disable: "Disable",
Waiting: "Waiting",
Positioned: " Positioned",
Starting: "Starting",
Loading: "Loading",
Unknown: "Unknown",
Alarm: "Alarm",
Fault: "Fault",
Invalid: "Invalid",
Offline: "Offline",
}
@staticmethod
def tostring(state):
return DiffractometerState.STATE_DESC.get(state, "Unknown")
[docs]class PhaseEnum(str, enum.Enum):
centring = "Centring"
data_collection = "DataCollection"
beam_location = "BeamLocation"
transfer = "Transfer"
unknown = "Unknown"
[docs]class PhaseModel(BaseModel):
value: PhaseEnum = PhaseEnum.unknown
[docs]class HeadTypeEnum(str, enum.Enum):
no_kappa = "NO_KAPPA"
mini_kappa = "MINI_KAPPA"
chip = "CHIP"
plate = "PLATE"
[docs]class HolderTypeEnum(str, enum.Enum):
known_geometry = "KNOWN_GEOMETRY"
free_geometry = "FREE_GEOMETRY"
[docs]class BlockShapeEnum(str, enum.Enum):
rectangular = "RECTANGULAR"
elliptical = "ELLIPTICAL"
[docs]class CalibrationData(BaseModel):
top_left: Tuple[float, float, float] = Field(
[0, 0, 0], description="Top left corner motor position"
)
top_right: Tuple[float, float, float] = Field(
[0, 0, 0], description="Top right corner motor position"
)
bottom_left: Tuple[float, float, float] = Field(
[0, 0, 0], description="Bottom left corner motor position"
)
[docs]class SampleHolderSectionModel(BaseModel):
calibration_data: CalibrationData | None = None
section_offset: Tuple[int, int] = Field(
[0, 0], description="Block offset in grid layout system coordinates x, y"
)
block_size: Tuple[float, float] = Field(
[15, 15], description="Block size horizontal, vertical in mm"
)
block_spacing: Tuple[float, float] = Field(
[15, 15], description="Spacing between blocks horizontal, vertical in mm"
)
block_shape: BlockShapeEnum = BlockShapeEnum.rectangular
number_of_rows: int = Field(6, description="Numer of rows")
number_of_collumns: int = Field(6, description="Numer of collumns")
row_labels: List[str] = Field([], description="Row lables")
column_lables: List[str] = Field([], description="Collumn lables")
targets_per_block: Tuple[int, int] = Field(
[20, 20], description="Targets per block dim1 and dim2"
)
[docs]class ChipLayout(BaseModel):
head_type: HeadTypeEnum = HeadTypeEnum.chip
holder_type: HolderTypeEnum = HolderTypeEnum.known_geometry
holder_brand: str = Field("", description="Brand/make of sample holder")
holder_size: Tuple[float, float] = Field(
[0, 0], description="Size of sample holder in mm horizontal and vertical"
)
sections: List[SampleHolderSectionModel] = []
calibration_data: CalibrationData | None = None
[docs]class GonioHeadConfiguration(BaseModel):
current: str = Field("", description="Selected chip layout")
available: Dict[str, ChipLayout] = Field({}, description="Available chip layouts")
[docs]class GenericDiffractometer(HardwareObject):
"""
Abstract base class for diffractometers
"""
CENTRING_MOTORS_NAME = [
"phi",
"phiz",
"phiy",
"sampx",
"sampy",
"kappa",
"kappa_phi",
"beam_x",
"beam_y",
]
STATE_CHANGED_EVENT = "stateChanged"
STATUS_CHANGED_EVENT = "statusChanged"
MOTOR_POSITION_CHANGED_EVENT = "motorPositionsChanged"
MOTOR_STATUS_CHANGED_EVENT = "motorStatusChanged"
HEAD_TYPE_MINIKAPPA = "MiniKappa"
HEAD_TYPE_SMARTMAGNET = "SmartMagnet"
HEAD_TYPE_PLATE = "Plate"
HEAD_TYPE_PERMANENT = "Permanent"
CENTRING_METHOD_MANUAL = "Manual 3-click"
CENTRING_METHOD_AUTO = "Computer automatic"
CENTRING_METHOD_MOVE_TO_BEAM = "Move to beam"
# TODO NBNB FIX THIS CONFUSION!!!
MANUAL3CLICK_MODE = CENTRING_METHOD_MANUAL
C3D_MODE = CENTRING_METHOD_AUTO
PHASE_TRANSFER = "Transfer"
PHASE_CENTRING = "Centring"
PHASE_COLLECTION = "DataCollection"
PHASE_BEAM = "BeamLocation"
PHASE_UNKNOWN = "Unknown"
def __init__(self, name):
HardwareObject.__init__(self, name)
# Hardware objects ----------------------------------------------------
self.motor_hwobj_dict = {}
self.centring_motors_list = None
self.front_light_switch = None
self.back_light_switch = None
self.aperture = None
self.beamstop = None
self.cryo = None
self.capillary = None
self.use_sc = False
# Channels and commands -----------------------------------------------
self.channel_dict = {}
self.used_channels_list = []
self.command_dict = {}
self.used_commands_list = []
# flag for using sample_centring hwobj or not
self.use_sample_centring = None
# Preventing user multiple clicks during manual centring step
self.waiting_for_click = None # None = legacy/no-wait mode, True = waiting, False = manual centring in progress
self.click_lock = Semaphore()
# time to delay for state polling for controllers
# not updating state immediately after cmd started
self.delay_state_polling = None
self.delay_state_polling = (
None # time to delay for state polling for controllers
)
# not updating state immediately after cmd started
# Internal values -----------------------------------------------------
self.ready_event = None
self.head_type = GenericDiffractometer.HEAD_TYPE_MINIKAPPA
self.phase_list = []
self.grid_direction = None
self.reversing_rotation = None
self.beam_position = None
self.zoom_centre = None
self.pixels_per_mm_x = None
self.pixels_per_mm_y = None
self.current_state = None
self.current_phase = None
self.transfer_mode = None
self.current_centring_procedure = None
self.current_centring_method = None
self.current_motor_positions = {}
self.current_motor_states = {}
self.fast_shutter_is_open = None
self.centring_status = {"valid": False}
self.centring_time = 0
self.user_confirms_centring = None
self.user_clicked_event = None
self.automatic_centring_try_count = 1
self.omega_reference_par = None
self.move_to_motors_positions_task = None
self.move_to_motors_positions_procedure = None
self.centring_methods = {
GenericDiffractometer.CENTRING_METHOD_MANUAL: self.start_manual_centring,
GenericDiffractometer.CENTRING_METHOD_AUTO: self.start_automatic_centring,
GenericDiffractometer.CENTRING_METHOD_MOVE_TO_BEAM: self.start_move_to_beam,
}
self.connect(self, "equipmentReady", self.equipment_ready)
self.connect(self, "equipmentNotReady", self.equipment_not_ready)
# HACK
self.get_motor_positions = self.get_positions
[docs] def init(self):
super().init()
# Internal values -----------------------------------------------------
self.ready_event = gevent.event.Event()
self.user_clicked_event = gevent.event.AsyncResult()
self.user_confirms_centring = True
self.beamstop = self.get_object_by_role("beamstop")
self.aperture = self.get_object_by_role("aperture")
self.capillary = self.get_object_by_role("capillary")
self.cryo = self.get_object_by_role("cryo")
# Hardware objects ----------------------------------------------------
# if HWR.beamline.sample_view.camera is not None:
# self.image_height = HWR.beamline.sample_view.camera.get_height()
# self.image_width = HWR.beamline.sample_view.camera.get_width()
# else:
# self.log.debug(
# "Diffractometer: " + "Camera hwobj is not defined"
# )
if HWR.beamline.beam is not None:
self.beam_position = HWR.beamline.beam.get_beam_position_on_screen()
self.connect(
HWR.beamline.beam, "beamPosChanged", self.beam_position_changed
)
else:
self.beam_position = [0, 0]
self.log.warning("Diffractometer: " + "BeamInfo hwobj is not defined")
self.front_light_switch = self.get_object_by_role("frontlightswitch")
self.back_light_switch = self.get_object_by_role("backlightswitch")
# Channels -----------------------------------------------------------
self.used_channels_list = self.get_property("used_channels", [])
if isinstance(self.used_channels_list, str):
self.used_channels_list = ast.literal_eval(self.used_channels_list)
for channel_name in self.used_channels_list:
self.channel_dict[channel_name] = self.get_channel_object(channel_name)
if self.channel_dict[channel_name] is None:
continue
if channel_name == "TransferMode":
self.connect(
self.channel_dict["TransferMode"],
"update",
self.transfer_mode_changed,
)
elif channel_name == "CurrentPhase":
self.connect(
self.channel_dict["CurrentPhase"],
"update",
self.current_phase_changed,
)
elif channel_name == "HeadType":
self.connect(
self.channel_dict["HeadType"], "update", self.head_type_changed
)
elif channel_name == "State":
self.connect(self.channel_dict["State"], "update", self.state_changed)
# Commands -----------------------------------------------------------
self.used_commands_list = self.get_property("used_commands", [])
if isinstance(self.used_commands_list, str):
self.used_commands_list = ast.literal_eval(self.used_commands_list)
for command_name in self.used_commands_list:
self.command_dict[command_name] = self.get_command_object(command_name)
# Centring motors ----------------------------------------------------
self.centring_motors_list = self.get_property(
"centring_motors", GenericDiffractometer.CENTRING_MOTORS_NAME
)
if isinstance(self.centring_motors_list, str):
self.centring_motors_list = ast.literal_eval(self.centring_motors_list)
queue_model_objects.CentredPosition.set_diffractometer_motor_names(
*self.centring_motors_list
)
for motor_name in self.centring_motors_list:
# NBNB TODO refactor configuration, and set properties directly (see below)
temp_motor_hwobj = self.get_object_by_role(motor_name)
if temp_motor_hwobj is not None:
self.motor_hwobj_dict[motor_name] = temp_motor_hwobj
self.connect(temp_motor_hwobj, "stateChanged", self.motor_state_changed)
self.connect(
temp_motor_hwobj, "valueChanged", self.centring_motor_moved
)
if motor_name == "phi":
self.connect(
temp_motor_hwobj,
"valueChanged",
self.emit_diffractometer_moved,
)
#
# A work-around to make mesh grid 'Rotate to' feature work.
#
# MXCuBE requires that diffractometer HWO have a `self.centringPhi`
# attribute, used to change 'omega' rotation when running
# mesh 'Rotate to' routine.
#
# See https://github.com/mxcube/mxcubecore/issues/1360 for details.
#
self.centringPhi = self.motor_hwobj_dict["phi"]
elif motor_name == "zoom":
self.connect(
temp_motor_hwobj,
"predefinedPositionChanged",
self.zoom_motor_predefined_position_changed,
)
self.connect(
temp_motor_hwobj, "stateChanged", self.zoom_motor_state_changed
)
else:
self.log.warning(
"Diffractometer: Motor "
+ "%s listed in the centring motor list, but not initalized"
% motor_name
)
# sample changer -----------------------------------------------------
if HWR.beamline.sample_changer is None:
self.log.warning("Diffractometer: Sample Changer is not defined")
else:
# By default use sample changer if it's defined and transfer_mode
# is set to SAMPLE_CHANGER.
# if not defined, set use_sc to True
if self.transfer_mode is None or self.transfer_mode == "SAMPLE_CHANGER":
self.use_sc = True
try:
self.use_sample_centring = self.get_property("sample_centring")
if self.use_sample_centring:
self.centring_phi = sample_centring.CentringMotor(
self.motor_hwobj_dict["phi"], direction=-1
)
self.centring_phiz = sample_centring.CentringMotor(
self.motor_hwobj_dict["phiz"]
)
self.centring_phiy = sample_centring.CentringMotor(
self.motor_hwobj_dict["phiy"], direction=-1
)
self.centring_sampx = sample_centring.CentringMotor(
self.motor_hwobj_dict["sampx"]
)
self.centring_sampy = sample_centring.CentringMotor(
self.motor_hwobj_dict["sampy"]
)
except Exception:
self.log.exception("")
try:
self.delay_state_polling = self.get_property("delay_state_polling")
except Exception:
self.log.exception("")
# Other parameters ---------------------------------------------------
self.zoom_centre = self.get_property("zoom_centre")
if self.zoom_centre is None:
self.zoom_centre = {"x": 0, "y": 0}
self.log.warning(
"Diffractometer: zoom centre not configured. Using default."
)
elif isinstance(self.zoom_centre, str):
self.zoom_centre = ast.literal_eval(self.zoom_centre)
elif isinstance(self.zoom_centre, dict):
self.zoom_centre = self.zoom_centre
else:
_m = "Diffractometer: zoom centre misconfigured. not a dict"
raise ValueError(_m)
if not all(key in self.zoom_centre for key in ["x", "y"]):
_m = "Diffractometer: zoom centre misconfigured. missing x or y"
raise ValueError(_m)
self.reversing_rotation = self.get_property("reversing_rotation")
# grid_direction describes how a grid is collected
# 'fast' is collection direction and 'slow' describes
# move to the next collection line
self.grid_direction = self.get_property("grid_direction")
if self.grid_direction is None:
self.grid_direction = {"fast": (0, 1), "slow": (1, 0), "omega_ref": 0}
self.log.warning(
"Diffractometer: Grid direction not configured. Using default."
)
elif isinstance(self.grid_direction, str):
self.grid_direction = ast.literal_eval(self.grid_direction)
elif not isinstance(self.grid_direction, dict):
_m = "Diffractometer: grid direction misconfigured. not a dict"
raise ValueError(_m)
if not all(key in self.grid_direction for key in ["fast", "slow", "omega_ref"]):
_m = "Diffractometer: grid direction misconfigured. missing fast, slow, or omega_ref"
raise ValueError(_m)
self.phase_list = self.get_property(
"phase_list",
[
GenericDiffractometer.PHASE_TRANSFER,
GenericDiffractometer.PHASE_CENTRING,
GenericDiffractometer.PHASE_COLLECTION,
GenericDiffractometer.PHASE_BEAM,
],
)
if isinstance(self.phase_list, str):
self.phase_list = ast.literal_eval(self.phase_list)
# to make it compatible
def __getattr__(self, attr):
if attr.startswith("__"):
raise AttributeError(attr)
else:
if attr == "currentCentringProcedure":
return self.current_centring_procedure
if attr == "centringStatus":
return self.centring_status
if attr == "imageClicked":
return self.image_clicked
if attr == "cancelCentringMethod":
return self.cancel_centring_method
if attr == "pixelsPerMmY":
return self.pixels_per_mm_x
if attr == "pixelsPerMmZ":
return self.pixels_per_mm_y
return HardwareObject.__getattr__(self, attr)
# Contained Objects
# NBNB Temporary hack - should be cleaned up together with configuration
@property
def omega(self):
"""omega motor object
Returns:
AbstractActuator
"""
return self.motor_hwobj_dict.get("phi")
@property
def kappa(self):
"""kappa motor object
Returns:
AbstractActuator
"""
return self.get_object_by_role("kappa")
@property
def kappa_phi(self):
"""kappa motor object
Returns:
AbstractActuator
"""
return self.get_object_by_role("kappa_phi")
@property
def centring_x(self):
"""centring_x motor object
Returns:
AbstractActuator
"""
return self.motor_hwobj_dict.get("sampx")
@property
def centring_y(self):
"""centring_y motor object
Returns:
AbstractActuator
"""
return self.motor_hwobj_dict.get("sampy")
@property
def alignment_x(self):
"""alignment_x motor object (also used as graphics.focus)
Returns:
AbstractActuator
"""
return self.motor_hwobj_dict.get("focus")
@property
def alignment_y(self):
"""alignment_y motor object
Returns:
AbstractActuator
"""
return self.motor_hwobj_dict.get("phiy")
@property
def alignment_z(self):
"""alignment_z motor object
Returns:
AbstractActuator
"""
return self.motor_hwobj_dict.get("phiz")
@property
def zoom(self):
"""zoom motor object
NBNB TBD This is supposedly now living in AbstractSampleView,
And the property was removed in an earlier PR (20251021, 15:24 Elmir Jagudin)
BUT 1) there does not seem to be any mechanism to set it from config there
2) it is configured on the diffractometer according to available config files
3) accessing diffractometer.zoom is all through the code
This needs global refactoring, but meanwhile this property should remain
Returns:
AbstractActuator
"""
return self.motor_hwobj_dict.get("zoom")
[docs] def is_ready(self):
"""
Detects if device is ready
"""
return self.current_state == DiffractometerState.tostring(
DiffractometerState.Ready
)
def wait_device_not_ready(self, timeout=5):
with gevent.Timeout(timeout, Exception("Timeout waiting for device not ready")):
while self.is_ready():
time.sleep(0.01)
[docs] def wait_device_ready(self, timeout=30):
"""Waits when diffractometer status is ready:
:param timeout: timeout in second
:type timeout: int
"""
with gevent.Timeout(timeout, Exception("Timeout waiting for device ready")):
while not self.is_ready():
time.sleep(0.01)
wait_ready = wait_device_ready
[docs] def execute_server_task(self, method, timeout=30, *args):
"""Method is used to execute commands and wait till
diffractometer is in ready state
:param method: method to be executed
:type method: instance
:param timeout: timeout in seconds
:type timeout: seconds
"""
# self.ready_event.clear()
self.current_state = DiffractometerState.tostring(DiffractometerState.Busy)
method(*args)
time.sleep(5)
# gevent.sleep(2)
self.wait_device_ready(timeout)
self.ready_event.set()
[docs] def in_plate_mode(self):
"""Returns True if diffractometer in plate mod
:returns: boolean
"""
return self.head_type == GenericDiffractometer.HEAD_TYPE_PLATE
[docs] def get_head_type(self):
"""Returns head type
:returns: string
"""
return self.head_type
[docs] def use_sample_changer(self):
"""Returns True if sample changer is in use
:returns: boolean
"""
return self.use_sc
[docs] def set_use_sc(self, flag):
"""Sets use_sc flag, that indicates if sample changer is used
:param flag: use sample changer flag
:type flag: boolean
"""
if flag:
# check both transfer_mode and sample_Changer
if HWR.beamline.sample_changer is None:
self.log.error("Diffractometer: Sample " + "Changer is not available")
return False
if (
self.transfer_mode is None
or self.channel_dict["TransferMode"].get_value() == "SAMPLE_CHANGER"
):
# if transferMode is not defined, ignore the checkup
self.use_sc = True
else:
self.log.error(
"Diffractometer: Set the "
+ "diffractometer TransferMode to SAMPLE_CHANGER first!!"
)
return False
else:
self.use_sc = False
return True
[docs] def transfer_mode_changed(self, transfer_mode):
"""
Descript. :
"""
self.log.info("current_transfer_mode is set to %s" % transfer_mode)
self.transfer_mode = transfer_mode
if transfer_mode != "SAMPLE_CHANGER":
self.use_sc = False
self.emit("minidiffTransferModeChanged", (transfer_mode,))
def get_transfer_mode(self):
""" """
return self.transfer_mode
[docs] def get_current_phase(self):
"""
Descript. :
"""
return self.current_phase
[docs] def get_grid_direction(self):
"""
Descript. :
"""
return self.grid_direction
[docs] def get_available_centring_methods(self):
"""
Descript. :
"""
return self.centring_methods.keys()
[docs] def get_current_centring_method(self):
"""
Descript. :
"""
return self.current_centring_method
def is_reversing_rotation(self):
""" """
return self.reversing_rotation is True
[docs] def beam_position_changed(self, value):
"""
Descript. :
"""
self.beam_position = list(value)
# def get_motor_positions(self):
# return
# TODO rename to get_motor_positions
[docs] def get_positions(self):
"""
Descript. :
"""
self.current_motor_positions["beam_x"] = (
self.beam_position[0] - self.zoom_centre["x"]
) / self.pixels_per_mm_y
self.current_motor_positions["beam_y"] = (
self.beam_position[1] - self.zoom_centre["y"]
) / self.pixels_per_mm_x
return self.current_motor_positions
[docs] def get_motors(self):
"""Get motor_name:Motor dictionary"""
return self.motor_hwobj_dict.copy()
[docs] def get_snapshot(self):
"""
Get snapshot from sample view
Returns:
bytes: A bytes object of the current camera image.
"""
if HWR.beamline.sample_view:
return HWR.beamline.sample_view.get_snapshot()
def save_snapshot(self, filename):
if HWR.beamline.sample_view:
return HWR.beamline.sample_view.save_snapshot(filename)
[docs] def get_pixels_per_mm(self):
"""
Returns tuple with pixels_per_mm_x and pixels_per_mm_y
:returns: list with two floats
"""
return (self.pixels_per_mm_x, self.pixels_per_mm_y)
[docs] def get_phase_list(self):
"""
Returns list of available phases
:returns: list with str
"""
return self.phase_list
def start_centring_method(self, method, sample_info=None, wait=False):
""" """
if self.current_centring_method is not None:
self.log.error(
"Diffractometer: already in centring method %s"
% self.current_centring_method
)
return
curr_time = time.strftime("%Y-%m-%d %H:%M:%S")
self.centring_status = {
"valid": False,
"startTime": curr_time,
"angleLimit": None,
}
self.emit_centring_started(method)
try:
centring_method = self.centring_methods[method]
except KeyError as diag:
self.log.error("Diffractometer: unknown centring method (%s)" % str(diag))
self.emit_centring_failed()
else:
try:
centring_method(sample_info, wait_result=wait)
except Exception:
self.log.exception("Diffractometer: problem while centring")
self.emit_centring_failed()
def cancel_centring_method(self, reject=False):
if self.current_centring_procedure is not None:
try:
self.current_centring_procedure.kill()
except Exception:
self.log.exception(
"Diffractometer: problem aborting the centring method"
)
try:
# TODO... do we need this at all?
# fun = self.cancel_centring_methods[self.current_centring_method]
pass
except KeyError:
self.emit_centring_failed()
else:
try:
fun()
except Exception:
self.emit_centring_failed()
else:
self.emit_centring_failed()
self.emit_progress_message("")
if reject:
self.reject_centring()
def start_manual_centring(self, sample_info=None, wait_result=None):
self.emit_progress_message(f"{self.CENTRING_METHOD_MANUAL} centring...")
if self.use_sample_centring:
self.current_centring_procedure = sample_centring.start(
{
"phi": self.centring_phi,
"phiy": self.centring_phiy,
"sampx": self.centring_sampx,
"sampy": self.centring_sampy,
"phiz": self.centring_phiz,
},
self.pixels_per_mm_x,
self.pixels_per_mm_y,
self.beam_position[0],
self.beam_position[1],
)
else:
self.current_centring_procedure = gevent.spawn(self.manual_centring)
self.current_centring_procedure.link(self.centring_done)
def start_automatic_centring(
self, sample_info=None, loop_only=False, wait_result=None
):
self.emit_progress_message("Automatic centring...")
while self.automatic_centring_try_count > 0:
if self.use_sample_centring:
self.current_centring_procedure = sample_centring.start_auto(
HWR.beamline.sample_view,
{
"phi": self.centring_phi,
"phiy": self.centring_phiy,
"sampx": self.centring_sampx,
"sampy": self.centring_sampy,
"phiz": self.centring_phiz,
},
self.pixels_per_mm_x,
self.pixels_per_mm_y,
self.beam_position[0],
self.beam_position[1],
msg_cb=self.emit_progress_message,
new_point_cb=lambda point: self.emit(
"newAutomaticCentringPoint", (point,)
),
)
else:
self.current_centring_procedure = gevent.spawn(self.automatic_centring)
self.current_centring_procedure.link(self.centring_done)
if wait_result:
self.ready_event.wait()
self.ready_event.clear()
self.automatic_centring_try_count -= 1
self.automatic_centring_try_count = 1
[docs] def start_move_to_beam(
self, coord_x=None, coord_y=None, omega=None, wait_result=None
):
"""
Descript. :
"""
try:
self.emit_progress_message("Move to beam...")
self.centring_time = time.time()
curr_time = time.strftime("%Y-%m-%d %H:%M:%S")
self.centring_status = {
"valid": True,
"startTime": curr_time,
"endTime": curr_time,
}
if coord_x is None and coord_y is None:
coord_x = self.beam_position[0]
coord_y = self.beam_position[1]
motors = self.get_centred_point_from_coord(
coord_x, coord_y, return_by_names=True
)
if omega is not None:
motors["phi"] = omega
self.centring_status["motors"] = motors
self.centring_status["valid"] = True
self.centring_status["angleLimit"] = True
self.emit_progress_message("")
self.accept_centring()
self.current_centring_method = None
self.current_centring_procedure = None
except Exception:
logging.exception("Diffractometer: Could not complete 2D centring")
def centring_done(self, centring_procedure):
try:
motor_pos = centring_procedure.get()
if isinstance(motor_pos, gevent.GreenletExit):
raise motor_pos
except Exception:
logging.exception("Could not complete centring")
self.emit_centring_failed()
else:
self.emit_progress_message("Moving sample to centred position...")
self.emit_centring_moving()
try:
self.log.debug(
"Centring finished. Moving motors to position %s" % str(motor_pos)
)
self.move_to_motors_positions(motor_pos, wait=True)
except Exception:
logging.exception("Could not move to centred position")
self.emit_centring_failed()
else:
# if 3 click centring move -180. well. dont, in principle the calculated
# centred positions include omega to initial position
pass
# if not self.in_plate_mode():
# self.log.debug("Centring finished. Moving omega back to initial position")
# self.motor_hwobj_dict['phi'].set_value_relative(-180, timeout=None)
# self.log.debug(" Moving omega done")
if (
self.current_centring_method
== GenericDiffractometer.CENTRING_METHOD_AUTO
):
self.emit("newAutomaticCentringPoint", motor_pos)
self.ready_event.set()
self.centring_time = time.time()
self.emit_centring_successful()
self.emit_progress_message("")
def manual_centring(self):
raise NotImplementedError
def automatic_centring(self):
raise NotImplementedError
def centring_motor_moved(self, pos):
if time.time() - self.centring_time > 1.0:
self.invalidate_centring()
self.emit_diffractometer_moved()
def invalidate_centring(self):
if self.current_centring_procedure is None and self.centring_status["valid"]:
self.centring_status = {"valid": False}
self.emit_progress_message("")
self.emit("centringInvalid", ())
def emit_diffractometer_moved(self, *args):
self.emit("diffractometerMoved", ())
def motor_positions_to_screen(self, centred_positions_dict):
if self.use_sample_centring:
self.update_zoom_calibration()
if None in (self.pixels_per_mm_x, self.pixels_per_mm_y):
return 0, 0
phi_angle = math.radians(
self.centring_phi.direction * self.centring_phi.get_value()
)
sampx = self.centring_sampx.direction * (
centred_positions_dict["sampx"] - self.centring_sampx.get_value()
)
sampy = self.centring_sampy.direction * (
centred_positions_dict["sampy"] - self.centring_sampy.get_value()
)
phiy = self.centring_phiy.direction * (
centred_positions_dict["phiy"] - self.centring_phiy.get_value()
)
phiz = self.centring_phiz.direction * (
centred_positions_dict["phiz"] - self.centring_phiz.get_value()
)
rot_matrix = numpy.matrix(
[
math.cos(phi_angle),
-math.sin(phi_angle),
math.sin(phi_angle),
math.cos(phi_angle),
]
)
rot_matrix.shape = (2, 2)
inv_rot_matrix = numpy.array(rot_matrix.I)
dx, dy = (
numpy.dot(numpy.array([sampx, sampy]), inv_rot_matrix)
* self.pixels_per_mm_x
)
x = (phiy * self.pixels_per_mm_x) + self.beam_position[0]
y = dy + (phiz * self.pixels_per_mm_y) + self.beam_position[1]
return x, y
else:
raise NotImplementedError
def move_to_centred_position(self, centred_position):
self.move_motors(centred_position)
def move_to_motors_positions(self, motors_positions, wait=False):
self.emit_progress_message("Moving to motors positions...")
self.move_to_motors_positions_procedure = gevent.spawn(
self.move_motors, motors_positions
)
self.move_to_motors_positions_procedure.link(self.move_motors_done)
if wait:
self.wait_device_ready(10)
[docs] def move_motors(self, motor_positions, timeout=15):
"""
Moves diffractometer motors to the requested positions
:param motors_dict: dictionary with motor names or hwobj
and target values.
:type motors_dict: dict
"""
if not isinstance(motor_positions, dict):
motor_positions = motor_positions.as_dict()
self.wait_device_ready(timeout)
for motor in motor_positions.keys():
position = motor_positions[motor]
self.log.debug(f"moving motor {motor} to position {position}")
if isinstance(motor, str):
motor_role = motor
motor = self.motor_hwobj_dict.get(motor_role)
if None in (motor, position):
continue
motor.set_value(position)
self.wait_device_ready(timeout)
if self.delay_state_polling is not None and self.delay_state_polling > 0:
# delay polling for state in the
# case of controller not reporting MOVING immediately after cmd
gevent.sleep(self.delay_state_polling)
self.wait_device_ready(timeout)
[docs] def move_motors_done(self, move_motors_procedure):
"""
Descript. :
"""
self.move_to_motors_positions_procedure = None
self.emit_progress_message("")
[docs] def move_to_beam(self, x, y, omega=None):
"""
Descript. : function to create a centring point based on all motors
positions.
"""
try:
pos = self.get_centred_point_from_coord(x, y, return_by_names=False)
if omega is not None:
pos["phiMotor"] = omega
self.move_to_motors_positions(pos)
except Exception:
self.log.exception("Diffractometer: could not center to beam, aborting")
[docs] def image_clicked(
self, x: float, y: float, xi: float | None = None, yi: float | None = None
):
"""Handles a user click sent from the frontend during the manual centring.
This method is called by the backend when the user clicks on the sample
image in the frontend.
The attribute `self.waiting_for_click` controls whether the click should
be accepted or ignored:
- None: click is accepted (legacy)
- True: waiting for a click, accept it and mark it as received
- False: already received a click, ignore further clicks
Args:
x: X coordinate of the click.
y: Y coordinate of the click.
xi: ...
yi: ...
Raises:
RuntimeError: If a click is received while a previous one is still being processed.
"""
with self.click_lock:
# "waiting for click" logic is not implememted (legacy) or it is actually waiting for a click
if self.waiting_for_click is None or self.waiting_for_click:
if self.waiting_for_click:
self.waiting_for_click = False
if self.use_sample_centring:
sample_centring.user_click(x, y)
else:
self.user_clicked_event.set((x, y))
# Already received a click, ignore further clicks
else:
self.log.warning(
"User attempted to click while the previous centring step was still in progress. Click ignored"
)
err_msg = "Click ignored: a centring step is still being processed. Please wait before clicking again."
raise RuntimeError(err_msg)
[docs] def accept_centring(self):
"""
Descript. :
Arg. " fully_centred_point. True if 3 click centring
else False
"""
self.centring_status["valid"] = True
self.centring_status["accepted"] = True
centring_status = self.get_centring_status()
if "motors" not in centring_status:
centring_status["motors"] = self.get_positions()
self.emit("centringAccepted", (True, centring_status))
self.emit("fsmConditionChanged", "centering_position_accepted", True)
def reject_centring(self):
if self.current_centring_procedure:
self.current_centring_procedure.kill()
self.centring_status = {"valid": False}
self.emit_progress_message("")
self.emit("centringAccepted", (False, self.get_centring_status()))
self.emit("fsmConditionChanged", "centering_position_accepted", False)
def emit_centring_started(self, method):
self.current_centring_method = method
self.emit("centringStarted", (method, False))
def emit_centring_moving(self):
self.emit("centringMoving", ())
def emit_centring_failed(self):
self.centring_status = {"valid": False}
method = self.current_centring_method
self.current_centring_method = None
self.current_centring_procedure = None
self.emit("centringFailed", (method, self.get_centring_status()))
def emit_centring_successful(self):
if self.current_centring_procedure is not None:
curr_time = time.strftime("%Y-%m-%d %H:%M:%S")
self.centring_status["endTime"] = curr_time
motor_pos = self.current_centring_procedure.get()
self.centring_status["motors"] = self.convert_from_obj_to_name(motor_pos)
self.centring_status["method"] = self.current_centring_method
self.centring_status["valid"] = True
method = self.current_centring_method
self.emit("centringSuccessful", (method, self.get_centring_status()))
self.current_centring_method = None
self.current_centring_procedure = None
else:
self.log.debug(
"Diffractometer: Trying to emit "
+ "centringSuccessful outside of a centring"
)
def emit_progress_message(self, msg=None):
self.emit("progressMessage", (msg,))
def get_centring_status(self):
return copy.deepcopy(self.centring_status)
def get_centred_point_from_coord(self, x, y, return_by_names=None):
raise NotImplementedError
[docs] def get_point_between_two_points(
self, point_one, point_two, frame_num, frame_total
):
"""
Method returns a centring point between two centring points
It is used to get a position on a helical line based on
frame number and total frame number
"""
new_point = {}
point_one = point_one.as_dict()
point_two = point_two.as_dict()
for motor in point_one.keys():
new_motor_pos = (
frame_num
/ float(frame_total)
* abs(point_one[motor] - point_two[motor])
+ point_one[motor]
)
new_motor_pos += 0.5 * (point_two[motor] - point_one[motor]) / frame_total
new_point[motor] = new_motor_pos
return new_point
def convert_from_obj_to_name(self, motor_pos):
motors = {}
for motor_role in self.centring_motors_list:
motor_obj = self.get_object_by_role(motor_role)
try:
motors[motor_role] = motor_pos[motor_obj]
except KeyError:
if motor_obj:
motors[motor_role] = motor_obj.get_value()
motors["beam_x"] = (
self.beam_position[0] - self.zoom_centre["x"]
) / self.pixels_per_mm_y
motors["beam_y"] = (
self.beam_position[1] - self.zoom_centre["y"]
) / self.pixels_per_mm_x
return motors
[docs] def visual_align(self, point_1, point_2):
"""
Descript. :
"""
return
[docs] def move_omega_relative(self, relative_angle):
"""
Descript. :
"""
return
[docs] def get_osc_limits(self):
"""Returns osc limits"""
return
def get_osc_max_speed(self):
return
[docs] def get_scan_limits(self, speed=None, num_images=None, exp_time=None):
"""
Gets scan limits. Necessary for example in the plate mode
where osc range is limited
"""
return
[docs] def set_phase(self, phase, timeout=None):
"""Sets diffractometer to selected phase
By default available phase is Centring, BeamLocation,
DataCollection, Transfer
:param phase: phase
:type phase: string
:param timeout: timeout in sec
:type timeout: int
"""
if timeout:
self.ready_event.clear()
set_phase_task = gevent.spawn(
self.execute_server_task,
self.command_dict["startSetPhase"],
timeout,
phase,
)
self.ready_event.wait()
self.ready_event.clear()
else:
self.command_dict["startSetPhase"](phase)
def update_zoom_calibration(self):
self.pixels_per_mm_x = 1.0 / self.channel_dict["CoaxCamScaleX"].get_value()
self.pixels_per_mm_y = 1.0 / self.channel_dict["CoaxCamScaleY"].get_value()
self.emit("pixelsPerMmChanged", ((self.pixels_per_mm_x, self.pixels_per_mm_y)))
def zoom_motor_state_changed(self, state):
self.emit("zoomMotorStateChanged", (state,))
self.emit("minidiffStateChanged", (state,))
def zoom_motor_predefined_position_changed(self, position_name, offset):
self.update_zoom_calibration()
self.emit("zoomMotorPredefinedPositionChanged", (position_name, offset))
def equipment_ready(self):
self.emit("minidiffReady", ())
def equipment_not_ready(self):
self.emit("minidiffNotReady", ())
[docs] def motor_state_changed(self, state):
"""
Descript. :
"""
self.emit("minidiffStateChanged", (state,))
[docs] def current_phase_changed(self, current_phase):
"""
Descript. :
"""
self.current_phase = current_phase
if current_phase != GenericDiffractometer.PHASE_UNKNOWN:
logging.getLogger("GUI").info(
"Diffractometer: Current phase changed to %s" % current_phase
)
self.emit("minidiffPhaseChanged", (current_phase,))
[docs] def sample_is_loaded_changed(self, sample_is_loaded):
"""
Descript. :
"""
self.sample_is_loaded = sample_is_loaded
self.emit("minidiffSampleIsLoadedChanged", (sample_is_loaded,))
[docs] def head_type_changed(self, head_type):
"""
Descript. :
"""
self.head_type = head_type
self.emit("minidiffHeadTypeChanged", (head_type,))
if "SampleIsLoaded" not in str(self.used_channels_list):
return
try:
self.disconnect(
self.channel_dict["SampleIsLoaded"],
"update",
self.sample_is_loaded_changed,
)
except Exception:
self.log.exception("")
if (
head_type == GenericDiffractometer.HEAD_TYPE_MINIKAPPA
or head_type == GenericDiffractometer.HEAD_TYPE_SMARTMAGNET
):
self.connect(
self.channel_dict["SampleIsLoaded"],
"update",
self.sample_is_loaded_changed,
)
else:
self.log.info(
"Diffractometer: SmartMagnet "
+ "is not available, only works for Minikappa and SmartMagnet head"
)
def move_kappa_and_phi(self, kappa, kappa_phi):
return
[docs] def close_kappa(self):
"""
Descript. :
"""
return
def get_osc_dynamic_limits(self):
return (-10000, 10000)
def zoom_in(self):
return
def zoom_out(self):
return
def save_centring_positions(self):
pass
[docs] def force_emit_signals(self):
for motor_hwobj in self.motor_hwobj_dict.values():
motor_hwobj.force_emit_signals()
def get_head_configuration(self) -> Union[GonioHeadConfiguration, None]:
chip_def_fpath = self.get_property("chip_definition_file", "")
chip_def_fpath = HWR.get_hardware_repository().find_in_repository(
chip_def_fpath
)
data = None
if os.path.isfile(chip_def_fpath):
with open(chip_def_fpath, "r") as _f:
chip_def = json.load(_f)
try:
data = GonioHeadConfiguration(**chip_def)
except ValidationError:
self.log.exception("Validation error in %s" % chip_def_fpath)
return data
def set_head_configuration(self, str_data: str) -> None:
data = json.loads(str_data)
chip_def_fpath = self.get_property("chip_definition_file", "")
chip_def_fpath = HWR.get_hardware_repository().find_in_repository(
chip_def_fpath
)
if os.path.isfile(chip_def_fpath):
with open(chip_def_fpath, "w+") as _f:
try:
GonioHeadConfiguration(**data)
except ValidationError:
self.log.exception("Validation error in %s" % chip_def_fpath)
else:
_f.write(json.dumps(data, indent=4))
def set_chip_layout(self, layout_name: str) -> bool:
data = self.get_head_configuration().dict()
data["current"] = layout_name
self.set_head_configuration(json.dumps(data))
return True