Source code for tvb.core.operation_hpc_launcher

# -*- coding: utf-8 -*-
#
#
# TheVirtualBrain-Framework Package. This package holds all Data Management, and
# Web-UI helpful to run brain-simulations. To use it, you also need to download
# TheVirtualBrain-Scientific Package (for simulators). See content of the
# documentation-folder for more details. See also http://www.thevirtualbrain.org
#
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE.  See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program.  If not, see <http://www.gnu.org/licenses/>.
#
#
#   CITATION:
# When using The Virtual Brain for scientific publications, please cite it as explained here:
# https://www.thevirtualbrain.org/tvb/zwei/neuroscience-publications
#
#

import json
import os
import sys
import requests
from requests import HTTPError
from tvb.adapters.simulator.hpc_simulator_adapter import HPCSimulatorAdapter
from tvb.basic.logger.builder import get_logger
from tvb.basic.profile import TvbProfile
from tvb.config.init.datatypes_registry import populate_datatypes_registry
from tvb.core.entities.model.model_operation import STATUS_STARTED, STATUS_FINISHED, STATUS_ERROR
from tvb.core.neocom import h5
from tvb.core.services.authorization import AuthorizationManager
from tvb.core.services.backend_clients.hpc_scheduler_client import HPCSchedulerClient
from tvb.storage.storage_interface import StorageInterface

log = get_logger(__name__)

UPDATE_STATUS_KEY = "NEW_STATUS"

if __name__ == '__main__':
    TvbProfile.set_profile(TvbProfile.WEB_PROFILE, True)
    TvbProfile.current.hpc.IS_HPC_RUN = True


def _encrypt_results(adapter_instance, encryption_handler):
    output_plain_dir = adapter_instance._get_output_path()
    output_plain_files = os.listdir(output_plain_dir)
    output_plain_files = [os.path.join(output_plain_dir, plain_file) for plain_file in output_plain_files]
    log.info("Encrypt files: {}".format(output_plain_files))
    encryption_handler.encrypt_inputs(output_plain_files, HPCSchedulerClient.OUTPUT_FOLDER)


[docs]def do_operation_launch(simulator_gid, available_disk_space, is_group_launch, base_url, operation_id, plain_dir='/root/plain'): try: log.info("Preparing HPC launch for simulation with id={}".format(simulator_gid)) populate_datatypes_registry() log.info("Current TVB profile has HPC run=: {}".format(TvbProfile.current.hpc.IS_HPC_RUN)) encryption_handler = StorageInterface.get_encryption_handler(simulator_gid) _request_passfile(simulator_gid, operation_id, base_url, encryption_handler.get_password_file()) encryption_handler.decrypt_results_to_dir(plain_dir) log.info("Current wdir is: {}".format(plain_dir)) view_model = h5.load_view_model(simulator_gid, plain_dir) adapter_instance = HPCSimulatorAdapter(plain_dir, is_group_launch) _update_operation_status(STATUS_STARTED, simulator_gid, operation_id, base_url) adapter_instance._prelaunch(None, view_model, available_disk_space) _encrypt_results(adapter_instance, encryption_handler) _update_operation_status(STATUS_FINISHED, simulator_gid, operation_id, base_url) except Exception as excep: log.error("Could not execute operation {}".format(str(sys.argv[1]))) log.exception(excep) _update_operation_status(STATUS_ERROR, simulator_gid, operation_id, base_url) raise excep
# TODO: extract common rest api parts CHUNK_SIZE = 128 def _save_file(file_path, response): with open(file_path, 'wb') as local_file: for chunk in response.iter_content(chunk_size=CHUNK_SIZE): if chunk: local_file.write(chunk) return file_path def _request_passfile(simulator_gid, operation_id, base_url, destination_path): # type: (str, str, str, str) -> str try: req_params = "{}/hpc/encryption_config/{}/{}".format(base_url, simulator_gid, operation_id) log.info('URL is: {}'.format(req_params)) response = _build_secured_request().get(req_params) log.info('Response is: {}'.format(response)) if response.ok: log.info('Passfile downloaded at: {}'.format(destination_path)) path = _save_file(destination_path, response) if not os.path.exists(path): raise Exception("Cannot find password file.") return path except HTTPError: log.warning( "Failed to request passfile from TVB server {} for simulator {}".format(base_url, simulator_gid)) def _update_operation_status(status, simulator_gid, operation_id, base_url): # type: (str, str, str, str) -> None try: req_params = "{}/hpc/update_status/{}/{}".format(base_url, simulator_gid, operation_id) log.info('URL is: {}'.format(req_params)) response = _build_secured_request().put(req_params, data={ UPDATE_STATUS_KEY: status, }) if not response.ok: log.warning("Failed to update status. {}".format(response.json())) except Exception: log.warning( "Failed to notify TVB server {} for simulator {} status update {}".format(base_url, simulator_gid, status)) def _build_secured_request(): token_file_path = os.path.join(HPCSchedulerClient.HOME_FOLDER_MOUNT, ".token") kc_config_file_path = os.path.join(HPCSchedulerClient.HOME_FOLDER_MOUNT, ".kc_config") token = "" if os.path.exists(token_file_path) and os.path.exists(kc_config_file_path): try: with open(token_file_path, "r") as file: refresh_token = file.read() kc_instance = AuthorizationManager(kc_config_file_path).get_keycloak_instance() response = kc_instance.refresh_token(refresh_token.replace('\n', '')) token = response['access_token'] except Exception as e: log.error(e, exc_info=True) else: log.warning("Token file was not found.") with requests.Session() as request: auth_header = {"Authorization": "Bearer {}".format(token)} request.headers.update(auth_header) return request if __name__ == '__main__': simulator_gid = sys.argv[1] available_disk_space = sys.argv[2] is_group_launch = json.loads(sys.argv[3].lower()) base_url = sys.argv[4] operation_id = sys.argv[5] do_operation_launch(simulator_gid, available_disk_space, is_group_launch, base_url, operation_id)