# -*- coding: utf-8 -*-
#
#
# TheVirtualBrain-Framework Package. This package holds all Data Management, and
# Web-UI helpful to run brain-simulations. To use it, you also need to download
# TheVirtualBrain-Scientific Package (for simulators). See content of the
# documentation-folder for more details. See also http://www.thevirtualbrain.org
#
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program. If not, see <http://www.gnu.org/licenses/>.
#
#
# CITATION:
# When using The Virtual Brain for scientific publications, please cite it as explained here:
# https://www.thevirtualbrain.org/tvb/zwei/neuroscience-publications
#
#
"""
.. moduleauthor:: Paula Popa <paula.popa@codemart.ro>
.. moduleauthor:: Bogdan Valean <bogdan.valean@codemart.ro>
"""
import os
import typing
import uuid
from contextlib import closing
from enum import Enum
from threading import Thread, Event
from requests import HTTPError
from tvb.basic.config.settings import HPCSettings
from tvb.basic.logger.builder import get_logger
from tvb.basic.profile import TvbProfile
from tvb.config import MEASURE_METRICS_MODEL_CLASS
from tvb.core.entities.file.simulator.datatype_measure_h5 import DatatypeMeasureH5
from tvb.core.entities.model.model_operation import Operation, STATUS_CANCELED, STATUS_ERROR, OperationProcessIdentifier
from tvb.core.entities.storage import dao, OperationDAO
from tvb.core.neocom import h5
from tvb.core.services.backend_clients.backend_client import BackendClient
from tvb.core.services.burst_service import BurstService
from tvb.core.services.exceptions import OperationException
from tvb.storage.storage_interface import StorageInterface
try:
import pyunicore.client as unicore_client
from pyunicore.client import Job, Storage, Client
except ImportError:
HPCSettings.CAN_RUN_HPC = False
LOGGER = get_logger(__name__)
HPC_THREADS = []
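# The enum values below mirror the job status strings used by the UNICORE REST API
# (an assumption based on how they are used; UNICORE may report additional states, e.g. RUNNING).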
class HPCJobStatus(Enum):
STAGINGIN = "STAGINGIN"
READY = "READY"
QUEUED = "QUEUED"
STAGINGOUT = "STAGINGOUT"
SUCCESSFUL = "SUCCESSFUL"
FAILED = "FAILED"
def get_op_thread(op_id):
# type: (int) -> HPCOperationThread
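    # Note: the matching thread is also removed from HPC_THREADS, so the thread of an
    # operation can be retrieved through this helper only once.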
op_thread = None
for thread in HPC_THREADS:
if thread.operation_id == op_id:
op_thread = thread
break
if op_thread is not None:
HPC_THREADS.remove(op_thread)
return op_thread
class HPCOperationThread(Thread):
def __init__(self, operation_id, *args, **kwargs):
super(HPCOperationThread, self).__init__(*args, **kwargs)
self.operation_id = operation_id
self._stop_event = Event()
    def stop(self):
self._stop_event.set()
    def stopped(self):
return self._stop_event.is_set()
class HPCSchedulerClient(BackendClient):
"""
    Simple class that mimics the behavior expected from StandAloneClient, but fires the operation on
    an HPC node. Define the TVB_BIN_ENV_KEY and CSCS_LOGIN_TOKEN_ENV_KEY environment variables before running on HPC.
"""
OUTPUT_FOLDER = 'output'
TVB_BIN_ENV_KEY = 'TVB_BIN'
CSCS_LOGIN_TOKEN_ENV_KEY = 'CSCS_LOGIN_TOKEN'
CSCS_PROJECT = 'CSCS_PROJECT'
HOME_FOLDER_MOUNT = '/HOME_FOLDER'
CSCS_DATA_FOLDER = 'data'
CONTAINER_INPUT_FOLDER = '/home/tvb_user/.data'
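    # Illustrative environment setup (the values below are assumptions, they are site- and user-specific):
    #   export TVB_BIN=/opt/tvb/bin                  # folder containing the HPC launcher script
    #   export CSCS_LOGIN_TOKEN=<access token>       # authentication token for the UNICORE/CSCS services
    #   export CSCS_PROJECT=<accounting project>     # optional, forwarded into the job description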
storage_interface = StorageInterface()
@staticmethod
def _prepare_input(operation, simulator_gid):
# type: (Operation, str) -> list
storage_path = StorageInterface().get_project_folder(operation.project.name,
str(operation.id))
vm_files, dt_files = h5.gather_references_of_view_model(simulator_gid, storage_path)
vm_files.extend(dt_files)
return vm_files
@staticmethod
def _configure_job(simulator_gid, available_space, is_group_launch, operation_id):
        # type: (str, int, bool, int) -> (dict, str)
bash_entrypoint = os.path.join(os.environ[HPCSchedulerClient.TVB_BIN_ENV_KEY],
HPCSettings.HPC_LAUNCHER_SH_SCRIPT)
base_url = TvbProfile.current.web.BASE_URL
inputs_in_container = os.path.join(
HPCSchedulerClient.CONTAINER_INPUT_FOLDER,
StorageInterface.get_encryption_handler(simulator_gid).current_enc_dirname)
# Build job configuration JSON
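        # Illustrative shape of the resulting job description (a sketch; the exact key names come from
        # HPCSettings and the launcher script name below is an assumption):
        #   {"Executable": "tvb_hpc_launcher.sh",
        #    "Arguments": [<simulator_gid>, <available_space>, <is_group_launch>, <base_url>,
        #                  <inputs_in_container>, "/HOME_FOLDER", <operation_id>],
        #    "Resources": {"CPUs": "1"},
        #    "Project": <CSCS_PROJECT value, only when defined in the environment>}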
my_job = {HPCSettings.UNICORE_EXE_KEY: os.path.basename(bash_entrypoint),
HPCSettings.UNICORE_ARGS_KEY: [simulator_gid, available_space, is_group_launch, base_url,
inputs_in_container, HPCSchedulerClient.HOME_FOLDER_MOUNT,
operation_id], HPCSettings.UNICORE_RESOURCER_KEY: {"CPUs": "1"}}
if HPCSchedulerClient.CSCS_PROJECT in os.environ:
my_job[HPCSettings.UNICORE_PROJECT_KEY] = os.environ[HPCSchedulerClient.CSCS_PROJECT]
return my_job, bash_entrypoint
@staticmethod
def _listdir(working_dir, base='/'):
# type: (Storage, str) -> dict
"""
        This code was taken from the pyunicore Storage.listdir method and extended to support a subdirectory.
        Judging by the method signature, it should already behave this way, but the 'base' argument is never
        used inside the original implementation.
        Once this is fixed upstream, this override can be removed.
:return: dict of {str: PathFile} objects
"""
ret = {}
try:
for path, meta in working_dir.contents(base)['content'].items():
path_url = working_dir.path_urls['files'] + path
path = path[1:] # strip leading '/'
if meta['isDirectory']:
ret[path] = unicore_client.PathDir(working_dir, path_url, path)
else:
ret[path] = unicore_client.PathFile(working_dir, path_url, path)
return ret
except HTTPError as http_error:
if http_error.response.status_code == 404:
raise OperationException("Folder {} is not present on HPC storage.".format(base))
raise http_error
    def update_datatype_groups(self):
# TODO: update column count_results
pass
    @staticmethod
def update_db_with_results(operation, sim_h5_filenames, metric_operation, metric_h5_filename):
        # type: (Operation, list, Operation, str) -> None
"""
        Generate corresponding Index entities for the resulting H5 files and insert them into the DB.
"""
burst_service = BurstService()
index_list = []
is_group = operation.fk_operation_group is not None
burst_config = burst_service.get_burst_for_operation_id(operation.id)
if is_group:
burst_config = burst_service.get_burst_for_operation_id(operation.fk_operation_group, True)
all_indexes = burst_service.prepare_indexes_for_simulation_results(operation, sim_h5_filenames, burst_config)
if is_group:
# Update the operation group name
operation_group = dao.get_operationgroup_by_id(metric_operation.fk_operation_group)
operation_group.fill_operationgroup_name("DatatypeMeasureIndex")
dao.store_entity(operation_group)
metric_index = burst_service.prepare_index_for_metric_result(metric_operation, metric_h5_filename,
burst_config)
all_indexes.append(metric_index)
for index in all_indexes:
index = dao.store_entity(index)
index_list.append(index)
burst_service.update_burst_status(burst_config)
@staticmethod
def _create_job_with_pyunicore(pyunicore_client, job_description, job_script, inputs):
# type: (Client, {}, str, list) -> Job
"""
Submit and start a batch job on the site, optionally uploading input data files.
        This code was taken from the pyunicore Client.new_job method so that our own upload method can be used.
:return: job
"""
if len(inputs) > 0 or job_description.get('haveClientStageIn') is True:
job_description['haveClientStageIn'] = "true"
with closing(
pyunicore_client.transport.post(url=pyunicore_client.site_urls['jobs'], json=job_description)) as resp:
job_url = resp.headers['Location']
job = Job(pyunicore_client.transport, job_url)
if len(inputs) > 0:
working_dir = job.working_dir
HPCSchedulerClient._upload_file_with_pyunicore(working_dir, job_script, None)
            for input_file in inputs:
                HPCSchedulerClient._upload_file_with_pyunicore(working_dir, input_file)
if job_description.get('haveClientStageIn', None) == "true":
try:
job.start()
            except Exception:
                # Ignore errors on start; the job handle is returned to the caller regardless
                pass
return job
@staticmethod
def _upload_file_with_pyunicore(working_dir, input_name, subfolder=CSCS_DATA_FOLDER, destination=None):
# type: (Storage, str, object, str) -> None
"""
Upload file to the HPC working dir.
        This upload code was taken from the pyunicore Storage.upload method and modified, because the original
        builds the upload URL with os.path.join, which produces an invalid URL on Windows.
"""
if destination is None:
destination = os.path.basename(input_name)
if subfolder:
url = "{}/{}/{}/{}".format(working_dir.resource_url, "files", subfolder, destination)
else:
url = "{}/{}/{}".format(working_dir.resource_url, "files", destination)
headers = {'Content-Type': 'application/octet-stream'}
with open(input_name, 'rb') as fd:
working_dir.transport.put(
url=url,
headers=headers,
data=fd)
@staticmethod
def _build_unicore_client(auth_token, registry_url, supercomputer):
# type: (str, str, str) -> Client
transport = unicore_client.Transport(auth_token)
registry = unicore_client.Registry(transport, registry_url)
return registry.site(supercomputer)
@staticmethod
def _launch_job_with_pyunicore(operation, simulator_gid, is_group_launch):
# type: (Operation, str, bool) -> Job
LOGGER.info("Prepare job inputs for operation: {}".format(operation.id))
job_plain_inputs = HPCSchedulerClient._prepare_input(operation, simulator_gid)
available_space = HPCSchedulerClient.compute_available_disk_space(operation)
LOGGER.info("Prepare job configuration for operation: {}".format(operation.id))
job_config, job_script = HPCSchedulerClient._configure_job(simulator_gid, available_space,
is_group_launch, operation.id)
LOGGER.info("Prepare encryption for operation: {}".format(operation.id))
encryption_handler = StorageInterface.get_encryption_handler(simulator_gid)
LOGGER.info("Encrypt job inputs for operation: {}".format(operation.id))
job_encrypted_inputs = encryption_handler.encrypt_inputs(job_plain_inputs)
# use "DAINT-CSCS" -- change if another supercomputer is prepared for usage
LOGGER.info("Prepare unicore client for operation: {}".format(operation.id))
site_client = HPCSchedulerClient._build_unicore_client(os.environ[HPCSchedulerClient.CSCS_LOGIN_TOKEN_ENV_KEY],
unicore_client._HBP_REGISTRY_URL,
TvbProfile.current.hpc.HPC_COMPUTE_SITE)
LOGGER.info("Submit job for operation: {}".format(operation.id))
job = HPCSchedulerClient._create_job_with_pyunicore(pyunicore_client=site_client, job_description=job_config,
job_script=job_script, inputs=job_encrypted_inputs)
LOGGER.info("Job url {} for operation: {}".format(job.resource_url, operation.id))
op_identifier = OperationProcessIdentifier(operation_id=operation.id, job_id=job.resource_url)
dao.store_entity(op_identifier)
LOGGER.info("Job mount point: {}".format(job.working_dir.properties[HPCSettings.JOB_MOUNT_POINT_KEY]))
return job
    @staticmethod
def compute_available_disk_space(operation):
# type: (Operation) -> int
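        # Example with illustrative numbers: if MAX_DISK_SPACE is 5 GB, started operations already
        # reserve 1 GB and the user's generated data takes 2 GB, the new job may use at most 2 GB.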
disk_space_per_user = TvbProfile.current.MAX_DISK_SPACE
pending_op_disk_space = dao.compute_disk_size_for_started_ops(operation.fk_launched_by)
user_disk_space = dao.compute_user_generated_disk_size(operation.fk_launched_by) # From kB to Bytes
available_space = disk_space_per_user - pending_op_disk_space - user_disk_space
return available_space
@staticmethod
def _stage_out_results(working_dir, simulator_gid):
# type: (Storage, typing.Union[uuid.UUID, str]) -> list
output_subfolder = HPCSchedulerClient.CSCS_DATA_FOLDER + '/' + HPCSchedulerClient.OUTPUT_FOLDER
output_list = HPCSchedulerClient._listdir(working_dir, output_subfolder)
LOGGER.info("Output list {}".format(output_list))
storage_interface = StorageInterface()
encrypted_dir = os.path.join(storage_interface.get_encryption_handler(simulator_gid).get_encrypted_dir(),
HPCSchedulerClient.OUTPUT_FOLDER)
encrypted_files = HPCSchedulerClient._stage_out_outputs(encrypted_dir, output_list)
# Clean data uploaded on CSCS
LOGGER.info("Clean uploaded files and results")
working_dir.rmdir(HPCSchedulerClient.CSCS_DATA_FOLDER)
LOGGER.info(encrypted_files)
return encrypted_files
@staticmethod
def _handle_metric_results(metric_encrypted_file, metric_vm_encrypted_file, operation, encryption_handler):
if not metric_encrypted_file:
return None, None
metric_op_dir, metric_op = BurstService.prepare_metrics_operation(operation)
metric_files = encryption_handler.decrypt_files_to_dir([metric_encrypted_file,
metric_vm_encrypted_file], metric_op_dir)
metric_file = metric_files[0]
metric_vm = h5.load_view_model_from_file(metric_files[1])
metric_op.view_model_gid = metric_vm.gid.hex
dao.store_entity(metric_op)
return metric_op, metric_file
    @staticmethod
def stage_out_to_operation_folder(working_dir, operation, simulator_gid):
# type: (Storage, Operation, typing.Union[uuid.UUID, str]) -> (list, Operation, str)
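        # Results arrive encrypted from the HPC site: first download everything from the job's working dir,
        # split the metric H5/view-model files from the plain simulation results, then decrypt each group
        # into its corresponding operation folder.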
encrypted_files = HPCSchedulerClient._stage_out_results(working_dir, simulator_gid)
simulation_results = list()
metric_encrypted_file = None
metric_vm_encrypted_file = None
for encrypted_file in encrypted_files:
if os.path.basename(encrypted_file).startswith(DatatypeMeasureH5.file_name_base()):
metric_encrypted_file = encrypted_file
elif os.path.basename(encrypted_file).startswith(MEASURE_METRICS_MODEL_CLASS):
metric_vm_encrypted_file = encrypted_file
else:
simulation_results.append(encrypted_file)
encryption_handler = StorageInterface.get_encryption_handler(simulator_gid)
metric_op, metric_file = HPCSchedulerClient._handle_metric_results(metric_encrypted_file,
metric_vm_encrypted_file, operation,
encryption_handler)
project = dao.get_project_by_id(operation.fk_launched_in)
operation_dir = HPCSchedulerClient.storage_interface.get_project_folder(project.name, str(operation.id))
h5_filenames = encryption_handler.decrypt_files_to_dir(simulation_results, operation_dir)
encryption_handler.cleanup_encryption_handler()
LOGGER.info("Decrypted h5: {}".format(h5_filenames))
LOGGER.info("Metric op: {}".format(metric_op))
LOGGER.info("Metric file: {}".format(metric_file))
return h5_filenames, metric_op, metric_file
@staticmethod
def _run_hpc_job(operation_identifier):
# type: (int) -> None
operation = dao.get_operation_by_id(operation_identifier)
project_folder = HPCSchedulerClient.storage_interface.get_project_folder(operation.project.name)
storage_interface = StorageInterface()
storage_interface.inc_running_op_count(project_folder)
is_group_launch = operation.fk_operation_group is not None
simulator_gid = operation.view_model_gid
try:
HPCSchedulerClient._launch_job_with_pyunicore(operation, simulator_gid, is_group_launch)
except Exception as exception:
            LOGGER.error("Failed to submit HPC job", exc_info=True)
operation.mark_complete(STATUS_ERROR,
exception.response.text if isinstance(exception, HTTPError) else repr(exception))
dao.store_entity(operation)
storage_interface.check_and_delete(project_folder)
@staticmethod
def _stage_out_outputs(encrypted_dir_path, output_list):
# type: (str, dict) -> list
if not os.path.isdir(encrypted_dir_path):
os.makedirs(encrypted_dir_path)
encrypted_files = list()
for output_filename, output_filepath in output_list.items():
if type(output_filepath) is not unicore_client.PathFile:
LOGGER.info("Object {} is not a file.")
continue
filename = os.path.join(encrypted_dir_path, os.path.basename(output_filename))
output_filepath.download(filename)
encrypted_files.append(filename)
return encrypted_files
    @staticmethod
def execute(operation_id, user_name_label, adapter_instance):
# type: (int, None, None) -> None
"""Call the correct system command to submit a job to HPC."""
thread = HPCOperationThread(operation_id, target=HPCSchedulerClient._run_hpc_job,
kwargs={'operation_identifier': operation_id})
thread.start()
HPC_THREADS.append(thread)
    @staticmethod
def stop_operation(operation_id):
# TODO: Review this implementation after DAINT maintenance
operation = dao.get_operation_by_id(operation_id)
if not operation or operation.has_finished:
LOGGER.warning("Operation already stopped: %s" % operation_id)
return True
LOGGER.debug("Stopping HPC operation: %s" % str(operation_id))
op_ident = OperationDAO().get_operation_process_for_operation(operation_id)
if op_ident is not None:
# TODO: Handle login
transport = unicore_client.Transport(os.environ[HPCSchedulerClient.CSCS_LOGIN_TOKEN_ENV_KEY])
# Abort HPC job
job = Job(transport, op_ident.job_id)
if job.is_running():
job.abort()
# Kill thread
operation_thread = get_op_thread(operation_id)
if operation_thread is None:
LOGGER.warning("Thread for operation {} is not available".format(operation_id))
else:
operation_thread.stop()
while not operation_thread.stopped():
LOGGER.info("Thread for operation {} is stopping".format(operation_id))
BurstService().persist_operation_state(operation, STATUS_CANCELED)
return True