|
|
@ -0,0 +1,134 @@ |
|
|
|
import fnmatch |
|
|
|
import logging.config |
|
|
|
import os |
|
|
|
import time |
|
|
|
import json |
|
|
|
from unittest import TestCase |
|
|
|
from logzio.handler import ExtraFieldsLogFilter |
|
|
|
from .mockLogzioListener import listener |
|
|
|
|
|
|
|
|
|
|
|
def _find(pattern, path): |
|
|
|
result = [] |
|
|
|
for root, dirs, files in os.walk(path): |
|
|
|
for name in files: |
|
|
|
if fnmatch.fnmatch(name, pattern): |
|
|
|
result.append(os.path.join(root, name)) |
|
|
|
|
|
|
|
break # Not descending recursively |
|
|
|
return result |
|
|
|
|
|
|
|
|
|
|
|
class TestExtraFieldsFilter(TestCase): |
|
|
|
|
|
|
|
def setUp(self): |
|
|
|
self.logzio_listener = listener.MockLogzioListener() |
|
|
|
self.logzio_listener.clear_logs_buffer() |
|
|
|
self.logzio_listener.clear_server_error() |
|
|
|
self.logs_drain_timeout = 1 |
|
|
|
self.retries_no = 4 |
|
|
|
self.retry_timeout = 2 |
|
|
|
self.add_context = True |
|
|
|
logging_configuration = { |
|
|
|
"version": 1, |
|
|
|
"formatters": { |
|
|
|
"logzio": { |
|
|
|
"format": '{"key": "value"}', |
|
|
|
"validate": False |
|
|
|
} |
|
|
|
}, |
|
|
|
"handlers": { |
|
|
|
"LogzioHandler": { |
|
|
|
"class": "logzio.handler.LogzioHandler", |
|
|
|
"formatter": "logzio", |
|
|
|
"level": "DEBUG", |
|
|
|
"token": "token", |
|
|
|
'logzio_type': "type", |
|
|
|
'logs_drain_timeout': self.logs_drain_timeout, |
|
|
|
'url': "http://" + self.logzio_listener.get_host() + ":" + str(self.logzio_listener.get_port()), |
|
|
|
'debug': True, |
|
|
|
'retries_no': self.retries_no, |
|
|
|
'retry_timeout': self.retry_timeout, |
|
|
|
'add_context': self.add_context |
|
|
|
} |
|
|
|
}, |
|
|
|
"loggers": { |
|
|
|
"test": { |
|
|
|
"handlers": ["LogzioHandler"], |
|
|
|
"level": "DEBUG" |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
logging.config.dictConfig(logging_configuration) |
|
|
|
self.logger = logging.getLogger('test') |
|
|
|
|
|
|
|
for curr_file in _find("logzio-failures-*.txt", "."): |
|
|
|
os.remove(curr_file) |
|
|
|
|
|
|
|
def test_add_extra_fields(self): |
|
|
|
extra_fields = {"foo": "bar"} |
|
|
|
self.logger.addFilter(ExtraFieldsLogFilter(extra=extra_fields)) |
|
|
|
log_message = "this log should have a additional fields" |
|
|
|
self.logger.info(log_message) |
|
|
|
time.sleep(self.logs_drain_timeout * 2) |
|
|
|
logs_list = self.logzio_listener.logs_list |
|
|
|
for current_log in logs_list: |
|
|
|
if log_message in current_log: |
|
|
|
log_dict = json.loads(current_log) |
|
|
|
try: |
|
|
|
self.assertEqual(extra_fields, {**extra_fields, **log_dict}) |
|
|
|
except AssertionError as err: |
|
|
|
print(err) |
|
|
|
|
|
|
|
def test_remove_extra_fields(self): |
|
|
|
extra_fields = {"foo": "bar"} |
|
|
|
|
|
|
|
self.logger.addFilter(ExtraFieldsLogFilter(extra=extra_fields)) |
|
|
|
log_message = "this log should have a additional fields" |
|
|
|
self.logger.info(log_message) |
|
|
|
|
|
|
|
self.logger.removeFilter(ExtraFieldsLogFilter(extra=extra_fields)) |
|
|
|
unfiltered_log_message = "this log shouldn't have a additional fields" |
|
|
|
self.logger.info(unfiltered_log_message) |
|
|
|
|
|
|
|
time.sleep(self.logs_drain_timeout * 2) |
|
|
|
logs_list = self.logzio_listener.logs_list |
|
|
|
for current_log in logs_list: |
|
|
|
if unfiltered_log_message in current_log: |
|
|
|
log_dict = json.loads(current_log) |
|
|
|
try: |
|
|
|
self.assertNotEqual(extra_fields, {**extra_fields, **log_dict}) |
|
|
|
except AssertionError as err: |
|
|
|
print(err) |
|
|
|
|
|
|
|
def test_add_multiple_extra_fields(self): |
|
|
|
extra_fields = {"foo": "bar"} |
|
|
|
self.logger.addFilter(ExtraFieldsLogFilter(extra=extra_fields)) |
|
|
|
log_message = "this log should have additional fields" |
|
|
|
self.logger.info(log_message) |
|
|
|
|
|
|
|
extra_fields = {"counter":1} |
|
|
|
self.logger.addFilter(ExtraFieldsLogFilter(extra=extra_fields)) |
|
|
|
filtered_log_message = "this log should have multiple additional fields" |
|
|
|
self.logger.info(filtered_log_message) |
|
|
|
|
|
|
|
time.sleep(self.logs_drain_timeout * 2) |
|
|
|
logs_list = self.logzio_listener.logs_list |
|
|
|
for current_log in logs_list: |
|
|
|
if log_message in current_log: |
|
|
|
log_dict = json.loads(current_log) |
|
|
|
try: |
|
|
|
self.assertEqual(extra_fields, {**extra_fields, **log_dict}) |
|
|
|
except AssertionError as err: |
|
|
|
print(err) |
|
|
|
elif filtered_log_message in current_log: |
|
|
|
log_dict = json.loads(current_log) |
|
|
|
try: |
|
|
|
self.assertEqual(extra_fields, {**extra_fields, **log_dict}) |
|
|
|
except AssertionError as err: |
|
|
|
print(err) |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
|
unittest.main() |