Source code for tvb.basic.readers

# -*- coding: utf-8 -*-
#
#
# TheVirtualBrain-Scientific Package. This package holds all simulators, and
# analysers necessary to run brain-simulations. You can use it stand alone or
# in conjunction with TheVirtualBrain-Framework Package. See content of the
# documentation-folder for more details. See also http://www.thevirtualbrain.org
#
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE.  See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program.  If not, see <http://www.gnu.org/licenses/>.
#
#
#   CITATION:
# When using The Virtual Brain for scientific publications, please cite it as explained here:
# https://www.thevirtualbrain.org/tvb/zwei/neuroscience-publications
#
#

"""
This module contains the basic reading mechanisms for default DataType fields.

.. moduleauthor:: Lia Domide <lia.domide@codemart.ro>
"""

try:
    H5PY_SUPPORT = True
    import h5py as hdf5
except ImportError:
    H5PY_SUPPORT = False

import importlib
import os
import numpy
import zipfile
import uuid
from tempfile import gettempdir
from scipy import io as scipy_io
from tvb.basic.logger.builder import get_logger



class H5Reader(object):
    """
    Read one or many numpy arrays from a H5 file.

    The file is opened read-only when the reader is created. Call :meth:`close`
    (or use the reader in a ``with`` statement) to release the file handle.
    """

    def __init__(self, h5_path):
        """
        :param h5_path: path towards the HDF5 file to read from
        """
        self.logger = get_logger(__name__)
        # Always define the attribute, so the reader is in a known state even
        # when h5py is not installed and no file could be opened.
        self.hfd5_source = None
        if H5PY_SUPPORT:
            self.hfd5_source = hdf5.File(h5_path, 'r', libver='latest')
        else:
            self.logger.warning("You need h5py properly installed in order to load from a HDF5 source.")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never swallow an exception raised inside the ``with`` body.
        return False

    def close(self):
        """
        Release the underlying file handle. Calling it more than once is harmless.
        """
        source = getattr(self, 'hfd5_source', None)
        if source is not None:
            # Drop the reference first, so a failing close() is not retried.
            self.hfd5_source = None
            source.close()

    def read_field(self, field, log_exception=True):
        """
        :param field: name of the entry to read, looked up under the root ('/') of the file
        :param log_exception: when True, a failure is also written in the log
        :return: the value stored under the given field
        :raises ReaderException: when the field can not be read, for any reason
        """
        try:
            return self.hfd5_source['/' + field][()]
        except Exception as exc:
            if log_exception:
                self.logger.exception("Could not read from %s field" % field)
            # Keep the original error attached as the cause, for easier debugging.
            raise ReaderException("Could not read from %s field" % field) from exc

    def read_optional_field(self, field):
        """
        :param field: name of the entry to read
        :return: the value stored under the given field, or None when it can not be read
        """
        try:
            return self.read_field(field, log_exception=False)
        except ReaderException:
            return None
