Source code for tvb.core.services.hpc_operation_service
# -*- coding: utf-8 -*-
#
#
# TheVirtualBrain-Framework Package. This package holds all Data Management, and
# Web-UI helpful to run brain-simulations. To use it, you also need to download
# TheVirtualBrain-Scientific Package (for simulators). See content of the
# documentation-folder for more details. See also http://www.thevirtualbrain.org
#
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program. If not, see <http://www.gnu.org/licenses/>.
#
#
# CITATION:
# When using The Virtual Brain for scientific publications, please cite it as explained here:
# https://www.thevirtualbrain.org/tvb/zwei/neuroscience-publications
#
#
"""
.. moduleauthor:: Paula Popa <paula.popa@codemart.ro>
.. moduleauthor:: Bogdan Valean <bogdan.valean@codemart.ro>
"""
import os
from functools import partial
from tvb.basic.logger.builder import get_logger
from tvb.core.entities.model.model_operation import STATUS_ERROR, STATUS_CANCELED, STATUS_FINISHED
from tvb.core.entities.model.model_operation import STATUS_STARTED, STATUS_PENDING
from tvb.core.entities.storage import dao
from tvb.core.services.backend_clients.hpc_scheduler_client import HPCSchedulerClient, HPCJobStatus
from tvb.core.services.exceptions import OperationException
from tvb.storage.storage_interface import StorageInterface
try:
from pyunicore.client import Job, Transport
except ImportError:
from tvb.basic.config.settings import HPCSettings
HPCSettings.CAN_RUN_HPC = False
[docs]class HPCOperationService(object):
LOGGER = get_logger(__name__)
@staticmethod
def _operation_error(operation):
operation.mark_complete(STATUS_ERROR)
dao.store_entity(operation)
@staticmethod
def _operation_canceled(operation):
operation.mark_complete(STATUS_CANCELED)
dao.store_entity(operation)
@staticmethod
def _operation_started(operation):
operation.start_now()
dao.store_entity(operation)
@staticmethod
def _operation_finished(operation, simulator_gid):
op_ident = dao.get_operation_process_for_operation(operation.id)
# TODO: Handle login
job = Job(Transport(os.environ[HPCSchedulerClient.CSCS_LOGIN_TOKEN_ENV_KEY]),
op_ident.job_id)
operation = dao.get_operation_by_id(operation.id)
folder = HPCSchedulerClient.storage_interface.get_project_folder(operation.project.name)
storage_interface = StorageInterface()
if storage_interface.encryption_enabled():
storage_interface.inc_project_usage_count(folder)
storage_interface.sync_folders(folder)
try:
sim_h5_filenames, metric_op, metric_h5_filename = \
HPCSchedulerClient.stage_out_to_operation_folder(job.working_dir, operation, simulator_gid)
operation.mark_complete(STATUS_FINISHED)
dao.store_entity(operation)
HPCSchedulerClient().update_db_with_results(operation, sim_h5_filenames, metric_op, metric_h5_filename)
except OperationException as exception:
HPCOperationService.LOGGER.error(exception)
HPCOperationService._operation_error(operation)
finally:
if storage_interface.encryption_enabled():
storage_interface.sync_folders(folder)
storage_interface.set_project_inactive(operation.project)
[docs] @staticmethod
def handle_hpc_status_changed(operation, simulator_gid, new_status):
# type: (Operation, str, str) -> None
switcher = {
STATUS_ERROR: HPCOperationService._operation_error,
STATUS_CANCELED: HPCOperationService._operation_canceled,
STATUS_STARTED: HPCOperationService._operation_started,
STATUS_FINISHED: partial(HPCOperationService._operation_finished, simulator_gid=simulator_gid)
}
update_func = switcher.get(new_status, lambda: "Invalid operation status")
update_func(operation)
[docs] @staticmethod
def check_operations_job():
operations = dao.get_operations_for_hpc_job()
if operations is None or len(operations) == 0:
return
for operation in operations:
HPCOperationService.LOGGER.info("Start processing operation {}".format(operation.id))
try:
op_ident = dao.get_operation_process_for_operation(operation.id)
if op_ident is not None:
transport = Transport(os.environ[HPCSchedulerClient.CSCS_LOGIN_TOKEN_ENV_KEY])
job = Job(transport, op_ident.job_id)
job_status = job.properties['status']
if job.is_running():
if operation.status == STATUS_PENDING and job_status == HPCJobStatus.READY.value:
HPCOperationService._operation_started(operation)
HPCOperationService.LOGGER.info(
"CSCS job status: {} for operation {}.".format(job_status, operation.id))
return
HPCOperationService.LOGGER.info(
"Job for operation {} has status {}".format(operation.id, job_status))
if job_status == HPCJobStatus.SUCCESSFUL.value:
simulator_gid = operation.view_model_gid
HPCOperationService._operation_finished(operation, simulator_gid)
else:
HPCOperationService._operation_error(operation)
except Exception:
HPCOperationService.LOGGER.error(
"There was an error on background processing process for operation {}".format(operation.id),
exc_info=True)