Source code for tvb.core.services.user_service

# -*- coding: utf-8 -*-
#
#
# TheVirtualBrain-Framework Package. This package holds all Data Management, and 
# Web-UI helpful to run brain-simulations. To use it, you also need to download
# TheVirtualBrain-Scientific Package (for simulators). See content of the
# documentation-folder for more details. See also http://www.thevirtualbrain.org
#
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE.  See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program.  If not, see <http://www.gnu.org/licenses/>.
#
#
#   CITATION:
# When using The Virtual Brain for scientific publications, please cite it as explained here:
# https://www.thevirtualbrain.org/tvb/zwei/neuroscience-publications
#
#

"""
Service layer for USER entities. 
   
.. moduleauthor:: Lia Domide <lia.domide@codemart.ro>
"""

import os
import random
import six
import tvb_data
from tvb.basic.logger.builder import get_logger
from tvb.basic.profile import TvbProfile
from tvb.config import DEFAULT_PROJECT_GID
from tvb.core.entities.model.model_project import User, ROLE_ADMINISTRATOR, USER_ROLES
from tvb.core.entities.storage import dao
from tvb.core.services import email_sender
from tvb.core.services.exceptions import UsernameException
from tvb.core.services.import_service import ImportService
from tvb.core.services.settings_service import SettingsService
from tvb.core.utils import hash_password

FROM_ADDRESS = 'donotreply@thevirtualbrain.org'
SUBJECT_REGISTER = '[TVB] Registration Confirmation'
SUBJECT_VALIDATE = '[TVB] Account validated'
SUBJECT_RECOVERY = '[TVB] Recover password'
TEXT_RECOVERY = 'Hi %s,\n\n' \
                'According to your recent request, a new password was generated for your user, by the system.\n' \
                'Please login with the below password and change it into one you can easily remember.\n\n ' \
                'The new password is: %s\n\n' \
                'TVB Team.'
TEXT_DISPLAY = "Thank you! Please check your email for further details!"
TEXT_CREATE = (',\n\nYour registration has been notified to the administrators '
               + 'of The Virtual Brain Project; you will receive an email as '
               + 'soon as the administrator has validated your registration.'
               + ' \n\nThank you for registering!\nTVB Team')
TEXT_CREATE_TO_ADMIN = 'New member requires validation. Go to this url to validate '
TEXT_VALIDATED = ',\n\nYour registration has been validated by TVB Administrator, Please proceed with the login at '
KEY_USERNAME = "username"
KEY_PASSWORD = "password"
KEY_AUTH_TOKEN = "auth_token"
KEY_EMAIL = "email"
KEY_ROLE = "role"
KEY_COMMENT = "comment"
DEFAULT_PASS_LENGTH = 10
USERS_PAGE_SIZE = 10
MEMBERS_PAGE_SIZE = 30


