Source code for tvb.core.services.simulator_service

# -*- coding: utf-8 -*-
#
#
# TheVirtualBrain-Framework Package. This package holds all Data Management, and
# Web-UI helpful to run brain-simulations. To use it, you also need to download
# TheVirtualBrain-Scientific Package (for simulators). See content of the
# documentation-folder for more details. See also http://www.thevirtualbrain.org
#
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE.  See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program.  If not, see <http://www.gnu.org/licenses/>.
#
#
#   CITATION:
# When using The Virtual Brain for scientific publications, please cite it as explained here:
# https://www.thevirtualbrain.org/tvb/zwei/neuroscience-publications
#
#

"""
.. moduleauthor:: Paula Popa <paula.popa@codemart.ro>
"""

import copy
import json
import uuid
import numpy

from tvb.basic.logger.builder import get_logger
from tvb.core.entities import load
from tvb.core.entities.filters.chain import FilterChain
from tvb.core.entities.model.model_burst import BurstConfiguration
from tvb.core.entities.model.model_datatype import DataTypeGroup
from tvb.core.entities.storage import dao
from tvb.core.neocom import h5
from tvb.core.services.algorithm_service import AlgorithmService
from tvb.core.services.burst_service import BurstService
from tvb.core.services.exceptions import BurstServiceException
from tvb.core.services.operation_service import OperationService, GROUP_BURST_PENDING
from tvb.simulator.integrators import IntegratorStochastic
from tvb.storage.storage_interface import StorageInterface


