Source code for tvb.core.services.backend_clients.cluster_scheduler_client

# -*- coding: utf-8 -*-
#
#
# TheVirtualBrain-Framework Package. This package holds all Data Management, and
# Web-UI helpful to run brain-simulations. To use it, you also need to download
# TheVirtualBrain-Scientific Package (for simulators). See content of the
# documentation-folder for more details. See also http://www.thevirtualbrain.org
#
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE.  See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program.  If not, see <http://www.gnu.org/licenses/>.
#
#
#   CITATION:
# When using The Virtual Brain for scientific publications, please cite it as explained here:
# https://www.thevirtualbrain.org/tvb/zwei/neuroscience-publications
#
#

"""
.. moduleauthor:: Lia Domide <lia.domide@codemart.ro>
.. moduleauthor:: Bogdan Neacsa <bogdan.neacsa@codemart.ro>
.. moduleauthor:: Yann Gordon <yann@invalid.tvb>
"""

import os
from subprocess import Popen, PIPE
from threading import Thread

from tvb.basic.logger.builder import get_logger
from tvb.basic.profile import TvbProfile
from tvb.core.entities.model.model_operation import OperationProcessIdentifier, STATUS_CANCELED
from tvb.core.entities.storage import dao
from tvb.core.neocom import h5
from tvb.core.services.backend_clients.backend_client import BackendClient
from tvb.core.services.burst_service import BurstService
from tvb.core.utils import parse_json_parameters

LOGGER = get_logger(__name__)


[docs]class ClusterSchedulerClient(BackendClient): # TODO: fix this with neoforms """ Simple class, to mimic the same behavior we are expecting from StandAloneClient, but firing behind the cluster job scheduling process.. """ @staticmethod def _run_cluster_job(operation_identifier, user_name_label, adapter_instance): """ Threaded Popen It is the function called by the ClusterSchedulerClient in a Thread. This function starts a new process. """ # Load operation so we can estimate the execution time operation = dao.get_operation_by_id(operation_identifier) view_model = h5.load_view_model(operation) # kwargs = adapter_instance.prepare_ui_inputs(kwargs) time_estimate = int(adapter_instance.get_execution_time_approximation(view_model)) hours = int(time_estimate / 3600) minutes = (int(time_estimate) % 3600) / 60 seconds = int(time_estimate) % 60 # Anything lower than 5 hours just use default walltime if hours < 5: walltime = "05:00:00" else: if hours < 10: hours = "0%d" % hours else: hours = str(hours) walltime = "%s:%s:%s" % (hours, str(minutes), str(seconds)) call_arg = TvbProfile.current.cluster.SCHEDULE_COMMAND % (operation_identifier, user_name_label, walltime) LOGGER.info(call_arg) process_ = Popen([call_arg], stdout=PIPE, shell=True) job_id = process_.stdout.read().replace('\n', '').split(TvbProfile.current.cluster.JOB_ID_STRING)[-1] LOGGER.info("Got jobIdentifier = %s for CLUSTER operationID = %s" % (job_id, operation_identifier)) operation_identifier = OperationProcessIdentifier(operation_identifier, job_id=job_id) dao.store_entity(operation_identifier)
[docs] @staticmethod def execute(operation_id, user_name_label, adapter_instance): """Call the correct system command to submit a job to the cluster.""" thread = Thread(target=ClusterSchedulerClient._run_cluster_job, kwargs={'operation_identifier': operation_id, 'user_name_label': user_name_label, 'adapter_instance': adapter_instance}) thread.start()
[docs] @staticmethod def stop_operation(operation_id): """ Stop a thread for a given operation id """ operation = dao.try_get_operation_by_id(operation_id) if not operation or operation.has_finished: LOGGER.warning("Operation already stopped or not found is given to stop job: %s" % operation_id) return True operation_process = dao.get_operation_process_for_operation(operation_id) result = 0 # Try to kill only if operation job process is not None if operation_process is not None: stop_command = TvbProfile.current.cluster.STOP_COMMAND % operation_process.job_id LOGGER.info("Stopping cluster operation: %s" % stop_command) result = os.system(stop_command) if result != 0: LOGGER.error("Stopping cluster operation was unsuccessful. Try following status with '" + TvbProfile.current.cluster.STATUS_COMMAND + "'" % operation_process.job_id) BurstService().persist_operation_state(operation, STATUS_CANCELED) return result == 0