Source code for mxcubecore.Command.BlueskyHttpServer

# encoding: utf-8
#
#  Project name: MXCuBE
#  https://github.com/mxcube
#
#  This file is part of MXCuBE software.
#
#  MXCuBE is free software: you can redistribute it and/or modify
#  it under the terms of the GNU Lesser General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.
#
#  MXCuBE is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.

import json
import os
import time

import gevent
import requests

from mxcubecore.CommandContainer import CommandObject

__copyright__ = """ Copyright © 2010 - 2020 by MXCuBE Collaboration """
__license__ = "LGPLv3+"


[docs]class BlueskyHttpServerCommand(CommandObject): """Interface for communicating with the Bluesky Http Server API.""" _default_timeout = 5 _status_path = "api/status" _execute_path = "api/queue/item/execute" def __init__(self, name, url, timeout=5, **kwargs): CommandObject.__init__(self, name, **kwargs) self.username = "" self._default_timeout = timeout self._url = f"{url}/" if url[-1] != "/" else url self._headers = {"Authorization": f"ApiKey {os.environ['AUTH_KEY']}"} def format_response(self, response): if response: response.raise_for_status() return json.loads(response.text) return response def status(self): response = requests.get( self._url + self._status_path, headers=self._headers, timeout=self._default_timeout, ) return self.format_response(response) def monitor_manager_state(self, stop_state, timeout=86400): with gevent.Timeout(timeout, exception=TimeoutError): while self.status()["manager_state"] != stop_state: time.sleep(0.1) def execute_plan(self, plan_name, kwargs=None): if not kwargs: kwargs = {} return requests.post( self._url + self._execute_path, headers=self._headers, json={ "user": self.username, "item": {"name": plan_name, "item_type": "plan", "kwargs": kwargs}, }, timeout=self._default_timeout, )
[docs] def is_connected(self): http_server_status = self.status() re_environment_open = http_server_status["worker_environment_exists"] re_running = http_server_status["re_state"] is not None return re_environment_open and re_running