Source code for tvb.core.services.burst_service

# -*- coding: utf-8 -*-
#
#
# TheVirtualBrain-Framework Package. This package holds all Data Management, and
# Web-UI helpful to run brain-simulations. To use it, you also need to download
# TheVirtualBrain-Scientific Package (for simulators). See content of the
# documentation-folder for more details. See also http://www.thevirtualbrain.org
#
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE.  See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program.  If not, see <http://www.gnu.org/licenses/>.
#
#
#   CITATION:
# When using The Virtual Brain for scientific publications, please cite it as explained here:
# https://www.thevirtualbrain.org/tvb/zwei/neuroscience-publications
#
#

import json
import os
from datetime import datetime

from tvb.basic.logger.builder import get_logger
from tvb.config import MEASURE_METRICS_MODULE, MEASURE_METRICS_CLASS
from tvb.core.entities.file.simulator.burst_configuration_h5 import BurstConfigurationH5
from tvb.core.entities.file.simulator.datatype_measure_h5 import DatatypeMeasureH5
from tvb.core.entities.file.simulator.view_model import SimulatorAdapterModel
from tvb.core.entities.generic_attributes import GenericAttributes
from tvb.core.entities.model.model_burst import BurstConfiguration
from tvb.core.entities.model.model_datatype import DataTypeGroup
from tvb.core.entities.model.model_operation import Operation, STATUS_FINISHED, STATUS_PENDING, STATUS_CANCELED
from tvb.core.entities.model.model_operation import OperationGroup, STATUS_ERROR, STATUS_STARTED, has_finished
from tvb.core.entities.storage import dao
from tvb.core.entities.transient.range_parameter import RangeParameter
from tvb.core.neocom import h5
from tvb.core.neocom.h5 import DirLoader
from tvb.core.services.import_service import ImportService
from tvb.core.utils import format_bytes_human, format_timedelta
from tvb.storage.storage_interface import StorageInterface

MAX_BURSTS_DISPLAYED = 50
STATUS_FOR_OPERATION = {
    STATUS_PENDING: BurstConfiguration.BURST_RUNNING,
    STATUS_STARTED: BurstConfiguration.BURST_RUNNING,
    STATUS_CANCELED: BurstConfiguration.BURST_CANCELED,
    STATUS_ERROR: BurstConfiguration.BURST_ERROR,
    STATUS_FINISHED: BurstConfiguration.BURST_FINISHED
}


class BurstService(object):
    """
    Service layer for BurstConfiguration entities: create, persist, rename and track
    the status of simulation runs (bursts) and their resulting datatypes.
    """
    LAUNCH_NEW = 'new'
    LAUNCH_BRANCH = 'branch'

    def __init__(self):
        self.logger = get_logger(self.__class__.__module__)
        self.storage_interface = StorageInterface()

    def mark_burst_finished(self, burst_entity, burst_status=None, error_message=None, store_h5_file=True):
        """
        Mark Burst status field.
        Also compute 'weight' for current burst: no of operations inside, estimate time on disk...

        :param burst_entity: BurstConfiguration to be updated, at finish time.
        :param burst_status: BurstConfiguration status. By default BURST_FINISHED
        :param error_message: If given, set the status to error and perpetuate the message.
        """
        if burst_status is None:
            burst_status = BurstConfiguration.BURST_FINISHED
        if error_message is not None:
            burst_status = BurstConfiguration.BURST_ERROR

        try:
            # If there are any DataType Groups in current Burst, update their counter.
            burst_dt_groups = dao.get_generic_entity(DataTypeGroup, burst_entity.gid, "fk_parent_burst")
            for dt_group in burst_dt_groups:
                dt_group.count_results = dao.count_datatypes_in_group(dt_group.id)
                dt_group.disk_size, dt_group.subject = dao.get_summary_for_group(dt_group.id)
                dao.store_entity(dt_group)

            # Update actual Burst entity fields
            burst_entity.datatypes_number = dao.count_datatypes_in_burst(burst_entity.gid)
            burst_entity.status = burst_status
            burst_entity.error_message = error_message
            burst_entity.finish_time = datetime.now()
            dao.store_entity(burst_entity)
            if store_h5_file:
                self.store_burst_configuration(burst_entity)
        except Exception:
            self.logger.exception("Could not correctly update Burst status and meta-data!")
            burst_entity.status = burst_status
            burst_entity.error_message = "Error when updating Burst Status"
            burst_entity.finish_time = datetime.now()
            dao.store_entity(burst_entity)
            if store_h5_file:
                self.store_burst_configuration(burst_entity)
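
    # A minimal usage sketch (not part of the original module): a caller that owns a loaded
    # BurstConfiguration `burst` and an optional exception message `failure` could close the
    # burst like this. With error_message set, the status is forced to BURST_ERROR.
    #
    #   service = BurstService()
    #   if failure is None:
    #       service.mark_burst_finished(burst)
    #   else:
    #       service.mark_burst_finished(burst, error_message=str(failure))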

    def persist_operation_state(self, operation, operation_status, message=None):
        """
        Update Operation instance state. Store it in DB and on HDD.

        :param operation: Operation instance
        :param operation_status: new status
        :param message: message in case of error
        :return: operation instance changed
        """
        operation.mark_complete(operation_status, message)
        operation.queue_full = False
        operation = dao.store_entity(operation)
        # update burst also
        burst_config = self.get_burst_for_operation_id(operation.id)
        if burst_config is not None:
            burst_status = STATUS_FOR_OPERATION.get(operation_status)
            self.mark_burst_finished(burst_config, burst_status, message)
        return operation

    @staticmethod
    def get_burst_for_operation_id(operation_id, is_group=False):
        return dao.get_burst_for_operation_id(operation_id, is_group)

    def rename_burst(self, burst_id, new_name):
        """
        Rename the burst given by burst_id, setting its name to new_name.
        """
        burst = dao.get_burst_by_id(burst_id)
        burst.name = new_name
        dao.store_entity(burst)
        self.store_burst_configuration(burst)

    @staticmethod
    def get_available_bursts(project_id):
        """
        Return all the bursts available for the given project.
        """
        bursts = dao.get_bursts_for_project(project_id, page_size=MAX_BURSTS_DISPLAYED) or []
        return bursts

    @staticmethod
    def populate_burst_disk_usage(bursts):
        """
        Add a disk_size field to each burst object.
        The disk usage is computed as the sum of the sizes of the datatypes generated by a burst.
        """
        sizes = dao.compute_bursts_disk_size([b.gid for b in bursts])
        for b in bursts:
            b.disk_size = format_bytes_human(sizes[b.gid])

    def update_history_status(self, id_list):
        """
        For each burst_id received in id_list, read the new status from DB and return a list of
        [id, new_status, is_group, message, running_time] entries.
        """
        result = []
        for b_id in id_list:
            burst = dao.get_burst_by_id(b_id)
            if burst is not None:
                if burst.status == burst.BURST_RUNNING:
                    running_time = datetime.now() - burst.start_time
                else:
                    running_time = burst.finish_time - burst.start_time
                running_time = format_timedelta(running_time, most_significant2=False)

                if burst.status == burst.BURST_ERROR:
                    msg = 'Check Operations page for error Message'
                else:
                    msg = ''
                result.append([burst.id, burst.status, burst.is_group, msg, running_time])
            else:
                self.logger.debug("Could not find burst with id=" + str(b_id) + ". Might have been deleted by user!!")
        return result
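
    # Hypothetical illustration of the return shape (ids, statuses and durations are made up;
    # the exact running_time string depends on format_timedelta):
    #
    #   BurstService().update_history_status([12, 13])
    #   # -> [[12, 'finished', False, '', '0h 05m 32s'],
    #   #     [13, 'error', True, 'Check Operations page for error Message', '1h 02m 10s']]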

    @staticmethod
    def update_simulation_fields(burst, op_simulation_id, simulation_gid):
        burst.fk_simulation = op_simulation_id
        burst.simulator_gid = simulation_gid.hex
        burst = dao.store_entity(burst)
        return burst

    @staticmethod
    def load_burst_configuration(burst_config_id):
        # type: (int) -> BurstConfiguration
        burst_config = dao.get_burst_by_id(burst_config_id)
        return burst_config

    @staticmethod
    def remove_burst_configuration(burst_config_id):
        # type: (int) -> None
        dao.remove_entity(BurstConfiguration, burst_config_id)

    @staticmethod
    def prepare_burst_for_pse(burst_config):
        # type: (BurstConfiguration) -> BurstConfiguration
        operation_group = OperationGroup(burst_config.fk_project, ranges=burst_config.ranges)
        operation_group = dao.store_entity(operation_group)

        metric_operation_group = OperationGroup(burst_config.fk_project, ranges=burst_config.ranges)
        metric_operation_group = dao.store_entity(metric_operation_group)

        burst_config.operation_group = operation_group
        burst_config.fk_operation_group = operation_group.id
        burst_config.metric_operation_group = metric_operation_group
        burst_config.fk_metric_operation_group = metric_operation_group.id
        return dao.store_entity(burst_config)

    @staticmethod
    def store_burst_configuration(burst_config):
        project = dao.get_project_by_id(burst_config.fk_project)
        bc_path = h5.path_for(burst_config.fk_simulation, BurstConfigurationH5, burst_config.gid, project.name)
        with BurstConfigurationH5(bc_path) as bc_h5:
            bc_h5.store(burst_config)

    @staticmethod
    def load_burst_configuration_from_folder(simulator_folder, project):
        bc_h5_filename = DirLoader(simulator_folder, None).find_file_for_has_traits_type(BurstConfiguration)
        burst_config = BurstConfiguration(project.id)
        with BurstConfigurationH5(os.path.join(simulator_folder, bc_h5_filename)) as bc_h5:
            bc_h5.load_into(burst_config)
        return burst_config

    @staticmethod
    def prepare_simulation_name(burst, project_id):
        simulation_number = dao.get_number_of_bursts(project_id) + 1

        if burst.name is None:
            simulation_name = 'simulation_' + str(simulation_number)
        else:
            simulation_name = burst.name

        return simulation_name, simulation_number
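
    # Sketch (assuming `burst.name is None` and the project already holds 4 bursts):
    #
    #   name, number = BurstService.prepare_simulation_name(burst, project.id)
    #   # name == 'simulation_5', number == 5
    #
    # A burst that already carries a name keeps it; only the number is computed.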

    def prepare_indexes_for_simulation_results(self, operation, result_filenames, burst):
        indexes = list()
        self.logger.debug("Preparing indexes for simulation results in operation {}...".format(operation.id))
        for filename in result_filenames:
            try:
                self.logger.debug("Preparing index for filename: {}".format(filename))
                index = h5.index_for_h5_file(filename)()
                h5_class = h5.REGISTRY.get_h5file_for_index(type(index))

                with h5_class(filename) as index_h5:
                    index.fill_from_h5(index_h5)
                    index.fill_from_generic_attributes(index_h5.load_generic_attributes())

                index.fk_parent_burst = burst.gid
                index.fk_from_operation = operation.id
                if operation.fk_operation_group:
                    datatype_group = dao.get_datatypegroup_by_op_group_id(operation.fk_operation_group)
                    self.logger.debug(
                        "Found DatatypeGroup with id {} for operation {}".format(datatype_group.id, operation.id))
                    index.fk_datatype_group = datatype_group.id

                    # Update the operation group name
                    operation_group = dao.get_operationgroup_by_id(operation.fk_operation_group)
                    operation_group.fill_operationgroup_name("TimeSeriesRegionIndex")
                    dao.store_entity(operation_group)

                self.logger.debug(
                    "Prepared index {} for file {} in operation {}".format(index.summary_info, filename, operation.id))
                indexes.append(index)
            except Exception as e:
                self.logger.debug("Skip preparing index {} because there was an error.".format(filename))
                self.logger.error(e)
        self.logger.debug("Prepared {} indexes for results in operation {}...".format(len(indexes), operation.id))
        return indexes

    def prepare_index_for_metric_result(self, operation, result_filename, burst):
        self.logger.debug("Preparing index for metric result in operation {}...".format(operation.id))
        index = h5.index_for_h5_file(result_filename)()
        with DatatypeMeasureH5(result_filename) as dti_h5:
            index.gid = dti_h5.gid.load().hex
            index.metrics = json.dumps(dti_h5.metrics.load())
            index.fk_source_gid = dti_h5.analyzed_datatype.load().hex
        index.fk_from_operation = operation.id
        index.fk_parent_burst = burst.gid
        datatype_group = dao.get_datatypegroup_by_op_group_id(operation.fk_operation_group)
        self.logger.debug("Found DatatypeGroup with id {} for operation {}".format(datatype_group.id, operation.id))
        index.fk_datatype_group = datatype_group.id
        self.logger.debug("Prepared index {} for results in operation {}...".format(index.summary_info, operation.id))
        return index

    def _update_pse_burst_status(self, burst_config):
        operations_in_group = dao.get_operations_in_group(burst_config.fk_operation_group)
        if burst_config.fk_metric_operation_group:
            operations_in_group.extend(dao.get_operations_in_group(burst_config.fk_metric_operation_group))

        operation_statuses = list()
        for operation in operations_in_group:
            if not has_finished(operation.status):
                self.logger.debug(
                    'Operation {} in group {} is not finished, burst status will not be updated'.format(
                        operation.id, operation.fk_operation_group))
                return
            operation_statuses.append(operation.status)

        self.logger.debug(
            'All operations in burst {} have finished. Will update burst status'.format(burst_config.id))
        if STATUS_ERROR in operation_statuses:
            self.mark_burst_finished(burst_config, BurstConfiguration.BURST_ERROR,
                                     'Some operations in PSE have finished with errors')
        elif STATUS_CANCELED in operation_statuses:
            self.mark_burst_finished(burst_config, BurstConfiguration.BURST_CANCELED)
        else:
            self.mark_burst_finished(burst_config)

    def update_burst_status(self, burst_config):
        if burst_config.fk_operation_group:
            self._update_pse_burst_status(burst_config)
        else:
            operation = dao.get_operation_by_id(burst_config.fk_simulation)
            message = operation.additional_info
            if len(message) == 0:
                message = None
            self.mark_burst_finished(burst_config, STATUS_FOR_OPERATION[operation.status], message)

    @staticmethod
    def prepare_metrics_operation(operation):
        # TODO reuse from OperationService and do not duplicate logic here
        parent_burst = dao.get_generic_entity(BurstConfiguration, operation.fk_operation_group,
                                              'fk_operation_group')[0]
        metric_operation_group_id = parent_burst.fk_metric_operation_group
        range_values = operation.range_values
        metric_algo = dao.get_algorithm_by_module(MEASURE_METRICS_MODULE, MEASURE_METRICS_CLASS)

        metric_operation = Operation(None, operation.fk_launched_by, operation.fk_launched_in, metric_algo.id,
                                     status=STATUS_FINISHED, op_group_id=metric_operation_group_id,
                                     range_values=range_values)
        metric_operation.visible = False
        metric_operation = dao.store_entity(metric_operation)

        op_dir = StorageInterface().get_project_folder(operation.project.name, str(metric_operation.id))
        return op_dir, metric_operation

    @staticmethod
    def get_range_param_by_name(param_name, all_range_parameters):
        for range_param in all_range_parameters:
            if param_name == range_param.name:
                return range_param
        return None

    @staticmethod
    def handle_range_params_at_loading(burst_config, all_range_parameters):
        param1, param2 = None, None
        if burst_config.range1:
            param1 = RangeParameter.from_json(burst_config.range1)
            param1.fill_from_default(BurstService.get_range_param_by_name(param1.name, all_range_parameters))
            if burst_config.range2 is not None:
                param2 = RangeParameter.from_json(burst_config.range2)
                param2.fill_from_default(BurstService.get_range_param_by_name(param2.name, all_range_parameters))

        return param1, param2
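
    # Sketch of the expected inputs (assuming `all_range_parameters` was collected elsewhere as a
    # list of RangeParameter objects describing the simulator's configurable ranges):
    #
    #   param1, param2 = BurstService.handle_range_params_at_loading(burst_config, all_range_parameters)
    #   # param1/param2 are RangeParameter instances, or None when the burst stores no PSE range.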

    def prepare_data_for_burst_copy(self, burst_config_id, burst_name_format, project):
        burst_config = self.load_burst_configuration(burst_config_id)
        burst_config_copy = burst_config.clone()
        count = dao.count_bursts_with_name(burst_config.name, burst_config.fk_project)
        burst_config_copy.name = burst_name_format.format(burst_config.name, count + 1)

        storage_path = self.storage_interface.get_project_folder(project.name, str(burst_config.fk_simulation))
        simulator = h5.load_view_model(burst_config.simulator_gid, storage_path)
        simulator.generic_attributes = GenericAttributes()
        return simulator, burst_config_copy

    @staticmethod
    def store_burst(burst_config):
        return dao.store_entity(burst_config)

    def load_simulation_from_zip(self, zip_file, project):
        import_service = ImportService()
        simulator_folder = import_service.import_simulator_configuration_zip(zip_file)

        simulator_h5_filename = DirLoader(simulator_folder, None).find_file_for_has_traits_type(SimulatorAdapterModel)
        simulator_h5_filepath = os.path.join(simulator_folder, simulator_h5_filename)
        simulator = h5.load_view_model_from_file(simulator_h5_filepath)

        burst_config = self.load_burst_configuration_from_folder(simulator_folder, project)
        burst_config_copy = burst_config.clone()
        simulator.generic_attributes.parent_burst = burst_config_copy.gid

        return simulator, burst_config_copy, simulator_folder
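
    # A hedged usage sketch (the variable names are assumptions, not part of this module):
    # importing a previously exported simulation archive and keeping the resulting burst copy
    # could look roughly like this.
    #
    #   service = BurstService()
    #   simulator, burst_copy, folder = service.load_simulation_from_zip(uploaded_zip, project)
    #   burst_copy = service.store_burst(burst_copy)
    #
    # where `uploaded_zip` is a simulation export archive and `project` is the Project entity
    # the import should land in.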