Source code for mxcubecore.Command.BlueskyHttpServer

# encoding: utf-8
#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

import json
import logging
import os
import time

import gevent
import requests

from mxcubecore import Poller
from mxcubecore.CommandContainer import CommandObject

__copyright__ = """ Copyright © 2010 - 2020 by MXCuBE Collaboration """
__license__ = "LGPLv3+"


[docs]class BlueskyHttpServerCommand(CommandObject): """Interface for communicating with the Bluesky Http Server API.""" _default_timeout = 5 _status_path = "api/status" _execute_path = "api/queue/item/execute" _console_uid_path = "api/console_output/uid" _console_path = "api/console_output_update" def __init__(self, name, url, timeout=5, **kwargs): CommandObject.__init__(self, name, **kwargs) self.username = "" self._default_timeout = timeout self._url = f"{url}/" if url[-1] != "/" else url self._headers = {"Authorization": f"ApiKey {os.environ['AUTH_KEY']}"} self.console_output_uid = self.get_console_uid() self.last_msg_uid = "" self.user_level_log = logging.getLogger("user_level_log") self.output_poller = Poller.poll( polled_call=self.update_console_output, polling_period=500, value_changed_callback=self.show_console_output, error_callback=self.poll_failed, ) def poll_failed(self, exception, poller_id): self.user_level_log.error(exception) poller = Poller.get_poller(poller_id) if poller is not None: poller.restart(1000) def format_response(self, response): if response: response.raise_for_status() return json.loads(response.text) return response def status(self): response = requests.get( self._url + self._status_path, headers=self._headers, timeout=self._default_timeout, ) return self.format_response(response) def monitor_manager_state(self, stop_state, timeout=86400): with gevent.Timeout(timeout, exception=TimeoutError): while self.status()["manager_state"] != stop_state: time.sleep(0.1) def execute_plan(self, plan_name, kwargs=None): if not kwargs: kwargs = {} return requests.post( self._url + self._execute_path, headers=self._headers, json={ "user": self.username, "item": {"name": plan_name, "item_type": "plan", "kwargs": kwargs}, }, timeout=self._default_timeout, )
[docs] def is_connected(self): http_server_status = self.status() re_environment_open = http_server_status["worker_environment_exists"] re_running = http_server_status["re_state"] is not None return re_environment_open and re_running
def get_console_uid(self): response = requests.get( self._url + self._console_uid_path, headers=self._headers, timeout=self._default_timeout, ) return self.format_response(response)["console_output_uid"] def show_console_output(self, value=None): if value is None: value = [] for console_msg in value: new_console_line = console_msg["msg"] new_console_line = new_console_line.strip() if new_console_line != "": is_error = "[E " in new_console_line is_warning = "[W " in new_console_line is_debug = ("[I " in new_console_line) or ("[D " in new_console_line) if is_error: self.user_level_log.error(new_console_line) elif is_warning: self.user_level_log.warning(new_console_line) elif not is_debug: self.user_level_log.info(new_console_line) def update_console_output(self): current_uid = self.get_console_uid() if self.console_output_uid != current_uid: response = requests.get( self._url + self._console_path, headers=self._headers, json={"last_msg_uid": self.last_msg_uid}, timeout=self._default_timeout, ) output = self.format_response(response) self.last_msg_uid = output["last_msg_uid"] self.console_output_uid = current_uid return output["console_output_msgs"] return []