Source code for tvb.interfaces.rest.client.main_api

# -*- coding: utf-8 -*-
#
#
# TheVirtualBrain-Framework Package. This package holds all Data Management, and
# Web-UI helpful to run brain-simulations. To use it, you also need to download
# TheVirtualBrain-Scientific Package (for simulators). See content of the
# documentation-folder for more details. See also http://www.thevirtualbrain.org
#
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE.  See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program.  If not, see <http://www.gnu.org/licenses/>.
#
#
#   CITATION:
# When using The Virtual Brain for scientific publications, please cite it as explained here:
# https://www.thevirtualbrain.org/tvb/zwei/neuroscience-publications
#
#
from datetime import datetime, timedelta

import requests
from tvb.interfaces.rest.client.client_decorators import handle_response
from tvb.interfaces.rest.commons.strings import Strings, RestLink, FormKeyInput


[docs]class MainApi: """ Base API class which will be inherited by all the API specific subclasses """ def __init__(self, server_url, auth_token=''): # type: (str, str) -> None """ Rest server url, where all rest calls will be made :param server_url: REST server base URL :param auth_token: Keycloak authorization token """ self.server_url = server_url + "/" + Strings.BASE_PATH.value self.authorization_token = auth_token self.token_expiry_date = None self.refresh_token = None
[docs] def build_request_url(self, url): return self.server_url + url
[docs] def secured_request(self): """ If we have an expiration date and the token is expired we make a refresh call then we build a secured request based on the refreshed token. :return: secured request session """ if self.token_expiry_date is not None and datetime.now() >= self.token_expiry_date \ and self.refresh_token is not None: refresh_token_response = self._refresh_token() self.update_tokens(refresh_token_response) return self._build_request()
def _build_request(self): """ Build a secured request protected by the authorization token set before, used in the entire session :return: secured requests session """ authorization_header = {Strings.AUTH_HEADER.value: Strings.BEARER.value + self.authorization_token} with requests.Session() as request_session: request_session.headers.update(authorization_header) return request_session @handle_response def _refresh_token(self): return self._build_request().put(self.build_request_url(RestLink.LOGIN.compute_url(True)), json={ FormKeyInput.KEYCLOAK_REFRESH_TOKEN.value: self.refresh_token, })
[docs] def update_tokens(self, response): self.refresh_token = response['refresh_token'] self.authorization_token = response['access_token'] expires_in = response['expires_in'] self.token_expiry_date = datetime.now() + timedelta(seconds=expires_in)