class FileReader(object):
    """
    Read one or multiple numpy arrays from a text/bz2 file.
    """

    def __init__(self, file_path):
        """
        :param file_path: path of the file to read; its extension selects the parser
        """
        self.logger = get_logger(__name__)
        self.file_path = file_path
        # The data is read from the path itself by default; this attribute can
        # be replaced afterwards with an already opened file-like object.
        self.file_stream = file_path

    def read_array(self, dtype=numpy.float64, skip_rows=0, use_cols=None, matlab_data_name=None):
        """
        Read the content of the file, choosing the parser from the extension of `file_path`.

        :param dtype: data type requested for text files
        :param skip_rows: number of leading rows to skip in text files
        :param use_cols: columns to keep from text files
        :param matlab_data_name: name of the variable to extract from a ".mat" file
        :return: what the selected parser produced; an empty array for ".h5"
        :raises ReaderException: when anything goes wrong while reading
        """
        self.logger.debug("Starting to read from: " + str(self.file_path))

        try:
            path = self.file_path

            # H5 is recognised, but not supported by this reader
            if path.endswith('.h5'):
                self.logger.error("Not yet implemented read from a ZIP of H5 files!")
                return numpy.array([])

            # Plain or bz2-compressed text
            if path.endswith(('.txt', '.bz2')):
                return self._read_text(self.file_stream, dtype, skip_rows, use_cols)

            # NumPy binary formats
            if path.endswith(('.npz', '.npy')):
                return numpy.load(self.file_stream)

            # Everything else is handed to the Matlab-style readers
            return self._read_matlab(self.file_stream, matlab_data_name)

        except Exception as e:
            msg = "Could not read from %s file \n %s" % (self.file_path, e)
            self.logger.exception(msg)
            raise ReaderException(msg)

    def _read_text(self, file_stream, dtype, skip_rows, use_cols):
        # Thin wrapper translating our parameter names into the numpy ones
        return numpy.loadtxt(file_stream, dtype=dtype, skiprows=skip_rows, usecols=use_cols)

    def _read_matlab(self, file_stream, matlab_data_name=None):
        # The decision is taken on the path, as the stream might be a nameless file-like object
        path = self.file_path
        if path.endswith(".mtx"):
            return scipy_io.mmread(file_stream)
        if path.endswith(".mat"):
            content = scipy_io.matlab.loadmat(file_stream)
            return content[matlab_data_name]
        # Any other extension yields no result
        return None

    def read_gain_from_brainstorm(self):
        """
        Read a gain matrix from a ".mat" file holding the fields Gain, GridLoc and GridOrient.

        :return: Gain reshaped in groups of 3 along its second axis, multiplied by
            GridOrient and summed over the last axis
        :raises ReaderException: when the file is not ".mat" or one of the fields is missing
        """
        if not self.file_path.endswith('.mat'):
            raise ReaderException("Brainstorm format is expected in a Matlab file not %s" % self.file_path)

        mat = scipy_io.loadmat(self.file_stream)
        for required in ('Gain', 'GridLoc', 'GridOrient'):
            if required not in mat:
                raise ReaderException("Brainstorm format is expecting field %s" % required)

        gain = mat['Gain']
        orientations = mat['GridOrient']
        # NOTE(review): presumably 3 gain columns per source, weighted by its orientation - confirm
        grouped = gain.reshape((gain.shape[0], -1, 3))
        return (grouped * orientations).sum(axis=-1)
class ZipReader(object):
    """
    Read one or many numpy arrays from a ZIP archive.

    The archive is opened when the reader is created. Call :meth:`close`
    (or use the reader in a ``with`` statement) to release the file handle.
    """

    def __init__(self, zip_path):
        """
        :param zip_path: path (or file-like object) of the ZIP archive to read from
        """
        self.logger = get_logger(__name__)
        self.zip_archive = zipfile.ZipFile(zip_path)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never swallow an exception raised inside the ``with`` body.
        return False

    def close(self):
        """
        Close the underlying ZIP archive.
        """
        self.zip_archive.close()

    def has_file_like(self, file_name):
        """
        :param file_name: text to search for in the names of the archive entries
        :return: True when at least one entry name contains `file_name`
        """
        return any(file_name in actual_name for actual_name in self.zip_archive.namelist())

    def read_array_from_file(self, file_name, dtype=numpy.float64, skip_rows=0,
                             use_cols=None, matlab_data_name=None):
        """
        Read the first archive entry whose name contains `file_name`,
        ignoring the entries under "__MACOSX".

        :param file_name: text to search for in the names of the archive entries
        :param dtype: data type requested for text files
        :param skip_rows: number of leading rows to skip in text files
        :param use_cols: columns to keep from text files
        :param matlab_data_name: name of the variable to extract from a ".mat" entry
        :return: the result of `FileReader.read_array` on the matching entry
        :raises ReaderException: when no entry matches, or the entry can not be read
        """
        matching_file_name = None
        for actual_name in self.zip_archive.namelist():
            if file_name in actual_name and not actual_name.startswith("__MACOSX"):
                matching_file_name = actual_name
                break

        if matching_file_name is None:
            self.logger.warning("File %r not found in ZIP." % file_name)
            raise ReaderException("File %r not found in ZIP." % file_name)

        zip_entry = self.zip_archive.open(matching_file_name, 'r')

        if matching_file_name.endswith(".bz2"):
            # bz2 content is first extracted on disk and read from there.
            # Entry names may include folders ("dir/data.txt.bz2"); only the last
            # part is a valid suffix for a file created directly in the temp folder.
            temp_file = copy_zip_entry_into_temp(zip_entry, os.path.basename(matching_file_name))
            try:
                file_reader = FileReader(temp_file)
                return file_reader.read_array(dtype, skip_rows, use_cols, matlab_data_name)
            finally:
                # Remove the temporary copy even when reading it has failed
                os.remove(temp_file)

        # The entry is deliberately left open: what gets returned (e.g. for ".npz")
        # might still need to read from it later on.
        file_reader = FileReader(matching_file_name)
        file_reader.file_stream = zip_entry
        return file_reader.read_array(dtype, skip_rows, use_cols, matlab_data_name)

    def read_optional_array_from_file(self, file_name, dtype=numpy.float64, skip_rows=0,
                                      use_cols=None, matlab_data_name=None):
        """
        Same as `read_array_from_file`, but a missing or unreadable entry is not an error.

        :return: the array read, or an empty array of the given dtype on ReaderException
        """
        try:
            return self.read_array_from_file(file_name, dtype, skip_rows, use_cols, matlab_data_name)
        except ReaderException:
            return numpy.array([], dtype=dtype)
