# -*- coding: utf-8 -*-
#
#
# TheVirtualBrain-Framework Package. This package holds all Data Management, and
# Web-UI helpful to run brain-simulations. To use it, you also need to download
# TheVirtualBrain-Scientific Package (for simulators). See content of the
# documentation-folder for more details. See also http://www.thevirtualbrain.org
#
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program. If not, see <http://www.gnu.org/licenses/>.
#
#
# CITATION:
# When using The Virtual Brain for scientific publications, please cite it as explained here:
# https://www.thevirtualbrain.org/tvb/zwei/neuroscience-publications
#
#
"""
Here we define entities for Operations and Algorithms.
.. moduleauthor:: Lia Domide <lia.domide@codemart.ro>
.. moduleauthor:: Bogdan Neacsa <bogdan.neacsa@codemart.ro>
.. moduleauthor:: Yann Gordon <yann@tvb.invalid>
"""
import json
from datetime import datetime
from sqlalchemy import Boolean, Integer, String, DateTime, Column, ForeignKey, UniqueConstraint
from sqlalchemy.orm import relationship, backref
from tvb.basic.logger.builder import get_logger
from tvb.config import TVB_IMPORTER_CLASS, TVB_IMPORTER_MODULE
from tvb.core.entities.exportable import Exportable
from tvb.core.entities.model.model_project import Project, User
from tvb.core.entities.transient.range_parameter import RangeParameter
from tvb.core.neotraits.db import Base
from tvb.core.utils import generate_guid, string2date, string2bool, date2string, LESS_COMPLEX_TIME_FORMAT
# Module-level logger, named after this module; Operation.from_dict uses it to warn
# when the algorithm referenced by an imported operation cannot be found.
LOG = get_logger(__name__)
class AlgorithmCategory(Base):
    """
    A category that will have different boolean attributes
    e.g.: launchable|rawinput|display, a displayName and a default state for data.

    Equality and hashing are both based on the same set of fields
    (displayname, launchable, rawinput, display, defaultdatastate, order_nr),
    so that objects comparing equal always share the same hash.
    """
    __tablename__ = 'ALGORITHM_CATEGORIES'

    id = Column(Integer, primary_key=True)
    displayname = Column(String)
    launchable = Column(Boolean)
    rawinput = Column(Boolean)
    display = Column(Boolean)
    defaultdatastate = Column(String)
    order_nr = Column(Integer)
    last_introspection_check = Column(DateTime)
    removed = Column(Boolean, default=False)

    # NOTE(review): the default for order_nr is the string '999' although the column is
    # declared as Integer -- confirm whether an int default was intended.
    def __init__(self, displayname, launchable=False, rawinput=False, display=False, defaultdatastate='',
                 order_nr='999', last_introspection_check=None):
        """
        :param displayname: name of the category
        :param launchable: boolean flag stored on the category
        :param rawinput: boolean flag stored on the category
        :param display: boolean flag stored on the category
        :param defaultdatastate: default state for data
        :param order_nr: ordering number of the category
        :param last_introspection_check: date stored as last introspection check
        """
        self.displayname = displayname
        self.launchable = launchable
        self.rawinput = rawinput
        self.display = display
        self.defaultdatastate = defaultdatastate
        self.order_nr = order_nr
        self.last_introspection_check = last_introspection_check
        # A freshly built category is never flagged as removed.
        self.removed = False

    def __repr__(self):
        return "<AlgorithmCategory('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s')>" % (
            self.id, self.displayname, self.launchable, self.rawinput, self.display,
            self.defaultdatastate, self.order_nr, self.last_introspection_check, self.removed)

    def _identity_key(self):
        """Fields shared by __eq__ and __hash__, kept in one place so they cannot drift apart."""
        return (self.displayname, self.launchable, self.rawinput,
                self.display, self.defaultdatastate, self.order_nr)

    def __hash__(self):
        return hash(self._identity_key())

    def __eq__(self, other):
        # order_nr takes part in the comparison because it also takes part in the hash;
        # otherwise two equal categories could land in different hash buckets.
        return isinstance(other, AlgorithmCategory) and self._identity_key() == other._identity_key()