class SimulatorService(object):
    """
    Service that prepares and launches simulator operations: single simulations, simulations
    loaded from a simulator file, and Parameter Space Exploration (PSE) groups of simulations.
    """

    def __init__(self):
        self.logger = get_logger(self.__class__.__module__)
        self.burst_service = BurstService()
        self.operation_service = OperationService()
        self.algorithm_service = AlgorithmService()
        self.storage_interface = StorageInterface()

    @staticmethod
    def _reset_model(session_stored_simulator):
        """
        Replace the simulator's model with a fresh default instance of the same class and
        re-point every monitor's variables of interest to the indexes computed for it.
        """
        session_stored_simulator.model = type(session_stored_simulator.model)()
        vi_indexes = session_stored_simulator.determine_indexes_for_chosen_vars_of_interest()
        vi_indexes = numpy.array(list(vi_indexes.values()))
        for monitor in session_stored_simulator.monitors:
            monitor.variables_of_interest = vi_indexes

    def reset_at_connectivity_change(self, is_simulator_copy, form, session_stored_simulator):
        """
        In case the user copies a simulation and changes the Connectivity, we want to reset the Model and Noise
        parameters because they might not fit to the new Connectivity's nr of regions.
        """
        if is_simulator_copy and form.connectivity.value != session_stored_simulator.connectivity:
            self._reset_model(session_stored_simulator)
            integrator = session_stored_simulator.integrator
            if isinstance(integrator, IntegratorStochastic):
                # Only stochastic integrators carry noise; rebuild it with default parameters.
                integrator.noise = type(integrator.noise)()

    def reset_at_surface_change(self, is_simulator_copy, form, session_stored_simulator):
        """
        In case the user copies a surface-simulation and changes the Surface, we want to reset the Model
        parameters because they might not fit to the new Surface's nr of vertices.
        """
        if not is_simulator_copy:
            return
        # Reset when a surface is newly chosen, or when the chosen one differs from the stored one.
        if ((session_stored_simulator.surface is None and form.surface.value) or
                (session_stored_simulator.surface and
                 form.surface.value != session_stored_simulator.surface.surface_gid)):
            self._reset_model(session_stored_simulator)

    @staticmethod
    def _set_simulator_range_parameter(simulator, range_parameter_name, range_parameter_value):
        """
        Set a (possibly nested) attribute on the simulator.

        :param simulator: object on which the attribute is set
        :param range_parameter_name: dot-separated attribute path, e.g. ``"model.a"``
        :param range_parameter_value: value assigned to the last attribute in the path
        """
        *parent_path, leaf_name = range_parameter_name.split('.')
        current_attr = simulator
        for param_name in parent_path:
            current_attr = getattr(current_attr, param_name)
        setattr(current_attr, leaf_name, range_parameter_value)

    def async_launch_and_prepare_simulation(self, burst_config, user, project, simulator_algo, simulator):
        """
        Prepare the operation for a single simulation, link it to the burst and launch it.

        :returns: the prepared operation when the launch call succeeded, otherwise None. Failures are
                  logged and the burst is marked as finished with the error message; nothing is raised.
        """
        try:
            operation = self.operation_service.prepare_operation(user.id, project, simulator_algo,
                                                                 view_model=simulator, burst_gid=burst_config.gid,
                                                                 op_group_id=burst_config.fk_operation_group)
            burst_config = self.burst_service.update_simulation_fields(burst_config, operation.id, simulator.gid)
            self.burst_service.store_burst_configuration(burst_config)

            wf_errs = 0
            try:
                OperationService().launch_operation(operation.id, True)
                return operation
            except Exception as excep:
                self.logger.error(excep)
                wf_errs += 1
                if burst_config:
                    self.burst_service.mark_burst_finished(burst_config, error_message=str(excep))

            # NOTE(review): only reached when the launch failed (success returns above), so the
            # "launched successfully" wording is misleading; text kept unchanged on purpose.
            self.logger.debug("Finished launching workflow. The operation was launched successfully, " +
                              str(wf_errs) + " had error on pre-launch steps")

        except Exception as excep:
            self.logger.error(excep)
            if burst_config:
                self.burst_service.mark_burst_finished(burst_config, error_message=str(excep))

    def prepare_simulation_on_server(self, user_id, project, algorithm, zip_folder_path, simulator_file):
        """
        Load a simulator view model from `simulator_file`, prepare an operation for it and launch it.
        `zip_folder_path` is removed once the launch attempt is over.

        :returns: the prepared operation
        """
        simulator_vm = h5.load_view_model_from_file(simulator_file)
        operation = self.operation_service.prepare_operation(user_id, project, algorithm, view_model=simulator_vm)
        self.async_launch_simulation_on_server(operation, zip_folder_path)
        return operation

    def async_launch_simulation_on_server(self, operation, zip_folder_path):
        """
        Launch an already prepared operation; launch errors are logged, not raised.

        :returns: the operation when the launch call succeeded, otherwise None
        """
        try:
            OperationService().launch_operation(operation.id, True)
            return operation
        except Exception as excep:
            self.logger.error(excep)
        finally:
            # The folder is removed whether or not the launch succeeded.
            self.storage_interface.remove_folder(zip_folder_path)

    @staticmethod
    def _set_range_param_in_dict(param_value):
        """
        Convert a range value before it is stored in the ranges dictionary: the first element for a
        numpy array, the hex string for a UUID, and the value itself for anything else.
        """
        if type(param_value) is numpy.ndarray:
            return param_value[0]
        if isinstance(param_value, uuid.UUID):
            return param_value.hex
        return param_value

    def async_launch_and_prepare_pse(self, burst_config, user, project, simulator_algo, range_param1, range_param2,
                                     session_stored_simulator):
        """
        Prepare one operation per combination of range values and then launch all of them.

        :param range_param1: first range parameter (mandatory)
        :param range_param2: second range parameter; when falsy only `range_param1` is explored
        :returns: the first prepared operation, or None when the PSE was canceled, no operation was
                  prepared, or an error occurred (errors are logged and the burst marked as finished)
        """
        try:
            algo_category = simulator_algo.algorithm_category
            operation_group = burst_config.operation_group
            metric_operation_group = burst_config.metric_operation_group
            range_param2_values = [None]
            if range_param2:
                range_param2_values = range_param2.get_range_values()

            GROUP_BURST_PENDING[burst_config.id] = True
            try:
                operations, pse_canceled = self._prepare_operations(algo_category, burst_config,
                                                                    metric_operation_group, operation_group,
                                                                    project, range_param1, range_param2,
                                                                    range_param2_values, session_stored_simulator,
                                                                    simulator_algo, user)
            finally:
                # Always clear the pending flag, so a failure while preparing does not leave it set.
                GROUP_BURST_PENDING[burst_config.id] = False

            if pse_canceled:
                return None

            wf_errs = self._launch_operations(operations, burst_config)
            self.logger.debug("Finished launching workflows. " + str(len(operations) - wf_errs) +
                              " were launched successfully, " + str(wf_errs) + " had error on pre-launch steps")
            return operations[0] if len(operations) > 0 else None

        except Exception as excep:
            self.logger.error(excep)
            self.burst_service.mark_burst_finished(burst_config, error_message=str(excep))

    def _launch_operations(self, operations, burst_config):
        """
        Launch the given operations one by one, stopping early if the burst was deleted, canceled
        or marked with error in the meantime.

        :returns: number of operations whose launch raised an error (always an int)
        """
        wf_errs = 0
        for operation in operations:
            try:
                burst_config = dao.get_burst_by_id(burst_config.id)
                if burst_config is None or burst_config.status in [BurstConfiguration.BURST_CANCELED,
                                                                   BurstConfiguration.BURST_ERROR]:
                    self.logger.debug("Preparing operations cannot continue. Burst config {}".format(
                        burst_config))
                    # Return the counter (not None): the caller does arithmetic on the result.
                    return wf_errs
                OperationService().launch_operation(operation.id, True)
            except Exception as excep:
                self.logger.error(excep)
                wf_errs += 1
                self.burst_service.mark_burst_finished(burst_config, error_message=str(excep))

        return wf_errs

    def _prepare_operations(self, algo_category, burst_config, metric_operation_group, operation_group, project,
                            range_param1, range_param2, range_param2_values, session_stored_simulator,
                            simulator_algo, user):
        """
        Prepare (without launching) one simulator operation for every combination of range values.

        The burst is re-read before each operation; if it was deleted, canceled or marked with error,
        preparation stops immediately.

        :returns: tuple (operations prepared so far, True if preparation was interrupted else False)
        """
        first_simulator = None
        operations = []
        for param1_value in range_param1.get_range_values():
            for param2_value in range_param2_values:
                burst_config = dao.get_burst_by_id(burst_config.id)
                if burst_config is None:
                    self.logger.debug("Burst config was deleted")
                    # Leave both loops: another iteration would dereference the missing burst.
                    return operations, True

                if burst_config.status in [BurstConfiguration.BURST_CANCELED, BurstConfiguration.BURST_ERROR]:
                    self.logger.debug("Current burst status is {}. Preparing operations cannot continue.".format(
                        burst_config.status))
                    return operations, True

                # Copy, but generate a new GUID for every Simulator in PSE
                simulator = copy.deepcopy(session_stored_simulator)
                simulator.gid = uuid.uuid4()
                self._set_simulator_range_parameter(simulator, range_param1.name, param1_value)

                ranges = {range_param1.name: self._set_range_param_in_dict(param1_value)}

                if param2_value is not None:
                    self._set_simulator_range_parameter(simulator, range_param2.name, param2_value)
                    ranges[range_param2.name] = self._set_range_param_in_dict(param2_value)

                ranges = json.dumps(ranges)

                simulator.range_values = ranges
                operation = self.operation_service.prepare_operation(user.id, project, simulator_algo,
                                                                     view_model=simulator, ranges=ranges,
                                                                     burst_gid=burst_config.gid,
                                                                     op_group_id=burst_config.fk_operation_group)
                operations.append(operation)

                if first_simulator is None:
                    # Done once per PSE: link the burst to the first simulation and create the groups.
                    first_simulator = simulator
                    burst_config = self.burst_service.update_simulation_fields(burst_config, operation.id,
                                                                               first_simulator.gid)
                    self.burst_service.store_burst_configuration(burst_config)
                    datatype_group = DataTypeGroup(operation_group, operation_id=operation.id,
                                                   fk_parent_burst=burst_config.gid,
                                                   state=algo_category.defaultdatastate)
                    dao.store_entity(datatype_group)

                    metrics_datatype_group = DataTypeGroup(metric_operation_group,
                                                           fk_parent_burst=burst_config.gid,
                                                           state=algo_category.defaultdatastate)
                    dao.store_entity(metrics_datatype_group)

        return operations, False

    @staticmethod
    def compute_conn_branch_conditions(is_branch, simulator):
        """
        Build the connectivity filter used when branching a simulation.

        :returns: a FilterChain matching datatypes with the same number of regions as the simulator's
                  connectivity; None when not branching or when that number is falsy
        """
        if not is_branch:
            return None

        conn = load.load_entity_by_gid(simulator.connectivity)
        if conn.number_of_regions:
            return FilterChain(fields=[FilterChain.datatype + '.number_of_regions'],
                               operations=["=="], values=[conn.number_of_regions])
        return None

    @staticmethod
    def validate_first_fragment(form, project_id, conn_idx):
        """
        Append an error to the form's connectivity field when the project has no datatype of the
        given connectivity index class.
        """
        conn_count = dao.count_datatypes(project_id, conn_idx)
        if conn_count == 0:
            form.connectivity.errors.append("No connectivity in the project! Simulation cannot be started without "
                                            "a connectivity!")

    def get_simulation_state_index(self, burst_config, simulation_history_class):
        """
        Fetch the simulation state entities stored for the parent burst of `burst_config`.

        :returns: the non-empty result of the lookup by parent burst gid
        :raises BurstServiceException: when no simulation state is found for the parent burst
        """
        parent_burst = burst_config.parent_burst_object
        simulation_state_index = dao.get_generic_entity(simulation_history_class, parent_burst.gid,
                                                        "fk_parent_burst")

        if simulation_state_index is None or len(simulation_state_index) < 1:
            exc = BurstServiceException("Simulation State not found for %s, thus we are unable to branch from "
                                        "it!" % burst_config.name)
            self.logger.error(exc)
            raise exc

        return simulation_state_index