# -*- coding: utf-8 -*-
#
#
# TheVirtualBrain-Framework Package. This package holds all Data Management, and
# Web-UI helpful to run brain-simulations. To use it, you also need to download
# TheVirtualBrain-Scientific Package (for simulators). See content of the
# documentation-folder for more details. See also http://www.thevirtualbrain.org
#
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program. If not, see <http://www.gnu.org/licenses/>.
#
#
# CITATION:
# When using The Virtual Brain for scientific publications, please cite it as explained here:
# https://www.thevirtualbrain.org/tvb/zwei/neuroscience-publications
#
#
"""
Persistence of data in HDF5 format.

.. moduleauthor:: Lia Domide <lia.domide@codemart.ro>
.. moduleauthor:: Bogdan Neacsa <bogdan.neacsa@codemart.ro>
.. moduleauthor:: Calin Pavel <calin.pavel@codemart.ro>
"""
import copy
import os
import threading
from datetime import datetime
import h5py as hdf5
import numpy
from tvb.basic.logger.builder import get_logger
from tvb.basic.profile import TvbProfile
from tvb.storage.h5 import utils
from tvb.storage.h5.encryption.data_encryption_handler import encryption_handler
from tvb.storage.h5.file.exceptions import MissingDataSetException, IncompatibleFileManagerException, \
FileStructureException, MissingDataFileException
from tvb.storage.h5.file.files_helper import FilesHelper
# Create logger for this module
LOG = get_logger(__name__)
LOCK_OPEN_FILE = threading.Lock()
BUFFER_SIZE = 300
class HDF5StorageManager(object):
"""
This class is responsible for saving / loading data in HDF5 file / format.
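
    Typical usage, as an illustrative sketch (the file path is an assumption; in TVB the
    file normally lives inside a project folder)::

        manager = HDF5StorageManager("/tmp/example_tvb.h5")
        manager.store_data(numpy.arange(4), dataset_name="vector")
        data = manager.get_data("vector")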
"""
__file_title_ = "TVB data file"
__storage_full_name = None
__hfd5_file = None
TVB_ATTRIBUTE_PREFIX = "TVB_"
ROOT_NODE_PATH = "/"
BOOL_VALUE_PREFIX = "bool:"
DATETIME_VALUE_PREFIX = "datetime:"
DATE_TIME_FORMAT = '%Y-%m-%d %H:%M:%S.%f'
LOCKS = {}
def __init__(self, storage_full_name):
"""
Creates a new storage manager instance.
        :param storage_full_name: absolute path of the HDF5 file in which data will be stored.
"""
if storage_full_name is None:
raise FileStructureException("Please provide the file name where to store data")
self.__storage_full_name = storage_full_name
self.__buffer_array = None
self.data_buffers = {}
self.data_encryption_handler = encryption_handler
    def is_valid_tvb_file(self):
"""
        This method checks whether the specified file exists and has a valid HDF5 format.
        :returns: True if the file exists and has HDF5 format, False otherwise.
"""
try:
return os.path.exists(self.__storage_full_name) and hdf5.is_hdf5(self.__storage_full_name)
except RuntimeError:
return False
    def store_data(self, data_list, dataset_name='', where=ROOT_NODE_PATH):
"""
This method stores provided data list into a data set in the H5 file.
:param dataset_name: Name of the data set where to store data
:param data_list: Data to be stored
:param where: represents the path where to store our dataset (e.g. /data/info)
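
        Illustrative sketch (dataset name is an assumption)::

            manager.store_data(numpy.zeros((10, 10)), dataset_name="weights", where="/")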
"""
data_to_store = self._check_data(data_list)
try:
LOG.debug("Saving data into data set: %s" % dataset_name)
# Open file in append mode ('a') to allow adding multiple data sets in the same file
hdf5_file = self._open_h5_file()
full_dataset_name = where + dataset_name
if full_dataset_name not in hdf5_file:
hdf5_file.create_dataset(full_dataset_name, data=data_to_store)
elif hdf5_file[full_dataset_name].shape == data_to_store.shape:
hdf5_file[full_dataset_name][...] = data_to_store[...]
else:
raise IncompatibleFileManagerException("Cannot update existing H5 DataSet %s with a different shape. "
"Try defining it as chunked!" % full_dataset_name)
finally:
# Now close file
self.close_file()
self.data_encryption_handler.push_folder_to_sync(FilesHelper.get_project_folder_from_h5(
self.__storage_full_name))
    def append_data(self, data_list, dataset_name='', grow_dimension=-1, close_file=True, where=ROOT_NODE_PATH):
"""
        This method appends data to an existing data set. If the data set does not exist, it is created first.
:param dataset_name: Name of the data set where to store data
:param data_list: Data to be stored / appended
        :param grow_dimension: The dimension along which the stored array will grow. By default the LAST dimension is used
        :param close_file: Specify whether the file should be closed automatically after the write operation. If not,
                           you have to close the file yourself by calling close_file()
:param where: represents the path where to store our dataset (e.g. /data/info)
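
        Illustrative sketch (dataset name and shapes are assumptions)::

            # Append 10 new rows along the first dimension of "/time_series"
            manager.append_data(numpy.zeros((10, 4)), dataset_name="time_series", grow_dimension=0)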
"""
data_to_store = self._check_data(data_list)
data_buffer = self.data_buffers.get(where + dataset_name, None)
if data_buffer is None:
hdf5_file = self._open_h5_file()
datapath = where + dataset_name
if datapath in hdf5_file:
dataset = hdf5_file[datapath]
self.data_buffers[datapath] = HDF5StorageManager.H5pyStorageBuffer(dataset,
buffered_data=data_to_store,
grow_dimension=grow_dimension)
else:
data_shape_list = list(data_to_store.shape)
data_shape_list[grow_dimension] = None
data_shape = tuple(data_shape_list)
dataset = hdf5_file.create_dataset(where + dataset_name, data=data_to_store, shape=data_to_store.shape,
dtype=data_to_store.dtype, maxshape=data_shape)
self.data_buffers[datapath] = HDF5StorageManager.H5pyStorageBuffer(dataset,
buffered_data=None,
grow_dimension=grow_dimension)
else:
if not data_buffer.buffer_data(data_to_store):
data_buffer.flush_buffered_data()
if close_file:
self.close_file()
self.data_encryption_handler.push_folder_to_sync(
FilesHelper.get_project_folder_from_h5(self.__storage_full_name))
    def remove_data(self, dataset_name='', where=ROOT_NODE_PATH):
"""
        Delete a data set from the H5 file.
        :param dataset_name: name of the data set to be deleted
:param where: represents the path where dataset is stored (e.g. /data/info)
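
        Illustrative sketch (dataset name is an assumption)::

            manager.remove_data("weights")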
"""
LOG.debug("Removing data set: %s" % dataset_name)
try:
# Open file in append mode ('a') to allow data remove
hdf5_file = self._open_h5_file()
del hdf5_file[where + dataset_name]
except KeyError:
LOG.warning("Trying to delete data set: %s but current file does not contain it." % dataset_name)
raise FileStructureException("Could not locate dataset: %s" % dataset_name)
finally:
self.close_file()
self.data_encryption_handler.push_folder_to_sync(
FilesHelper.get_project_folder_from_h5(self.__storage_full_name))
    def get_data(self, dataset_name='', data_slice=None, where=ROOT_NODE_PATH, ignore_errors=False, close_file=True):
"""
This method reads data from the given data set based on the slice specification
        :param close_file: Automatically close the file after reading the current field
        :param ignore_errors: if True, return None when the data set is missing instead of raising an exception
        :param dataset_name: Name of the data set from where to read data
        :param data_slice: Specify how to retrieve data from the array (e.g. (slice(1, 10, 1), slice(1, 6, 2)))
:param where: represents the path where dataset is stored (e.g. /data/info)
:returns: a numpy.ndarray containing filtered data
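
        Illustrative sketch (dataset name and slice are assumptions)::

            # Read the first 10 rows of the first column from "/time_series"
            chunk = manager.get_data("time_series", data_slice=(slice(0, 10), slice(0, 1)))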
"""
LOG.debug("Reading data from data set: %s" % dataset_name)
data_path = where + dataset_name
try:
# Open file to read data
hdf5_file = self._open_h5_file('r')
if data_path in hdf5_file:
data_array = hdf5_file[data_path]
# Now read data
if data_slice is None:
result = data_array[()]
if isinstance(result, hdf5.Empty):
return numpy.empty([])
return result
else:
return data_array[data_slice]
else:
if not ignore_errors:
LOG.error("Trying to read data from a missing data set: %s" % dataset_name)
raise MissingDataSetException("Could not locate dataset: %s" % dataset_name)
else:
return None
finally:
if close_file:
self.close_file()
    def get_data_shape(self, dataset_name='', where=ROOT_NODE_PATH):
"""
        This method reads the shape of the given data set
:param dataset_name: Name of the data set from where to read data
:param where: represents the path where dataset is stored (e.g. /data/info)
:returns: a tuple containing data size
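
        Illustrative sketch (dataset name and shape are assumptions)::

            manager.get_data_shape("time_series")    # e.g. (4000, 76)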
"""
LOG.debug("Reading data from data set: %s" % dataset_name)
try:
# Open file to read data
hdf5_file = self._open_h5_file('r')
data_array = hdf5_file[where + dataset_name]
return data_array.shape
except KeyError:
LOG.debug("Trying to read data from a missing data set: %s" % dataset_name)
raise MissingDataSetException("Could not locate dataset: %s" % dataset_name)
finally:
self.close_file()
    @staticmethod
def serialize_bool(value):
return HDF5StorageManager.BOOL_VALUE_PREFIX + str(value)
def _serialize_value(self, value):
"""
        This method takes a value which will be stored as metadata and
        applies some transformations to it, if necessary
:param value: value which is planned to be stored
:returns: value to be stored
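
        Illustrative sketch of the applied transformations (values are assumptions)::

            self._serialize_value(True)                  # -> "bool:True"
            self._serialize_value(datetime(2023, 1, 1))  # -> "datetime:2023-01-01 00:00:00.000000"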
"""
if value is None:
return ''
# Transform boolean to string and prefix it
if isinstance(value, bool):
return self.serialize_bool(value)
# Transform date to string and append prefix
elif isinstance(value, datetime):
return self.DATETIME_VALUE_PREFIX + utils.date2string(value, date_format=self.DATE_TIME_FORMAT)
else:
return value
    def get_file_data_version(self, data_version, dataset_name='', where=ROOT_NODE_PATH):
"""
Checks the data version for the current file.
"""
if not os.path.exists(self.__storage_full_name):
raise MissingDataFileException("File storage data not found at path %s" % (self.__storage_full_name,))
if self.is_valid_tvb_file():
metadata = self.get_metadata(dataset_name, where)
if data_version in metadata:
return metadata[data_version]
else:
raise IncompatibleFileManagerException("Could not find TVB specific data version attribute %s in file: "
"%s." % (data_version, self.__storage_full_name))
raise IncompatibleFileManagerException("File %s is not a hdf5 format file. Are you using the correct "
"manager for this file?" % (self.__storage_full_name,))
def _deserialize_value(self, value):
"""
        This method takes a value loaded from the H5 file and transforms it into TVB data.
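
        Illustrative sketch (values are assumptions)::

            self._deserialize_value("bool:True")    # -> True
            self._deserialize_value("plain text")   # -> "plain text"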
"""
if value is not None:
if isinstance(value, numpy.string_):
if len(value) == 0:
value = None
else:
value = str(value)
if isinstance(value, str):
if value.startswith(self.BOOL_VALUE_PREFIX):
# Remove bool prefix and transform to bool
return utils.string2bool(value[len(self.BOOL_VALUE_PREFIX):])
if value.startswith(self.DATETIME_VALUE_PREFIX):
# Remove datetime prefix and transform to datetime
return utils.string2date(value[len(self.DATETIME_VALUE_PREFIX):], date_format=self.DATE_TIME_FORMAT)
return value
def __aquire_lock(self):
"""
        Acquire a unique lock for each different file path on the system.
"""
lock = self.LOCKS.get(self.__storage_full_name, None)
if lock is None:
lock = threading.Lock()
self.LOCKS[self.__storage_full_name] = lock
lock.acquire()
def __release_lock(self):
"""
        Release the lock corresponding to the current file path.
"""
lock = self.LOCKS.get(self.__storage_full_name, None)
if lock is None:
raise Exception("Some lock was deleted without being released beforehand.")
lock.release()
    def close_file(self):
"""
        The synchronization of open/close does not seem to be needed anymore for h5py
        (in contrast to PyTables) for concurrent reads. However, since it should not add
        much overhead in most situations, we leave it like this for now: in case of
        concurrent writes (of metadata) it provides extra safety.
"""
try:
self.__aquire_lock()
self.__close_file()
finally:
self.__release_lock()
def _open_h5_file(self, mode='a'):
"""
        The synchronization of open/close does not seem to be needed anymore for h5py
        (in contrast to PyTables) for concurrent reads. However, since it should not add
        much overhead in most situations, we leave it like this for now: in case of
        concurrent writes (of metadata) it provides extra safety.
"""
try:
self.__aquire_lock()
file_obj = self.__open_h5_file(mode)
finally:
self.__release_lock()
return file_obj
def __close_file(self):
"""
Close file used to store data.
"""
hdf5_file = self.__hfd5_file
# Try to close file only if it was opened before
if hdf5_file is not None and hdf5_file.id.valid:
LOG.debug("Closing file: %s" % self.__storage_full_name)
try:
for h5py_buffer in self.data_buffers.values():
h5py_buffer.flush_buffered_data()
self.data_buffers = {}
hdf5_file.close()
except Exception as excep:
                # Do nothing in this situation.
                # The file is correctly closed, but the list of open files in HDF5 is not updated in a synchronized manner.
# del _open_files[filename] might throw KeyError
LOG.exception(excep)
if not hdf5_file.id.valid:
self.__hfd5_file = None
# -------------- Private methods --------------
def __open_h5_file(self, mode='a'):
"""
Open file for reading, writing or append.
        :param mode: Mode in which to open the file (possible values are w / r / a).
            Default value is 'a', to allow adding multiple data sets to the same file.
        :returns: the file which stores data in HDF5 format, opened for read / write according to the mode param
"""
if self.__storage_full_name is None:
raise FileStructureException("Invalid storage file. Please provide a valid path.")
try:
# Check if file is still open from previous writes.
if self.__hfd5_file is None or not self.__hfd5_file.id.valid:
file_exists = os.path.exists(self.__storage_full_name)
                # A bug in some versions of HDF5 on Windows prevents creating a file with mode='a'
if not file_exists and mode == 'a':
mode = 'w'
LOG.debug("Opening file: %s in mode: %s" % (self.__storage_full_name, mode))
self.__hfd5_file = hdf5.File(self.__storage_full_name, mode, libver='latest')
# If this is the first time we access file, write data version
if not file_exists:
os.chmod(self.__storage_full_name, TvbProfile.current.ACCESS_MODE_TVB_FILES)
attr_name = self.TVB_ATTRIBUTE_PREFIX + TvbProfile.current.version.DATA_VERSION_ATTRIBUTE
self.__hfd5_file['/'].attrs[attr_name] = TvbProfile.current.version.DATA_VERSION
except (IOError, OSError) as err:
LOG.exception("Could not open storage file.")
raise FileStructureException("Could not open storage file. %s" % err)
return self.__hfd5_file
@staticmethod
def _check_data(data_list):
"""
        Check if the data to be stored is in a suitable format. If not, adapt it.
"""
if data_list is None:
raise FileStructureException("Could not store null data")
if not (isinstance(data_list, list) or isinstance(data_list, numpy.ndarray)):
raise FileStructureException("Invalid data type. Could not store data of type:" + str(type(data_list)))
data_to_store = data_list
if isinstance(data_to_store, list):
data_to_store = numpy.array(data_list)
if data_to_store.shape == ():
data_to_store = hdf5.Empty("f")
return data_to_store
    class H5pyStorageBuffer(object):
"""
Helper class in order to buffer data for append operations, to limit the number of actual
HDD I/O operations.
"""
def __init__(self, h5py_dataset, buffered_data=None, grow_dimension=-1):
self.buffered_data = buffered_data
self.buffer_size = BUFFER_SIZE
if h5py_dataset is None:
raise MissingDataSetException("A H5pyStorageBuffer instance must have a h5py dataset for which the"
"buffering is done. Please supply one to the 'h5py_dataset' parameter.")
self.h5py_dataset = h5py_dataset
self.grow_dimension = grow_dimension
        def buffer_data(self, data_list):
"""
Add data_list to an internal buffer in order to improve performance for append_data type of operations.
:returns: True if buffer is still fine, \
False if a flush is necessary since the buffer is full
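
            Illustrative usage pattern (mirrors the one used in append_data above)::

                if not buffer.buffer_data(new_chunk):
                    buffer.flush_buffered_data()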
"""
if self.buffered_data is None:
self.buffered_data = data_list
else:
self.buffered_data = self.__custom_numpy_append(self.buffered_data, data_list)
if self.buffered_data.nbytes > self.buffer_size:
return False
else:
return True
def __custom_numpy_append(self, array1, array2):
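            """
            Concatenate array2 onto array1 along self.grow_dimension, by pre-allocating the
            combined array and copying both parts into it (essentially numpy.concatenate).
            """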
array_1_shape = numpy.array(array1.shape)
array_2_shape = numpy.array(array2.shape)
result_shape = copy.deepcopy(array_1_shape)
result_shape[self.grow_dimension] += array_2_shape[self.grow_dimension]
result_array = numpy.empty(shape=tuple(result_shape), dtype=array1.dtype)
full_slice = slice(None, None, None)
full_index = [full_slice for _ in array_1_shape]
full_index[self.grow_dimension] = slice(0, array_1_shape[self.grow_dimension], None)
result_array[tuple(full_index)] = array1
full_index[self.grow_dimension] = slice(array_1_shape[self.grow_dimension],
result_shape[self.grow_dimension], None)
result_array[tuple(full_index)] = array2
return result_array
        def flush_buffered_data(self):
"""
            Append the data buffered so far to the underlying dataset, expanding it along the
            grow_dimension dimension.
"""
if self.buffered_data is not None:
current_shape = self.h5py_dataset.shape
new_shape = list(current_shape)
new_shape[self.grow_dimension] += self.buffered_data.shape[self.grow_dimension]
                # Create the required slice to which the new data will be added.
                # For example, when growing the 3rd dimension of a 4D dataset of shape (74, 1, 100, 1)
                # we want the slice (:, :, 100:200, :) in order to add 100 new entries
full_slice = slice(None, None, None)
slice_to_add = slice(current_shape[self.grow_dimension], new_shape[self.grow_dimension], None)
append2address = [full_slice for _ in new_shape]
append2address[self.grow_dimension] = slice_to_add
# Do the data reshape and copy the new data
self.h5py_dataset.resize(tuple(new_shape))
self.h5py_dataset[tuple(append2address)] = self.buffered_data
self.buffered_data = None