class AlgorithmTransientGroup(object):
    """
    Plain Python object (it does not extend Base) describing a group of algorithms:
    a name, a description, an optional subsection and a list of children.
    """

    def __init__(self, name, description, subsection=None):
        """
        :param name: name of the group
        :param description: description of the group
        :param subsection: optional subsection the group belongs to
        """
        self.name, self.description = name, description
        # Every instance starts with its own, empty list of children.
        self.children = list()
        self.subsection = subsection
class Algorithm(Base):
    """
    DB entity describing one algorithm, identified by its module and class name,
    and linked to an AlgorithmCategory.
    """
    __tablename__ = 'ALGORITHMS'

    id = Column(Integer, primary_key=True)
    module = Column(String)
    classname = Column(String)
    fk_category = Column(Integer, ForeignKey('ALGORITHM_CATEGORIES.id', ondelete="CASCADE"))
    group_name = Column(String)
    group_description = Column(String)
    displayname = Column(String)
    description = Column(String)
    subsection_name = Column(String)
    required_datatype = Column(String)
    datatype_filter = Column(String)
    parameter_name = Column(String)
    outputlist = Column(String)
    last_introspection_check = Column(DateTime)
    removed = Column(Boolean, default=False)

    algorithm_category = relationship(AlgorithmCategory,
                                      backref=backref('ALGORITHMS', order_by=id, cascade="delete, all"))

    # explicit/composite unique constraint.  'name' is optional.
    # NOTE(review): this constraint object is created but not assigned to anything
    # (e.g. __table_args__), so it is presumably not attached to the ALGORITHMS table.
    # Left untouched here because enforcing it would change the DB schema -- confirm intent.
    UniqueConstraint("module", "classname", name="uq_class")

    def __init__(self, module, classname, category_key, group_name=None, group_description=None,
                 display_name='', description="", subsection_name=None, last_introspection_check=None):
        # type: (str, str, int, str, str, str, str, str, DateTime) -> None
        """
        :param module: module of the algorithm
        :param classname: class name of the algorithm
        :param category_key: id of the AlgorithmCategory, stored in fk_category
        :param group_name: optional name of the group
        :param group_description: optional description of the group
        :param display_name: stored in the displayname column
        :param description: description of the algorithm
        :param subsection_name: when None, it is derived from the last segment of `module`
        :param last_introspection_check: date stored as last introspection check
        """
        self.module = module
        self.classname = classname
        self.fk_category = category_key
        self.group_name = group_name
        self.group_description = group_description
        self.displayname = display_name
        self.description = description
        self.last_introspection_check = last_introspection_check
        # A freshly built algorithm is never flagged as removed.
        self.removed = False

        if subsection_name is not None:
            self.subsection_name = subsection_name
        else:
            # Default: last segment of the module path, with any '_adapter' text removed.
            self.subsection_name = self.module.split('.')[-1].replace('_adapter', '')

    def __repr__(self):
        # fk_category is formatted with '%s' (not '%d') so that repr() does not raise
        # TypeError while the foreign key is still None; output for integers is unchanged.
        return "<Algorithm('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s')>" % (
            self.id, self.module, self.classname, self.fk_category, self.displayname,
            self.subsection_name, self.group_name, self.group_description, self.removed)
