Source code for tvb.config.logger.elasticsearch_handler
# -*- coding: utf-8 -*-
#
#
# TheVirtualBrain-Framework Package. This package holds all Data Management, and
# Web-UI helpful to run brain-simulations. To use it, you also need to download
# TheVirtualBrain-Scientific Package (for simulators). See content of the
# documentation-folder for more details. See also http://www.thevirtualbrain.org
#
# (c) 2012-2023, Baycrest Centre for Geriatric Care ("Baycrest") and others
#
# This program is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
# either version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along with this
# program. If not, see <http://www.gnu.org/licenses/>.
#
#
# CITATION:
# When using The Virtual Brain for scientific publications, please cite it as explained here:
# https://www.thevirtualbrain.org/tvb/zwei/neuroscience-publications
#
#
"""
.. moduleauthor:: Horge Rares <rares.horge@codemart.ro>
"""
import json
from logging import Handler, LogRecord, getLogger, ERROR
from logging.handlers import QueueListener, QueueHandler
from queue import Queue
from tvb.basic.profile import TvbProfile
if not TvbProfile.current.TRACE_USER_ACTIONS:
class ElasticQueueHandler(Handler):
[docs] def emit(self, record: LogRecord) -> None:
pass
else:
from elasticsearch import Elasticsearch
logger = getLogger("elastic_transport")
def _convert_to_bulk_format(record):
return [{"index": {}},
{"@timestamp": record.asctime,
"message": record.message,
"controller_method": record.controller_method if hasattr(record, 'controller_method') else "",
"user": {
"user_id": record.user_id if hasattr(record, 'user_id') else ""
}
}]
def _compute_json_size_in_bytes(json_obj):
json_string = json.dumps(json_obj)
byte_ = bytes(json_string, "utf-8")
return len(byte_)
class ElasticSendHandler(Handler):
def __init__(self):
"""
Initializes the custom http handler
"""
try:
super().__init__()
self._client = Elasticsearch(
TvbProfile.current.ELASTICSEARCH_URL,
api_key=TvbProfile.current.ELASTICSEARCH_API_KEY,
request_timeout=TvbProfile.current.ELASTICSEARCH_REQUEST_TIMEOUT
)
self.threshold = TvbProfile.current.ELASTICSEARCH_BUFFER_THRESHOLD
self.buffer = []
self.buffer_size = 0
self.operation_size = _compute_json_size_in_bytes({"index": {}})
except Exception as e:
logger.log(ERROR, "could not create Elasticsearch connection object", exc_info=e)
def emit(self, record: LogRecord):
"""
This function gets called when a log event gets emitted. It recieves a
record, formats it and sends it to the url
Parameters:
record: a log record
"""
if hasattr(self, "_client"):
operation, data = _convert_to_bulk_format(record)
data_size = self.operation_size + _compute_json_size_in_bytes(data)
if data_size > self.threshold:
removed_characters_nr = min(data_size - self.threshold, len(data['message']))
# truncate the log message to respect the threshold
data['message'] = data['message'][:-removed_characters_nr]
data_size -= removed_characters_nr
if self.buffer_size + data_size > self.threshold:
try:
response = self._client.bulk(
index=TvbProfile.current.ELASTICSEARCH_LOGGING_INDEX,
operations=self.buffer
)
if response['errors']:
logger.log(ERROR, "could not send bulk request to elasticsearch server", stack_info=True)
except Exception as e:
logger.log(ERROR, f"could not send bulk request to elasticsearch server", exc_info=e)
self.buffer = []
self.buffer_size = 0
self.buffer += [operation, data]
self.buffer_size += data_size
def close(self) -> None:
if hasattr(self, "_client"):
self._client.close()
self.buffer = []
return super().close()
[docs] class ElasticQueueHandler(QueueHandler):
def __init__(self):
# sets the queue attribute
super().__init__(Queue(-1))
self.sending_handler = ElasticSendHandler()
self._listener = QueueListener(self.queue, self.sending_handler)
self._listener.start()
def close(self) -> None:
self._listener.stop()
self.sending_handler.close()
return super().close()