[docs]class UserService: """ CRUD methods for USER entities are here. """ USER_ROLES = USER_ROLES def __init__(self): self.logger = get_logger(self.__class__.__module__)
[docs] def create_user(self, username=None, display_name=None, password=None, password2=None, role=None, email=None, comment=None, email_msg=None, validated=False, skip_import=False, gid=None, skip_sending_email=False): """ Service Layer for creating a new user. """ if (username is None) or len(username) < 1: raise UsernameException("Empty UserName!") if (display_name is None) or len(display_name) < 1: raise UsernameException("Empty display name!") if (password is None) or len(password) < 1: raise UsernameException("Empty password!") if password2 is None: password2 = password if password != password2: raise UsernameException("Passwords do not match!") try: user_validated = (role == ROLE_ADMINISTRATOR) or validated user = User(username, display_name, password, email, user_validated, role, gid) if email_msg is None: email_msg = 'Hello ' + username + TEXT_CREATE admin_msg = (TEXT_CREATE_TO_ADMIN + username + ' :\n ' + TvbProfile.current.web.BASE_URL + '/user/validate/' + username + '\n\n"' + str(comment) + '"') self.logger.info("Registering user " + username + " !") if role != ROLE_ADMINISTRATOR and email is not None and not skip_sending_email: admins = UserService.get_administrators() admin = admins[random.randint(0, len(admins) - 1)] if admin.email is not None and (admin.email != TvbProfile.current.web.admin.DEFAULT_ADMIN_EMAIL): # Do not send validation email in case default admin email remained unchanged email_sender.send(FROM_ADDRESS, admin.email, SUBJECT_REGISTER, admin_msg) self.logger.debug("Email sent to:" + admin.email + " for validating user:" + username + " !") email_sender.send(FROM_ADDRESS, email, SUBJECT_REGISTER, email_msg) self.logger.debug("Email sent to:" + email + " for notifying new user:" + username + " !") user = dao.store_entity(user) if role == ROLE_ADMINISTRATOR and not skip_import: to_upload = os.path.join(os.path.dirname(tvb_data.__file__), "Default_Project.zip") if not os.path.exists(to_upload): self.logger.warning("Could not find DEFAULT PROJECT at path %s. You might want to import it " "yourself. See TVB documentation about where to find it!" % to_upload) return TEXT_DISPLAY ImportService().import_project_structure(to_upload, user.id) else: try: default_prj_id = dao.get_project_by_gid(DEFAULT_PROJECT_GID).id dao.add_members_to_project(default_prj_id, [user.id]) except Exception: self.logger.exception( "Could not link user_id: %d with project_gid: %s " % (user.id, DEFAULT_PROJECT_GID)) return TEXT_DISPLAY except Exception as excep: self.logger.exception("Could not create user!") raise UsernameException(str(excep))
[docs] def reset_password(self, **data): """ Service Layer for resetting a password. """ if (KEY_EMAIL not in data) or len(data[KEY_EMAIL]) < 1: raise UsernameException("Empty Email!") old_pass, user = None, None try: email = data[KEY_EMAIL] name_hint = data[KEY_USERNAME] user = dao.get_user_by_email(email, name_hint) if user is None: raise UsernameException("No singular user could be found for the given data!") old_pass = user.password new_pass = ''.join(chr(random.randint(48, 122)) for _ in range(DEFAULT_PASS_LENGTH)) user.password = hash_password(new_pass) self.edit_user(user, old_pass) self.logger.info("Resetting password for email : " + email) email_sender.send(FROM_ADDRESS, email, SUBJECT_RECOVERY, TEXT_RECOVERY % (user.username, new_pass)) return TEXT_DISPLAY except Exception as excep: if old_pass and len(old_pass) > 1 and user: user.password = old_pass dao.store_entity(user) self.logger.exception("Could not change user password!") raise UsernameException(excep)
[docs] @staticmethod def is_username_valid(name): """ Service layer for checking if a given UserName is unique or not. """ users_no = dao.count_users_for_name(name) if users_no > 0: return False return True
[docs] def validate_user(self, name='', user_id=None): """ Service layer for editing a user and validating the account. """ try: if user_id: user = dao.get_user_by_id(user_id) else: user = dao.get_user_by_name(name) if user is None or user.validated: self.logger.debug("UserName not found or already validated:" + name) return False user.validated = True user = dao.store_entity(user) self.logger.debug("Sending validation email for userName=" + user.username + " to address=" + user.email) email_sender.send(FROM_ADDRESS, user.email, SUBJECT_VALIDATE, "Hello " + user.username + TEXT_VALIDATED + TvbProfile.current.web.BASE_URL + "/user/") self.logger.info("User:" + user.username + " was validated successfully" + " and notification email sent!") return True except Exception as excep: self.logger.warning('Could not validate user:') self.logger.warning('WARNING : ' + str(excep)) return False
[docs] @staticmethod def check_login(username, password): """ Service layer to check if given UserName and Password are according to DB. """ user = dao.get_user_by_name(username) if user is not None and user.password == hash_password(password) and user.validated: return user else: return None
[docs] def get_users_for_project(self, user_name, project_id, page=1): """ Return tuple: (All Users except the project administrator, Project Members). Parameter "user_name" is the current user. Parameter "user_name" is used for new projects (project_id is None). When "project_id" not None, parameter "user_name" is ignored. """ try: admin_name = user_name if project_id is not None: project = dao.get_project_by_id(project_id) if project is not None: admin_name = project.administrator.username all_users, total_pages = self.retrieve_users_except([admin_name], page, MEMBERS_PAGE_SIZE) members = dao.get_members_of_project(project_id) return all_users, members, total_pages except Exception as excep: self.logger.exception("Invalid userName or project identifier") raise UsernameException(str(excep))
[docs] @staticmethod def retrieve_users_except(usernames, current_page, page_size): # type: (list, int, int) -> (list, int) """ Return all users from the database except the given users """ start_idx = page_size * (current_page - 1) total = dao.get_all_users(usernames, is_count=True) user_list = dao.get_all_users(usernames, start_idx, page_size) pages_no = total // page_size + (1 if total % page_size else 0) return user_list, pages_no
[docs] def edit_user(self, edited_user, old_password=None): """ Retrieve a user by and id, then modify it's role and validate status. """ if edited_user.validated: self.validate_user(user_id=edited_user.id) user = dao.get_user_by_id(edited_user.id) user.role = edited_user.role user.validated = edited_user.validated if old_password is not None: if user.password == old_password: user.password = edited_user.password else: raise UsernameException("Invalid old password!") user.email = edited_user.email for key, value in six.iteritems(edited_user.preferences): user.preferences[key] = value dao.store_entity(user) if user.is_administrator() and user.username == TvbProfile.current.web.admin.ADMINISTRATOR_NAME: TvbProfile.current.manager.add_entries_to_config_file({SettingsService.KEY_ADMIN_EMAIL: user.email, SettingsService.KEY_ADMIN_PWD: user.password})
[docs] def delete_user(self, user_id): """ Delete a user with a given ID. Return True when successfully, or False.""" try: dao.remove_entity(User, user_id) return True except Exception as excep: self.logger.exception(excep) return False
[docs] @staticmethod def get_administrators(): """Retrieve system administrators. Will be used for sending emails, for example.""" return dao.get_administrators()
[docs] @staticmethod def save_project_to_user(user_id, project_id): """ Mark for current user that the given project is the last one selected. """ user = dao.get_user_by_id(user_id) user.selected_project = project_id dao.store_entity(user)
[docs] @staticmethod def get_user_by_id(user_id): """ Retrieves a user by its id. """ return dao.get_user_by_id(user_id)
[docs] @staticmethod def get_user_by_name(username): """ Retrieves a user by its username. """ return dao.get_user_by_name(username)
[docs] @staticmethod def get_user_by_gid(gid): """ Retrieves a user by its gid. """ return dao.get_user_by_gid(gid)
[docs] @staticmethod def compute_user_generated_disk_size(user_id): return dao.compute_user_generated_disk_size(user_id)
def _create_external_service_user(self, user_data): gid = user_data['sub'] self.logger.info('Create a new external user for external id {}'.format(gid)) username, name, email, role = self._extract_user_info(user_data) if not self.is_username_valid(username): username = gid self.create_user(username, name, hash_password(''.join(random.sample(gid, len(gid)))), gid=gid, email=email, validated=True, skip_sending_email=True, role=role, skip_import=True) return self.get_user_by_gid(gid) def _update_external_service_user(self, current_user, new_data): username, display_name, email, role = self._extract_user_info(new_data) should_update = False if current_user.email != email \ or current_user.role != role \ or current_user.display_name != display_name \ or current_user.username != username and self.is_username_valid(username): current_user.email = email current_user.role = role current_user.username = username current_user.display_name = display_name should_update = True if should_update: dao.store_entity(current_user, True) return self.get_user_by_gid(current_user.gid) return current_user def _extract_user_info(self, keycloak_data): email = keycloak_data['email'] if 'email' in keycloak_data else None if 'group' in keycloak_data: user_groups = keycloak_data['group'] else: user_roles = keycloak_data['roles'] if 'roles' in keycloak_data else [] user_groups = user_roles['group'] if 'group' in user_roles else [] role = ROLE_ADMINISTRATOR if TvbProfile.current.web.admin.ADMINISTRATORS_GROUP in user_groups else None if role == ROLE_ADMINISTRATOR: self.logger.info("Administrator logged in") username = keycloak_data['preferred_username'] if 'preferred_username' in keycloak_data else keycloak_data[ 'sub'] name = keycloak_data['name'] if 'name' in keycloak_data else keycloak_data['sub'] return username, name, email, role
[docs] def get_external_db_user(self, user_data): gid = user_data['sub'] db_user = UserService.get_user_by_gid(gid) if db_user is None: db_user = self._create_external_service_user(user_data) else: db_user = self._update_external_service_user(db_user, user_data) return db_user if db_user.validated else None