class OperationGroup(Base, Exportable):
    """
    We use this group entity, to map in DB a group of operations started
    in the same time by the user
    """
    __tablename__ = "OPERATION_GROUPS"

    id = Column(Integer, primary_key=True)
    name = Column(String)
    range1 = Column(String)
    range2 = Column(String)
    range3 = Column(String)
    gid = Column(String)
    fk_launched_in = Column(Integer, ForeignKey('PROJECTS.id', ondelete="CASCADE"))

    project = relationship(Project, backref=backref('OPERATION_GROUPS', order_by=id, cascade="all,delete"))

    def __init__(self, project_id, name='incomplete', ranges=None, gid=None):
        """
        :param project_id: id of the project, stored in fk_launched_in
        :param name: name of the group
        :param ranges: optional sequence; its first (up to) three items fill range1..range3
        :param gid: optional identifier; a new one is generated when missing or empty
        """
        self.name = name
        if ranges:
            available = len(ranges)
            # Only the range attributes that have a matching item are assigned.
            for position, attribute in enumerate(('range1', 'range2', 'range3')):
                if position < available:
                    setattr(self, attribute, ranges[position])
        self.gid = gid or generate_guid()
        self.fk_launched_in = project_id

    def __repr__(self):
        return "<OperationGroup(%s,%s)>" % (self.name, self.gid)

    @property
    def range_references(self):
        """Memorized range starter"""
        # range1 is always reported; range2 / range3 only when set and different from 'null'.
        references = [self.range1]
        references.extend(candidate for candidate in (self.range2, self.range3)
                          if candidate and candidate != 'null')
        return references

    def fill_operationgroup_name(self, entities_in_group):
        """
        Display name for UI.

        Builds the name from `entities_in_group`, the names of the ranges which are
        not None and the current date, then stores it on `self.name`.
        """
        fragments = ["of " + entities_in_group + " varying "]
        # The first range is appended as-is, the following ones are separated by " x ".
        for separator, raw_range in (("", self.range1), (" x ", self.range2), (" x ", self.range3)):
            if raw_range is not None:
                fragments.append(separator + RangeParameter.from_json(raw_range).name)
        fragments.append(" - " + date2string(datetime.now(), date_format=LESS_COMPLEX_TIME_FORMAT))
        self.name = "".join(fragments)
# Possible values for the Operation.status field.
# NOTE(review): the numeric prefix presumably exists to make statuses sortable -- confirm.
STATUS_ERROR = "1-ERROR"
STATUS_CANCELED = "2-CANCELED"
STATUS_STARTED = "3-STARTED"
STATUS_PENDING = "4-PENDING"
STATUS_FINISHED = "5-FINISHED"

# All known statuses, listed from FINISHED down to ERROR.
OperationPossibleStatus = [
    STATUS_FINISHED,
    STATUS_PENDING,
    STATUS_STARTED,
    STATUS_CANCELED,
    STATUS_ERROR,
]
def has_finished(status):
    """
    Is the given status indicating a finished operation?

    :param status: status value to check
    :return: True when the status is one of ERROR, CANCELED or FINISHED
    """
    terminal_statuses = (STATUS_ERROR, STATUS_CANCELED, STATUS_FINISHED)
    return status in terminal_statuses