class ReaderException(Exception):
    """
    Raised by the readers in this module when the requested data can not be loaded.
    """
def try_get_absolute_path(relative_module, file_suffix):
    """
    Try to build an absolute path based on a python module and a file-suffix.

    :param relative_module: python module to be imported.
        When import of this fails, we will return the file_suffix
    :param file_suffix: In case this is already an absolute path, return it immediately,
        otherwise append it after the module path
    :return: `file_suffix` appended to the folder of the module, or `file_suffix`
        unchanged when it is already absolute or the module folder can not be found
    """
    if os.path.isabs(file_suffix):
        return file_suffix

    try:
        module_import = importlib.import_module(relative_module)
    except ImportError:
        logger = get_logger(__name__)
        logger.exception("Could not import tvb_data Python module for default data-set!")
        return file_suffix

    # Regular modules and packages know the file they were loaded from
    module_file = getattr(module_import, '__file__', None)
    if module_file:
        return os.path.join(os.path.dirname(module_file), file_suffix)

    # Namespace packages (folders without __init__.py) have no __file__,
    # only the list of folders they are made of; use the first one.
    package_folders = list(getattr(module_import, '__path__', ()))
    if package_folders:
        return os.path.join(package_folders[0], file_suffix)

    # No location at all (e.g. a built-in module): same fallback as for a failed import
    logger = get_logger(__name__)
    logger.warning("Module %s has no location on disk, using %s as it is." % (relative_module, file_suffix))
    return file_suffix
def copy_zip_entry_into_temp(source, file_suffix, buffer_size=1024 * 1024):
    """
    Copy a ZIP Entry into a new file created under system temporary folder.

    :param source: ZipEntry (any binary file-like object with `read` and `close`);
        it is always closed before returning, even when the copy fails
    :param file_suffix: String suffix to be added to the temporary file name.
        When it contains folders (as ZIP entry names can), only the last part is used
    :param buffer_size: Buffer size used when copying the file-content
    :return: the path towards the new file.
    """
    # A suffix like "dir/data.txt.bz2" would otherwise point inside a folder that
    # does not exist (or outside the temporary folder); keep just the file name.
    safe_suffix = os.path.basename(file_suffix.replace("\\", "/"))
    result_dest_path = os.path.join(gettempdir(), "tvb_" + str(uuid.uuid1()) + safe_suffix)

    try:
        with open(result_dest_path, 'wb') as result_dest:
            while True:
                copy_buffer = source.read(buffer_size)
                if not copy_buffer:
                    break
                result_dest.write(copy_buffer)
    except Exception:
        # The caller never learns the path of a failed copy, so clean it up here
        if os.path.exists(result_dest_path):
            os.remove(result_dest_path)
        raise
    finally:
        source.close()

    return result_dest_path