Source code for tvb.core.entities.storage.project_dao

# -*- coding: utf-8 -*-
#
#
# TheVirtualBrain-Framework Package. This package holds all Data Management, and 
# Web-UI helpful to run brain-simulations. To use it, you also need to download
# TheVirtualBrain-Scientific Package (for simulators). See content of the
# documentation-folder for more details. See also http://www.thevirtualbrain.org
#
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE.  See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program.  If not, see <http://www.gnu.org/licenses/>.
#
#
#   CITATION:
# When using The Virtual Brain for scientific publications, please cite it as explained here:
# https://www.thevirtualbrain.org/tvb/zwei/neuroscience-publications
#
#

"""
DAO operations related to Users and Projects are defined here.

.. moduleauthor:: Lia Domide <lia.domide@codemart.ro>
.. moduleauthor:: Bogdan Neacsa <bogdan.neacsa@codemart.ro>
"""

from sqlalchemy import or_, and_, func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.sql.expression import desc
from tvb.basic.profile import TvbProfile
from tvb.core.entities.model.model_datatype import DataType, Links
from tvb.core.entities.model.model_operation import Operation
from tvb.core.entities.model.model_project import User, ROLE_ADMINISTRATOR, Project, User_to_Project
from tvb.core.entities.storage.root_dao import RootDAO, DEFAULT_PAGE_SIZE


