Logging handler to send logs to your OpenSearch cluster with bulk SSL. Forked from https://github.com/logzio/logzio-python-handler
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

134 lines
4.8 KiB

import fnmatch
import logging.config
import os
import time
import json
from unittest import TestCase
from logzio.handler import ExtraFieldsLogFilter
from .mockLogzioListener import listener
def _find(pattern, path):
    """Return the files directly inside *path* whose names match *pattern*.

    Args:
        pattern: Shell-style wildcard (as understood by ``fnmatch``) that is
            matched against each file name.
        path: Directory to look in. Only its top level is inspected;
            sub-directories are never descended into.

    Returns:
        A list of ``os.path.join(path, name)`` entries, one per matching
        file. The list is empty when nothing matches or when *path* cannot
        be listed (``os.walk`` yields nothing in that case).
    """
    # os.walk yields the top directory first; taking only that first item
    # keeps the search non-recursive.
    root, _dirs, files = next(os.walk(path), (path, [], []))
    return [os.path.join(root, name) for name in fnmatch.filter(files, pattern)]
class TestExtraFieldsFilter(TestCase):
    """Check that ``ExtraFieldsLogFilter`` adds its fields to shipped logs.

    Every test logs through a ``LogzioHandler`` pointed at a local
    ``MockLogzioListener`` and then inspects the JSON documents found in the
    listener's ``logs_list``.
    """

    def setUp(self):
        """Create a clean listener, configure the ``test`` logger, drop old failure files."""
        self.logzio_listener = listener.MockLogzioListener()
        self.logzio_listener.clear_logs_buffer()
        self.logzio_listener.clear_server_error()
        self.logs_drain_timeout = 1
        self.retries_no = 4
        self.retry_timeout = 2
        self.add_context = True
        logging_configuration = {
            "version": 1,
            "formatters": {
                "logzio": {
                    "format": '{"key": "value"}',
                    "validate": False
                }
            },
            "handlers": {
                "LogzioHandler": {
                    "class": "logzio.handler.LogzioHandler",
                    "formatter": "logzio",
                    "level": "DEBUG",
                    "token": "token",
                    'logzio_type': "type",
                    'logs_drain_timeout': self.logs_drain_timeout,
                    'url': "http://" + self.logzio_listener.get_host() + ":" + str(self.logzio_listener.get_port()),
                    'debug': True,
                    'retries_no': self.retries_no,
                    'retry_timeout': self.retry_timeout,
                    'add_context': self.add_context
                }
            },
            "loggers": {
                "test": {
                    "handlers": ["LogzioHandler"],
                    "level": "DEBUG"
                }
            }
        }
        logging.config.dictConfig(logging_configuration)
        self.logger = logging.getLogger('test')
        for curr_file in _find("logzio-failures-*.txt", "."):
            os.remove(curr_file)

    def _add_extra_fields(self, extra_fields):
        """Attach an ``ExtraFieldsLogFilter`` for *extra_fields* and return it.

        The removal is registered as a cleanup because
        ``logging.getLogger('test')`` hands back the same logger object in
        every test, so a filter left attached would also affect later tests.
        """
        extra_filter = ExtraFieldsLogFilter(extra=extra_fields)
        self.logger.addFilter(extra_filter)
        self.addCleanup(self.logger.removeFilter, extra_filter)
        return extra_filter

    def _logs_containing(self, message):
        """Return the parsed listener logs whose raw text contains *message*.

        Fails the test when no such log exists, so a test cannot pass merely
        because nothing reached the listener.
        """
        matching = [
            json.loads(raw_log)
            for raw_log in self.logzio_listener.logs_list
            if message in raw_log
        ]
        self.assertTrue(matching, "no log containing %r reached the listener" % message)
        return matching

    def _assert_has_fields(self, log_dict, expected_fields):
        """Assert that every key/value pair of *expected_fields* is in *log_dict*."""
        for key, value in expected_fields.items():
            self.assertIn(key, log_dict)
            self.assertEqual(value, log_dict[key])

    def test_add_extra_fields(self):
        """A log emitted while the filter is attached carries the extra fields."""
        extra_fields = {"foo": "bar"}
        self._add_extra_fields(extra_fields)
        log_message = "this log should have a additional fields"
        self.logger.info(log_message)
        # Give the handler time to drain its queue to the listener.
        time.sleep(self.logs_drain_timeout * 2)
        for log_dict in self._logs_containing(log_message):
            self._assert_has_fields(log_dict, extra_fields)

    def test_remove_extra_fields(self):
        """A log emitted after the filter is removed lacks the extra fields."""
        extra_fields = {"foo": "bar"}
        extra_filter = self._add_extra_fields(extra_fields)
        log_message = "this log should have a additional fields"
        self.logger.info(log_message)
        # Remove the very instance that was added: removeFilter only drops a
        # filter it finds in the logger's filter list, and a freshly built
        # ExtraFieldsLogFilter would not be found there unless the class
        # defines equality.
        self.logger.removeFilter(extra_filter)
        unfiltered_log_message = "this log shouldn't have a additional fields"
        self.logger.info(unfiltered_log_message)
        time.sleep(self.logs_drain_timeout * 2)
        for log_dict in self._logs_containing(unfiltered_log_message):
            for key in extra_fields:
                self.assertNotIn(key, log_dict)

    def test_add_multiple_extra_fields(self):
        """Fields from several attached filters all end up in the log."""
        first_fields = {"foo": "bar"}
        self._add_extra_fields(first_fields)
        log_message = "this log should have additional fields"
        self.logger.info(log_message)
        second_fields = {"counter": 1}
        self._add_extra_fields(second_fields)
        filtered_log_message = "this log should have multiple additional fields"
        self.logger.info(filtered_log_message)
        time.sleep(self.logs_drain_timeout * 2)
        # The first message was logged with only the first filter attached.
        for log_dict in self._logs_containing(log_message):
            self._assert_has_fields(log_dict, first_fields)
        # The second message was logged with both filters attached.
        for log_dict in self._logs_containing(filtered_log_message):
            self._assert_has_fields(log_dict, first_fields)
            self._assert_has_fields(log_dict, second_fields)
if __name__ == '__main__':
    # Only ``TestCase`` is imported at module level, so the ``unittest``
    # module itself has to be imported here before its runner can be called.
    import unittest

    unittest.main()