import fnmatch
|
|
import logging.config
|
|
import os
|
|
import time
|
|
import json
|
|
from unittest import TestCase
|
|
from logzio.handler import ExtraFieldsLogFilter
|
|
from .mockLogzioListener import listener
|
|
|
|
|
|
def _find(pattern, path):
    """Return the files directly inside *path* whose names match *pattern*.

    *pattern* is an ``fnmatch``-style glob applied to the bare file name.
    Sub-directories are not searched. Each hit is returned joined to the
    directory that ``os.walk`` reports for the top level. When the walk
    yields nothing (for example, *path* does not exist) the result is an
    empty list.
    """
    # The first entry produced by os.walk describes *path* itself; taking
    # only that entry is what keeps the search non-recursive.
    top_dir, _subdirs, filenames = next(os.walk(path), (path, [], []))
    return [
        os.path.join(top_dir, filename)
        for filename in filenames
        if fnmatch.fnmatch(filename, pattern)
    ]
|
|
|
|
|
|
class TestExtraFieldsFilter(TestCase):
    """Checks that ``ExtraFieldsLogFilter`` adds its fields to shipped logs.

    Every test logs through a ``LogzioHandler`` that is configured to send
    to a ``MockLogzioListener``, then parses the JSON strings found in the
    listener's ``logs_list`` and inspects their keys.
    """

    def setUp(self):
        """Reset the listener, reconfigure logging and delete old failure files."""
        self.logzio_listener = listener.MockLogzioListener()
        self.logzio_listener.clear_logs_buffer()
        self.logzio_listener.clear_server_error()

        # Values forwarded verbatim to the handler configuration below.
        self.logs_drain_timeout = 1
        self.retries_no = 4
        self.retry_timeout = 2
        self.add_context = True

        listener_url = (
            "http://" + self.logzio_listener.get_host()
            + ":" + str(self.logzio_listener.get_port())
        )
        logging_configuration = {
            "version": 1,
            "formatters": {
                "logzio": {
                    "format": '{"key": "value"}',
                    "validate": False
                }
            },
            "handlers": {
                "LogzioHandler": {
                    "class": "logzio.handler.LogzioHandler",
                    "formatter": "logzio",
                    "level": "DEBUG",
                    "token": "token",
                    'logzio_type': "type",
                    'logs_drain_timeout': self.logs_drain_timeout,
                    'url': listener_url,
                    'debug': True,
                    'retries_no': self.retries_no,
                    'retry_timeout': self.retry_timeout,
                    'add_context': self.add_context
                }
            },
            "loggers": {
                "test": {
                    "handlers": ["LogzioHandler"],
                    "level": "DEBUG"
                }
            }
        }

        logging.config.dictConfig(logging_configuration)
        self.logger = logging.getLogger('test')

        # Remove logzio-failures-*.txt files left in the working directory.
        for curr_file in _find("logzio-failures-*.txt", "."):
            os.remove(curr_file)

    def _install_filter(self, extra):
        """Attach an ``ExtraFieldsLogFilter`` for *extra* and return it.

        ``logging.getLogger('test')`` hands back the same logger object to
        every test, so the filter is also registered for cleanup; without
        that it would stay attached and affect later tests.
        """
        extra_filter = ExtraFieldsLogFilter(extra=extra)
        self.logger.addFilter(extra_filter)
        # Logger.removeFilter does nothing if the filter is already gone,
        # so this is safe even when a test removes the filter itself.
        self.addCleanup(self.logger.removeFilter, extra_filter)
        return extra_filter

    def _wait_for_drain(self):
        """Sleep for twice the configured drain timeout before reading logs."""
        time.sleep(self.logs_drain_timeout * 2)

    def _logs_containing(self, message):
        """Return the parsed listener logs whose raw text contains *message*.

        Fails the test when nothing matches, so a log that never arrived
        cannot make a test pass without checking anything.
        """
        matching = [
            json.loads(raw_log)
            for raw_log in self.logzio_listener.logs_list
            if message in raw_log
        ]
        self.assertTrue(
            matching,
            "no log containing %r reached the listener" % (message,),
        )
        return matching

    def _assert_has_fields(self, log_dict, expected):
        """Fail unless every key/value pair of *expected* is in *log_dict*."""
        for key, value in expected.items():
            self.assertIn(key, log_dict)
            self.assertEqual(value, log_dict[key])

    def _assert_lacks_fields(self, log_dict, fields):
        """Fail if any key of *fields* is present in *log_dict*."""
        for key in fields:
            self.assertNotIn(key, log_dict)

    def test_add_extra_fields(self):
        """A log emitted with the filter attached carries the extra fields."""
        extra_fields = {"foo": "bar"}
        self._install_filter(extra_fields)
        log_message = "this log should have a additional fields"
        self.logger.info(log_message)

        self._wait_for_drain()
        for log_dict in self._logs_containing(log_message):
            self._assert_has_fields(log_dict, extra_fields)

    def test_remove_extra_fields(self):
        """A log emitted after the filter is removed lacks the extra fields."""
        extra_fields = {"foo": "bar"}
        extra_filter = self._install_filter(extra_fields)
        log_message = "this log should have a additional fields"
        self.logger.info(log_message)

        # Remove the very instance that was added. Logger.removeFilter
        # looks the argument up in the logger's filter list, so a newly
        # constructed filter would presumably not be found (unless the
        # filter class defines equality, which this file does not show).
        self.logger.removeFilter(extra_filter)
        unfiltered_log_message = "this log shouldn't have a additional fields"
        self.logger.info(unfiltered_log_message)

        self._wait_for_drain()
        for log_dict in self._logs_containing(log_message):
            self._assert_has_fields(log_dict, extra_fields)
        for log_dict in self._logs_containing(unfiltered_log_message):
            self._assert_lacks_fields(log_dict, extra_fields)

    def test_add_multiple_extra_fields(self):
        """Fields from several attached filters all appear on later logs."""
        first_fields = {"foo": "bar"}
        self._install_filter(first_fields)
        log_message = "this log should have additional fields"
        self.logger.info(log_message)

        second_fields = {"counter": 1}
        self._install_filter(second_fields)
        filtered_log_message = "this log should have multiple additional fields"
        self.logger.info(filtered_log_message)

        self._wait_for_drain()
        # The first log was emitted with only the first filter attached.
        for log_dict in self._logs_containing(log_message):
            self._assert_has_fields(log_dict, first_fields)
        # The second log was emitted with both filters attached.
        for log_dict in self._logs_containing(filtered_log_message):
            self._assert_has_fields(log_dict, first_fields)
            self._assert_has_fields(log_dict, second_fields)
|
|
|
|
|
|
if __name__ == '__main__':
    # Only ``TestCase`` is imported at the top of the file, so the
    # ``unittest`` module name itself is not bound; import it here to
    # avoid a NameError when the file is run as a script.
    import unittest

    unittest.main()
|