class CaseDAO(RootDAO):
    """
    USER and PROJECT RELATED OPERATIONS

    All methods run their queries through ``self.session`` and report problems
    through ``self.logger``; neither attribute is defined in this class.
    """

    #
    # USER RELATED METHODS
    #

    def get_user_by_id(self, user_id):
        """
        Retrieve USER entity by ID.

        :param user_id: value matched against ``User.id``
        :returns: the matching User, or None when the query fails (failure is logged)
        """
        user = None
        try:
            user = self.session.query(User).filter_by(id=user_id).one()
        except SQLAlchemyError:
            self.logger.exception("Could not retrieve user for id " + str(user_id))
        return user

    def get_user_by_name(self, name):
        """
        Retrieve USER entity by name.

        :param name: value matched against ``User.username``
        :returns: the matching User, or None when the query fails (logged at debug level)
        """
        user = None
        try:
            user = self.session.query(User).filter_by(username=name).one()
        except SQLAlchemyError:
            self.logger.debug("Could not retrieve user for name " + str(name))
        return user

    def get_user_by_gid(self, gid):
        """
        Retrieve USER entity by gid.

        :param gid: value matched against ``User.gid``
        :returns: the matching User, or None when the query fails (logged at debug level)
        """
        user = None
        try:
            user = self.session.query(User).filter_by(gid=gid).one()
        except SQLAlchemyError:
            # str() keeps the handler itself from raising TypeError on a non-string gid.
            self.logger.debug("Could not retrieve user for gid= " + str(gid))
        return user

    def get_system_user(self):
        """
        Retrieve System user from DB.

        The user name is read from ``TvbProfile.current.web.admin.SYSTEM_USER_NAME``.

        :returns: the matching User, or None when the query fails (failure is logged)
        """
        user = None
        sys_name = TvbProfile.current.web.admin.SYSTEM_USER_NAME
        try:
            user = self.session.query(User).filter_by(username=sys_name).one()
        except SQLAlchemyError:
            self.logger.exception("Could not retrieve system user " + str(sys_name))
        return user

    def count_users_for_name(self, name):
        """
        Retrieve the number of users in DB for a given name.

        :param name: value matched against ``User.username``
        :returns: row count
        """
        return self.session.query(User).filter_by(username=name).count()

    def get_administrators(self):
        """
        Retrieve all users with Admin role.

        :returns: list of User entities having ``role == ROLE_ADMINISTRATOR``
        """
        return self.session.query(User).filter_by(role=ROLE_ADMINISTRATOR).all()

    def get_all_users(self, different_names=None, page_start=0, page_size=DEFAULT_PAGE_SIZE, is_count=False):
        """
        Retrieve all USERS in DB, except given users and system user.

        :param different_names: user names to exclude; None means no extra exclusions
        :param page_start: offset of the first row; negative values are clamped to 0
        :param page_size: maximum number of rows; negative values are clamped to 0
        :param is_count: when True, return only the number of matching users
        :returns: an int when `is_count` is True, otherwise a list of User ordered by username
        """
        if different_names is None:
            different_names = []
        try:
            sys_name = TvbProfile.current.web.admin.SYSTEM_USER_NAME
            query = self.session.query(User
                                       ).filter(User.username.notin_(different_names)
                                                ).filter(User.username != sys_name)
            if is_count:
                return query.count()
            return query.order_by(User.username).offset(max(page_start, 0)).limit(max(page_size, 0)).all()
        except NoResultFound:
            self.logger.warning("No users found. Maybe database is empty.")
            raise

    def get_user_by_email(self, email, name_hint=""):
        """
        Find a user by email address and name.

        :param email: Valid email address string, to search for its exact match in DB
        :param name_hint: string for a user's name; to search with like in DB.
                          None is treated like the empty string.
        :return: None if none or more than one users matches the criteria.
        """
        hint = name_hint or ""
        user = None

        if hint:
            try:
                # In case of multiple users: first try to find exact match for the given username
                user = self.session.query(User).filter_by(email=email).filter_by(username=hint).one()
            except SQLAlchemyError:
                # Ignore: the case-insensitive "contains" search below is the fallback
                pass

        if user is None:
            try:
                user = self.session.query(User).filter_by(email=email
                                                          ).filter(User.username.ilike('%' + hint + '%')).one()
            except SQLAlchemyError:
                # str() keeps the handler itself from raising on non-string arguments.
                self.logger.exception("Could not get a single user by email " + str(email) +
                                      " and name " + hint)
        return user

    def get_user_for_datatype(self, dt_id):
        """
        Get the user who created a DT.

        :param dt_id: value matched against ``DataType.id``
        :returns: ``parent_operation.user`` of the datatype, or None when a SQLAlchemy error occurs
        """
        try:
            datatype = self.session.query(DataType).filter_by(id=dt_id).one()
            return datatype.parent_operation.user
        except SQLAlchemyError as ex:
            self.logger.exception(ex)
            return None

    def compute_user_generated_disk_size(self, user_id):
        """
        Do a SUM on DATA_TYPES table column DISK_SIZE, for the current user.

        :param user_id: value matched against ``Operation.fk_launched_by``
        :returns: 0 when no DT are found, the SUM from DB otherwise, or -1 on a SQLAlchemy error
        """
        try:
            total_size = self.session.query(func.sum(DataType.disk_size)).join(Operation
                                            ).filter(Operation.fk_launched_by == user_id).scalar()
            return total_size or 0
        except SQLAlchemyError as excep:
            self.logger.exception(excep)
            return -1

    #
    # PROJECT RELATED METHODS
    #

    def get_project_by_id(self, project_id):
        """
        Retrieve PROJECT entity for a given identifier.

        Errors raised by ``.one()`` (no row, or more than one row) are not caught here.
        """
        prj = self.session.query(Project).filter_by(id=project_id).one()
        # NOTE(review): bare attribute access; presumably forces the `administrator`
        # relation to load while the session is open - confirm against the Project model.
        prj.administrator
        return prj

    def get_project_by_gid(self, project_gid):
        """
        Retrieve PROJECT entity for a given GID.

        Errors raised by ``.one()`` (no row, or more than one row) are not caught here.
        """
        prj = self.session.query(Project).filter_by(gid=project_gid).one()
        # NOTE(review): bare attribute access; presumably forces the `administrator`
        # relation to load while the session is open - confirm against the Project model.
        prj.administrator
        return prj

    def get_project_by_name(self, project_name):
        """
        Retrieve PROJECT entity for a given name.

        Errors raised by ``.one()`` (no row, or more than one row) are not caught here.
        """
        prj = self.session.query(Project).filter_by(name=project_name).one()
        # NOTE(review): bare attribute access; presumably forces the `administrator`
        # relation to load while the session is open - confirm against the Project model.
        prj.administrator
        return prj

    def get_project_lazy_by_gid(self, project_gid):
        """
        Retrieve PROJECT entity for a given GID, without touching its `administrator` attribute.

        Errors raised by ``.one()`` (no row, or more than one row) are not caught here.
        """
        return self.session.query(Project).filter_by(gid=project_gid).one()

    def delete_project(self, project_id):
        """
        Remove PROJECT entity by ID.

        Users whose ``selected_project`` equals `project_id` get that field reset to None.
        Everything is committed in a single ``session.commit()``.
        """
        project = self.session.query(Project).filter_by(id=project_id).one()
        self.session.delete(project)
        linked_users = self.session.query(User).filter_by(selected_project=project_id).all()
        for user in linked_users:
            user.selected_project = None
        self.session.commit()

    def get_project_disk_size(self, project_id):
        """
        Do a SUM on DATA_TYPES table column DISK_SIZE, for the current project,
        and add the SUM of ``Operation.view_model_disk_size`` for the same project.

        :param project_id: value matched against ``Operation.fk_launched_in``
        :returns: 0 when nothing is found, the combined SUM otherwise, or -1 on a SQLAlchemy error
        """
        try:
            total_size_for_dt = self.session.query(func.sum(DataType.disk_size)).join(Operation
                                                   ).filter(Operation.fk_launched_in == project_id).scalar() or 0
            total_size_for_vm = self.session.query(func.sum(Operation.view_model_disk_size)
                                                   ).filter(Operation.fk_launched_in == project_id).scalar() or 0
            return total_size_for_vm + total_size_for_dt
        except SQLAlchemyError as excep:
            self.logger.exception(excep)
            return -1

    def count_projects_for_name(self, name, different_id):
        """
        Retrieve the number of projects with a given name currently in DB.

        :param name: value matched against ``Project.name``
        :param different_id: when not None, the project with this id is left out of the count
        :returns: row count
        """
        query = self.session.query(Project).filter_by(name=name)
        if different_id is not None:
            query = query.filter(Project.id != different_id)
        return query.count()

    def get_all_projects(self, page_start=0, page_size=DEFAULT_PAGE_SIZE, is_count=False):
        """
        Retrieve all Project entities currently in the system.
        WARNING: use this wisely, as it might easily overflow the system.

        :param page_start: offset of the first row; negative values are clamped to 0
        :param page_size: maximum number of rows; negative values are clamped to 0
        :param is_count: when True, return only the number of projects
        :returns: an int when `is_count` is True, otherwise a list of Project
        """
        query = self.session.query(Project)
        if is_count:
            return query.count()
        return query.offset(max(page_start, 0)).limit(max(page_size, 0)).all()

    def get_projects_for_user(self, user_id, page_start=0, page_size=DEFAULT_PAGE_SIZE, is_count=False):
        """
        Return all projects a given user can access (administrator or not).

        :param user_id: id of the user, matched as project administrator or as linked member
        :param page_start: offset of the first row; negative values are clamped to 0
        :param page_size: maximum number of rows; negative values are clamped to 0
        :param is_count: when True, return only the number of matching projects
        :returns: an int when `is_count` is True, otherwise a list of Project ordered by descending id
        """
        # One query covers both cases: the user administers the project (join on fk_admin)
        # or is linked to it through User_to_Project (outer join restricted to this user).
        query = self.session.query(Project).join(User, Project.fk_admin == User.id
                                   ).outerjoin(User_to_Project,
                                               and_(Project.id == User_to_Project.fk_project,
                                                    User_to_Project.fk_user == user_id)
                                   ).filter(or_(User.id == user_id, User_to_Project.fk_user == user_id)
                                   ).order_by(desc(Project.id))
        if is_count:
            return query.count()

        result = query.offset(max(page_start, 0)).limit(max(page_size, 0)).all()
        for project in result:
            # NOTE(review): bare attribute access; presumably forces the administrator
            # relation to load while the session is open - confirm against the Project model.
            project.administrator.username
        return result

    def get_project_for_operation(self, operation_id):
        """
        Find parent project for current operation.

        Errors raised by ``.one()`` (no row, or more than one row) are not caught here.

        :param operation_id: value matched against ``Operation.id``
        :returns: the Project whose id equals the operation's ``fk_launched_in``
        """
        return self.session.query(Project
                                  ).filter(Operation.fk_launched_in == Project.id
                                           ).filter(Operation.id == operation_id).one()

    def get_linkable_projects_for_user(self, user_id, data_id):
        """
        Return all projects a given user can link some data given by a data_id to.

        :param user_id: id of the user, matched as ``User.id`` or ``User_to_Project.fk_user``
        :param data_id: id of the datatype to be linked
        :returns: tuple ``(linkable_projects, linked_projects)``: the user's projects that are
                  neither already linked to the datatype nor the project the datatype was
                  created in, and the user's projects already linked to the datatype.
                  ``(None, None)`` when any exception occurs (the exception is logged).
        """
        try:
            # Load projects where the current user is Admin or Member
            user_projects = self.session.query(Project).join(User
                                               ).join(User_to_Project
                                               ).filter(or_(User.id == user_id,
                                                            User_to_Project.fk_user == user_id)
                                               ).all()

            link_rows = self.session.query(Links.fk_to_project
                                           ).filter(Links.fk_from_datatype == data_id).all()
            linked_project_ids = {row[0] for row in link_rows}

            datatype = self.get_datatype_by_id(data_id)
            current_prj = self.session.query(Operation.fk_launched_in
                                             ).filter(Operation.id == datatype.fk_from_operation).one()

            # The datatype's own project is never offered as a link target.
            excluded_ids = linked_project_ids | {current_prj[0]}
            filtered_result = [entry for entry in user_projects if entry.id not in excluded_ids]
            for project in filtered_result:
                # NOTE(review): bare attribute access; presumably forces the administrator
                # relation to load while the session is open - confirm against the Project model.
                project.administrator.username

            linked_projects = [entry for entry in user_projects if entry.id in linked_project_ids]
            return filtered_result, linked_projects
        except Exception as excep:
            self.logger.exception(excep)
            return None, None

    def delete_members_for_project(self, project_id, members):
        """
        Remove the links between the given users and a project.

        :param project_id: value matched against ``User_to_Project.fk_project``
        :param members: collection of user ids whose link to the project is deleted
        """
        member_links = self.session.query(User_to_Project
                                          ).filter(User_to_Project.fk_project == project_id
                                                   ).filter(User_to_Project.fk_user.in_(members)).all()
        for link in member_links:
            self.session.delete(link)
        self.session.commit()

    def get_members_of_project(self, proj_id):
        """
        Retrieve USER entities with rights on current project.

        :param proj_id: value matched against ``User_to_Project.fk_project``
        :returns: list of User entities linked to the project through User_to_Project
        """
        return self.session.query(User).join(User_to_Project
                                             ).filter(User_to_Project.fk_project == proj_id).all()

    def add_members_to_project(self, proj_id, selected_user_ids):
        """
        Add link between Users and Project.

        :param proj_id: id of the project
        :param selected_user_ids: iterable of user ids; one User_to_Project link is stored per id
        """
        for u_id in selected_user_ids:
            self.store_entity(User_to_Project(u_id, proj_id))