class Operation(Base, Exportable):
    """
    The class used to log any action executed in Projects.
    """
    __tablename__ = 'OPERATIONS'

    id = Column(Integer, primary_key=True)
    fk_launched_by = Column(Integer, ForeignKey('USERS.id'))
    fk_launched_in = Column(Integer, ForeignKey('PROJECTS.id', ondelete="CASCADE"))
    fk_from_algo = Column(Integer, ForeignKey('ALGORITHMS.id'))
    fk_operation_group = Column(Integer, ForeignKey('OPERATION_GROUPS.id', ondelete="CASCADE"), default=None)
    gid = Column(String)
    view_model_gid = Column(String)
    create_date = Column(DateTime)  # Date at which the user generated this entity
    start_date = Column(DateTime)  # Actual time when the operation executions is started (without queue time)
    completion_date = Column(DateTime)  # Time when the operation got status FINISHED/ ERROR or CANCEL set.
    status = Column(String, index=True)
    visible = Column(Boolean, default=True)
    additional_info = Column(String)
    user_group = Column(String, default=None)
    range_values = Column(String, default=None)
    estimated_disk_size = Column(Integer)
    view_model_disk_size = Column(Integer, default=0)
    # This flag is set to true for operations which cannot start a new Thread because of the full LOCKS_QUEUE.
    queue_full = Column(Boolean, default=False)

    algorithm = relationship(Algorithm)
    project = relationship(Project, backref=backref('OPERATIONS', order_by=id, cascade="all,delete"))
    operation_group = relationship(OperationGroup)
    user = relationship(User)

    def __init__(self, view_model_gid, fk_launched_by, fk_launched_in, fk_from_algo,
                 status=STATUS_PENDING, start_date=None, completion_date=None, op_group_id=None, additional_info='',
                 user_group=None, range_values=None, estimated_disk_size=0):
        """
        :param view_model_gid: stored in the view_model_gid column
        :param fk_launched_by: id of the user launching the operation
        :param fk_launched_in: id of the project in which the operation is launched
        :param fk_from_algo: id of the algorithm of this operation
        :param status: initial status (defaults to STATUS_PENDING)
        :param start_date: optional start date
        :param completion_date: optional completion date
        :param op_group_id: optional id of the operation group, stored in fk_operation_group
        :param additional_info: free text stored on the operation
        :param user_group: stored in the user_group column
        :param range_values: stored in the range_values column
        :param estimated_disk_size: stored in the estimated_disk_size column
        """
        self.fk_launched_by = fk_launched_by
        self.fk_launched_in = fk_launched_in
        self.fk_from_algo = fk_from_algo
        self.view_model_gid = view_model_gid
        # The creation date is always the moment the entity is built.
        self.create_date = datetime.now()
        self.start_date = start_date
        self.completion_date = completion_date
        self.status = status
        self.visible = True
        self.fk_operation_group = op_group_id
        self.range_values = range_values
        self.user_group = user_group
        self.additional_info = additional_info
        # Every new operation receives a freshly generated gid.
        self.gid = generate_guid()
        self.estimated_disk_size = estimated_disk_size

    def __repr__(self):
        return "<Operation('%s', %s, %s,'%s','%s','%s','%s', '%s','%s',%s, '%s', '%s', '%s', %s)>" \
               % (self.view_model_gid, self.gid, self.fk_launched_by, self.fk_launched_in, self.fk_from_algo,
                  self.create_date, self.start_date, self.completion_date, self.status, self.visible,
                  self.fk_operation_group, self.user_group, self.additional_info, self.estimated_disk_size)

    def start_now(self):
        """ Update Operation fields at startup: Status and Date"""
        self.start_date = datetime.now()
        self.status = STATUS_STARTED

    def mark_complete(self, status, additional_info=None):
        """
        Update Operation fields on completion: Status and Date

        :param status: the new status to store
        :param additional_info: when not None, it replaces the current additional_info
        """
        self.completion_date = datetime.now()
        if additional_info is not None:
            self.additional_info = additional_info
        self.status = status

    @property
    def has_finished(self):
        """True when the current status is one of ERROR, CANCELED or FINISHED."""
        return has_finished(self.status)

    def to_dict(self):
        """
        Overwrite superclass method to add required changes.

        :return: tuple (class name, dictionary), where the foreign keys towards project,
                 algorithm and operation group are replaced by exportable values
        """
        _, base_dict = super(Operation, self).to_dict(excludes=['id', 'fk_launched_by', 'user', 'fk_launched_in',
                                                                'project', 'fk_from_algo', 'algorithm',
                                                                'fk_operation_group', 'operation_group'])
        base_dict['fk_launched_in'] = self.project.gid
        base_dict['fk_from_algo'] = json.dumps(dict(module=self.algorithm.module,
                                                    classname=self.algorithm.classname))
        # We keep the information for the operation_group in this place (on each operation)
        # because we don't have an XML file for the operation_group entity.
        # We don't want to keep the information about the operation groups into the project XML file
        # because it may be opened from different places and may produce conflicts.
        if self.operation_group:
            base_dict['fk_operation_group'] = json.dumps(self.operation_group.to_dict()[1])
        return self.__class__.__name__, base_dict

    # TODO: Fix this hackish dao pass
    def from_dict(self, dictionary, dao, user_id=None, project_gid=None):
        """
        Add specific attributes from a input dictionary.

        :param dictionary: input values; note that its 'additional_info' entry is overwritten
                           when the original algorithm cannot be found
        :param dao: object used for loading user, project, algorithm and operation group
        :param user_id: optional id of the user; the system user is used when None
        :param project_gid: optional gid of the parent project; when None,
                            dictionary['fk_launched_in'] is used
        :return: tuple (self, dictionary['parameters'], dictionary['fk_from_algo'])
        """
        # If user id was specified try to load it, otherwise use System account
        user = dao.get_system_user() if user_id is None else dao.get_user_by_id(user_id)
        self.fk_launched_by = user.id

        # Find parent Project
        prj_to_load = project_gid if project_gid is not None else dictionary['fk_launched_in']
        parent_project = dao.get_project_by_gid(prj_to_load)
        self.fk_launched_in = parent_project.id
        self.project = parent_project

        # Find parent Algorithm
        source_algorithm = json.loads(dictionary['fk_from_algo'])
        algorithm = dao.get_algorithm_by_module(source_algorithm['module'], source_algorithm['classname'])

        if algorithm:
            self.algorithm = algorithm
            self.fk_from_algo = algorithm.id
        else:
            # The algorithm that produced this operation no longer exists most likely due to
            # exported operation from different version. Fallback to tvb importer.
            LOG.warning("Algorithm group %s was not found in DB. Most likely cause is that archive was exported "
                        "from a different TVB version. Using fallback TVB_Importer as source of "
                        "this operation." % (source_algorithm['module'],))
            algorithm = dao.get_algorithm_by_module(TVB_IMPORTER_MODULE, TVB_IMPORTER_CLASS)
            self.fk_from_algo = algorithm.id
            dictionary['additional_info'] = ("The original parameters for this operation were: \nAdapter: %s "
                                             "\nParameters %s" % (source_algorithm['module'] + '.' +
                                                                  source_algorithm['classname'],
                                                                  dictionary['parameters']))

        # Find OperationGroup, if any
        if 'fk_operation_group' in dictionary:
            group_dict = json.loads(dictionary['fk_operation_group'])
            op_group = None
            if group_dict:
                op_group = dao.get_operationgroup_by_gid(group_dict['gid'])
                if not op_group:
                    name = group_dict['name']
                    ranges = [group_dict['range1'], group_dict['range2'], group_dict['range3']]
                    gid = group_dict['gid']
                    op_group = OperationGroup(self.fk_launched_in, name, ranges)
                    op_group.gid = gid
                    op_group = dao.store_entity(op_group)
            self.operation_group = op_group
            # The key may be present while decoding to an empty value; in that case there is
            # no group to reference, so avoid dereferencing None.
            self.fk_operation_group = op_group.id if op_group is not None else None
        else:
            self.operation_group = None
            self.fk_operation_group = None

        # NOTE(review): 'meta_data' is not among the columns declared on this class, so it is
        # kept only as a plain instance attribute -- confirm it is still needed.
        self.meta_data = dictionary['meta_data']
        self.create_date = string2date(dictionary['create_date'])
        # Missing dates are expected as the literal string "None".
        if dictionary['start_date'] != "None":
            self.start_date = string2date(dictionary['start_date'])
        if dictionary['completion_date'] != "None":
            self.completion_date = string2date(dictionary['completion_date'])
        self.status = self._parse_status(dictionary['status'])
        self.visible = string2bool(dictionary['visible'])
        self.range_values = dictionary['range_values']
        self.user_group = dictionary['user_group']
        self.additional_info = dictionary['additional_info']
        self.gid = dictionary['gid']

        return self, dictionary['parameters'], dictionary['fk_from_algo']

    def _parse_status(self, status):
        """
        To keep backwards compatibility, when we import an operation that did not have new
        operation status.

        :param status: status text, matched by substring
        :return: the matching STATUS_* constant; STATUS_PENDING when nothing else matches
        """
        if 'FINISHED' in status:
            return STATUS_FINISHED
        elif 'ERROR' in status:
            return STATUS_ERROR
        elif 'CANCELED' in status:
            return STATUS_CANCELED
        elif 'STARTED' in status:
            return STATUS_STARTED

        return STATUS_PENDING
class OperationProcessIdentifier(Base):
    """
    Class for storing for each operation the process identifier under which
    it was launched so any operation can be stopped from tvb.
    """
    __tablename__ = "OPERATION_PROCESS_IDENTIFIERS"

    id = Column(Integer, primary_key=True)
    fk_from_operation = Column(Integer, ForeignKey('OPERATIONS.id', ondelete="CASCADE"))
    pid = Column(String)
    job_id = Column(String)

    operation = relationship(
        Operation,
        backref=backref('OPERATION_PROCESS_IDENTIFIERS', order_by=id, cascade="delete"),
    )

    def __init__(self, operation_id, pid=None, job_id=None):
        """
        :param operation_id: id of the operation, stored in fk_from_operation
        :param pid: optional process identifier
        :param job_id: optional job identifier
        """
        self.fk_from_operation = operation_id
        self.pid, self.job_id = pid, job_id