# -*- coding: utf-8 -*-
#
#
# TheVirtualBrain-Framework Package. This package holds all Data Management, and
# Web-UI helpful to run brain-simulations. To use it, you also need to download
# TheVirtualBrain-Scientific Package (for simulators). See content of the
# documentation-folder for more details. See also http://www.thevirtualbrain.org
#
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program. If not, see <http://www.gnu.org/licenses/>.
#
#
# CITATION:
# When using The Virtual Brain for scientific publications, please cite it as explained here:
# https://www.thevirtualbrain.org/tvb/zwei/neuroscience-publications
#
#
"""
DAO operations related to Algorithms and User Operations are defined here.
.. moduleauthor:: Lia Domide <lia.domide@codemart.ro>
.. moduleauthor:: Bogdan Neacsa <bogdan.neacsa@codemart.ro>
"""
from sqlalchemy import and_
from sqlalchemy import func as func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.sql.expression import case as case_, desc
from tvb.core.entities.model.model_datatype import DataType
from tvb.core.entities.model.model_operation import *
from tvb.core.entities.model.model_operation import STATUS_ERROR
from tvb.core.entities.storage.root_dao import RootDAO, DEFAULT_PAGE_SIZE
[docs]class OperationDAO(RootDAO):
"""
OPERATION RELATED METHODS
"""
[docs] def get_operation_by_id(self, operation_id):
"""Retrieve OPERATION entity for a given Identifier."""
operation = self.session.query(Operation).filter_by(id=operation_id).one()
# Load lazy fields:
operation.user
operation.project
operation.operation_group
operation.algorithm
return operation
[docs] def try_get_operation_by_id(self, operation_id):
"""
Try to call self.get_operation_by_id, but when operation was not found, instead of failing, return None.
This could be called from situations like: stopping & removing op.
A check for None is compulsory after this call!
"""
try:
return self.get_operation_by_id(operation_id)
except SQLAlchemyError:
self.logger.exception("Operation not found for ID %s, we will return None" % operation_id)
return None
[docs] def get_operation_by_gid(self, operation_gid):
"""Retrieve OPERATION entity for a given gid."""
try:
operation = self.session.query(Operation).filter_by(gid=operation_gid).one()
operation.user
operation.project
operation.operation_group
operation.algorithm.algorithm_category
return operation
except SQLAlchemyError:
self.logger.exception("When fetching gid %s" % operation_gid)
return None
[docs] def get_operation_lazy_by_gid(self, operation_gid):
"""Retrieve OPERATION entity for a given gid."""
try:
return self.session.query(Operation).filter_by(gid=operation_gid).one()
except SQLAlchemyError:
self.logger.exception("When fetching gid %s" % operation_gid)
return None
[docs] def get_all_operations_for_uploaders(self, project_id):
"""
Returns all finished upload operations.
"""
try:
result = self.session.query(Operation).join(Algorithm).join(AlgorithmCategory).filter(
AlgorithmCategory.rawinput == True).filter(
Operation.fk_launched_in == project_id).filter(
Operation.status == STATUS_FINISHED).all()
return result
except SQLAlchemyError as excep:
self.logger.exception(excep)
return None
[docs] def get_operations(self, status=None, algorithm_classname="SimulatorAdapter"):
if status is None:
status = [STATUS_PENDING, STATUS_STARTED]
try:
result = self.session.query(Operation).join(Algorithm) \
.filter(Algorithm.classname == algorithm_classname) \
.filter(Operation.status.in_(status)).all()
return result
except SQLAlchemyError as excep:
self.logger.exception(excep)
return None
[docs] def get_operations_for_hpc_job(self):
status = [STATUS_PENDING, STATUS_STARTED]
algorithm_classname = "SimulatorAdapter"
queue_full = False
try:
result = self.session.query(Operation).join(Algorithm) \
.filter(Algorithm.classname == algorithm_classname) \
.filter(Operation.status.in_(status)) \
.filter(Operation.queue_full == queue_full).all()
return result
except SQLAlchemyError as excep:
self.logger.exception(excep)
return None
[docs] def is_upload_operation(self, operation_gid):
"""
Returns True only if the operation with the given gid is an upload operation.
"""
try:
result = self.session.query(Operation).join(Algorithm).join(AlgorithmCategory
).filter(AlgorithmCategory.rawinput == True
).filter(Operation.gid == operation_gid).count()
return result > 0
except SQLAlchemyError:
return False
[docs] def count_resulted_datatypes(self, operation_id):
"""
Returns the number of resulted datatypes from the specified operation.
"""
try:
result = self.session.query(DataType).filter_by(fk_from_operation=operation_id).count()
return result
except SQLAlchemyError as excep:
self.logger.exception(excep)
return None
[docs] def get_operation_process_for_operation(self, operation_id):
"""
Get the OperationProcessIdentifier for this operation id.
"""
try:
result = self.session.query(OperationProcessIdentifier
).filter(OperationProcessIdentifier.fk_from_operation == operation_id
).one()
except NoResultFound:
self.logger.debug("No operation process found for operation id=%s." % (str(operation_id),))
result = None
except SQLAlchemyError as excep:
self.logger.exception(excep)
result = None
return result
[docs] def get_operations_with_error_in_project(self, project_id):
"""
Retrieve OPERATION with errors entities for a given project.
"""
result = None
try:
query = self.session.query(Operation).filter_by(fk_launched_in=project_id).filter(
Operation.status == STATUS_ERROR)
result = query.all()
except SQLAlchemyError as excep:
self.logger.exception(excep)
return result
[docs] def get_operations_in_group(self, operation_group_id, is_count=False,
only_first_operation=False, only_gids=False):
"""
Retrieve OPERATION entities for a given group.
"""
result = None
try:
query = self.session.query(Operation)
if only_gids:
query = self.session.query(Operation.gid)
query = query.filter_by(fk_operation_group=operation_group_id)
if is_count:
result = query.count()
elif only_first_operation:
result = query.first()
else:
result = query.all()
except SQLAlchemyError as excep:
self.logger.exception(excep)
return result
[docs] def compute_disk_size_for_started_ops(self, user_id):
""" Get all the disk space that should be reserved for the started operations of this user. """
try:
expected_hdd_size = self.session.query(func.sum(Operation.estimated_disk_size)
).filter(Operation.fk_launched_by == user_id
).filter(Operation.status == STATUS_STARTED).scalar()
except SQLAlchemyError as excep:
self.logger.exception(excep)
expected_hdd_size = 0
return expected_hdd_size or 0
[docs] def get_filtered_operations(self, project_id, filter_chain, page_start=0,
page_size=DEFAULT_PAGE_SIZE, is_count=False):
"""
:param project_id: current project ID
:param filter_chain: instance of FilterChain
:param is_count: when True, return a number, otherwise the list of operation entities
:return a list of filtered operation in current project, page by page, or the total count for them.
"""
try:
select_clause = self.session.query(func.min(Operation.id))
if not is_count:
# Do not add select columns in case of COUNT, as they will be ignored anyway
select_clause = self.session.query(func.min(Operation.id), func.max(Operation.id),
func.count(Operation.id),
func.max(Operation.fk_operation_group),
func.min(Operation.fk_from_algo),
func.max(Operation.fk_launched_by),
func.min(Operation.create_date),
func.min(Operation.start_date),
func.max(Operation.completion_date),
func.min(Operation.status),
func.max(Operation.additional_info),
func.min(case_((Operation.visible, 1), else_=0)),
func.min(Operation.user_group),
func.min(Operation.gid))
query = select_clause.join(Algorithm).join(
AlgorithmCategory).filter(Operation.fk_launched_in == project_id)
if filter_chain is not None:
filter_string = filter_chain.get_sql_filter_equivalent()
query = query.filter(eval(filter_string))
query = query.group_by(case_((Operation.fk_operation_group > 0,
- Operation.fk_operation_group), else_=Operation.id))
if is_count:
return query.count()
return query.order_by(desc(func.max(Operation.id))).offset(page_start).limit(page_size).all()
except SQLAlchemyError as excep:
self.logger.exception(excep)
return 0 if is_count else None
[docs] def get_results_for_operation(self, operation_id):
"""
Retrieve DataTypes entities, resulted after executing an operation.
"""
try:
query = self.session.query(DataType
).filter_by(fk_from_operation=operation_id
).filter(and_(DataType.type != self.EXCEPTION_DATATYPE_GROUP,
DataType.type != self.EXCEPTION_DATATYPE_SIMULATION))
query = query.order_by(DataType.id)
result = query.all()
for dt in result:
dt.display_name
return result
except SQLAlchemyError as excep:
self.logger.exception(excep)
return None
[docs] def set_operation_and_group_visibility(self, entity_gid, is_visible, is_operation_group=False):
"""
Sets the operation visibility.
If 'is_operation_group' is True than this method will change the visibility for all
the operation from the OperationGroup with the GID field equal to 'entity_gid'.
"""
try:
query = self.session.query(Operation)
if is_operation_group:
group = self.get_operationgroup_by_gid(entity_gid)
query = query.filter(Operation.fk_operation_group == group.id)
else:
query = query.filter(Operation.gid == entity_gid)
query.update({"visible": is_visible})
self.session.commit()
except SQLAlchemyError as excep:
self.logger.exception(excep)
[docs] def get_operationgroup_by_gid(self, gid):
"""Retrieve by GID"""
try:
result = self.session.query(OperationGroup).filter_by(gid=gid).one()
return result
except SQLAlchemyError:
return None
[docs] def get_operationgroup_by_id(self, op_group_id):
"""Retrieve by ID"""
try:
result = self.session.query(OperationGroup).filter_by(id=op_group_id).one()
return result
except SQLAlchemyError:
return None
[docs] def get_operation_numbers(self, proj_id):
"""
Count total number of operations started for current project.
"""
stats = self.session.query(Operation.status, func.count(Operation.id)
).filter_by(fk_launched_in=proj_id
).group_by(Operation.status).all()
stats = dict(stats)
finished = stats.get(STATUS_FINISHED, 0)
started = stats.get(STATUS_STARTED, 0)
failed = stats.get(STATUS_ERROR, 0)
canceled = stats.get(STATUS_CANCELED, 0)
pending = stats.get(STATUS_PENDING, 0)
return finished, started, failed, canceled, pending
#
# CATEGORY RELATED METHODS
#
[docs] def get_algorithm_categories(self):
"""Retrieve all existent categories of Algorithms."""
try:
categories = self.session.query(AlgorithmCategory).distinct().order_by(
AlgorithmCategory.displayname).all()
except SQLAlchemyError as excep:
self.logger.exception(excep)
categories = []
return categories
[docs] def get_uploader_categories(self):
"""Retrieve categories with raw_input = true"""
try:
result = self.session.query(AlgorithmCategory).filter_by(rawinput=True).all()
except SQLAlchemyError as excep:
self.logger.exception(excep)
result = []
return result
[docs] def get_raw_categories(self):
"""Retrieve categories with raw_input = true"""
try:
result = self.session.query(AlgorithmCategory).filter_by(defaultdatastate='RAW_DATA').all()
except SQLAlchemyError as excep:
self.logger.exception(excep)
result = []
return result
[docs] def get_visualisers_categories(self):
"""Retrieve categories with display = true"""
try:
result = self.session.query(AlgorithmCategory).filter_by(display=True).all()
except SQLAlchemyError as excep:
self.logger.exception(excep)
result = []
return result
[docs] def get_launchable_categories(self, elimin_viewers=False):
"""Retrieve algorithm categories which can be launched on right-click (optionally filter visualizers)"""
try:
result = self.session.query(AlgorithmCategory).filter_by(launchable=True)
if elimin_viewers:
result = result.filter_by(display=False)
result = result.all()
except SQLAlchemyError as excep:
self.logger.exception(excep)
result = []
return result
[docs] def get_category_by_id(self, categ_id):
"""Retrieve category with given id"""
try:
result = self.session.query(AlgorithmCategory).filter_by(id=categ_id).one()
except SQLAlchemyError as excep:
self.logger.exception(excep)
result = None
return result
[docs] def filter_category(self, displayname, rawinput, display, launchable, order_nr):
"""Retrieve category with given id"""
try:
result = self.session.query(AlgorithmCategory
).filter_by(displayname=displayname
).filter_by(rawinput=rawinput).filter_by(display=display
).filter_by(launchable=launchable).filter_by(order_nr=order_nr).one()
return result
except NoResultFound:
return None
#
# ALGORITHM RELATED METHODS
#
[docs] def get_all_algorithms(self):
try:
result = self.session.query(Algorithm).distinct().all()
return result
except SQLAlchemyError as ex:
self.logger.exception(ex)
return None
[docs] def get_algorithm_by_id(self, algorithm_id):
try:
result = self.session.query(Algorithm).filter_by(id=algorithm_id).one()
result.algorithm_category
return result
except SQLAlchemyError as ex:
self.logger.exception(ex)
return None
[docs] def get_algorithm_by_module(self, module_name, class_name):
try:
result = self.session.query(Algorithm).filter_by(module=module_name, classname=class_name).one()
result.algorithm_category
return result
except SQLAlchemyError:
return None
[docs] def get_applicable_adapters(self, compatible_class_names, launch_categ):
"""
Retrieve a list of algorithms in a given list of categories with a given dataType classes as required input.
"""
try:
return self.session.query(Algorithm
).filter_by(removed=False
).filter(Algorithm.fk_category.in_(launch_categ)
).filter(Algorithm.required_datatype.in_(compatible_class_names)
).order_by(Algorithm.fk_category
).order_by(Algorithm.group_name).all()
except SQLAlchemyError:
self.logger.exception("Could not retrieve applicable Adapters ...")
return []
[docs] def get_adapters_from_categories(self, categories):
"""
Retrieve a list of stored adapters in the given categories.
"""
try:
return self.session.query(Algorithm
).filter_by(removed=False
).filter(Algorithm.fk_category.in_(categories)
).order_by(Algorithm.group_name
).order_by(Algorithm.displayname).all()
except SQLAlchemyError:
self.logger.exception("Could not retrieve Adapters ...")
return []
#
# RESULT FIGURE RELATED CODE
#
[docs] def get_previews(self, project_id, user_id=None, selected_session_name='all_sessions'):
"""
This method returns a tuple of 2 elements. The first element represents a dictionary
of form {'$session_name': [list_of_figures]}. This dictionary contains data only for the selected self.session.
If the selected session is 'all_sessions' than it will contain data for all the sessions.
The second element of the returned tuple is a dictionary of form
{'$session_name': $no_of_figures_in_this_session, ...}.
This dictionary contains information about all the sessions.
selected_session_name - represents the name of the session for which you
want to obtain the stored figures.
"""
try:
previews_info = self._get_previews_info(project_id, user_id)
if selected_session_name == 'all_sessions':
session_names = list(previews_info)
session_names.sort()
else:
session_names = [selected_session_name]
result = {}
for session_name in session_names:
figures_list = self.session.query(ResultFigure
).filter_by(fk_in_project=project_id
).filter_by(session_name=session_name)
if user_id is not None:
figures_list = figures_list.filter_by(fk_for_user=user_id)
figures_list = figures_list.order_by(desc(ResultFigure.id)).all()
# Force loading of project and operation - needed to compute image path
for figure in figures_list:
figure.project
result[session_name] = figures_list
return result, previews_info
except SQLAlchemyError as excep:
self.logger.exception(excep)
return {}, {}
def _get_previews_info(self, project_id, user_id):
"""
Returns a dictionary of form: {$session_name: $no_of_images_in_this_session, ...}.
"""
try:
result = {}
session_items = self.session.query(ResultFigure.session_name,
func.count(ResultFigure.session_name)
).filter_by(fk_in_project=project_id)
if user_id is not None:
session_items = session_items.filter_by(fk_for_user=user_id)
session_items = session_items.group_by(ResultFigure.session_name
).order_by(ResultFigure.session_name).all()
for item in session_items:
result[item[0]] = item[1]
return result
except SQLAlchemyError as excep:
self.logger.exception(excep)
return {}